feat: execute corestore backups
This commit is contained in:
@@ -157,6 +157,8 @@ volumes: [
|
|||||||
|
|
||||||
If `name` is omitted, Coreflow derives a stable Docker volume name from the service id and mount path. During service creation it sends a Docker volume mount with `DriverConfig.Name = 'corestore'`, plus service metadata as driver options and volume labels.
|
If `name` is omitted, Coreflow derives a stable Docker volume name from the service id and mount path. During service creation it sends a Docker volume mount with `DriverConfig.Name = 'corestore'`, plus service metadata as driver options and volume labels.
|
||||||
|
|
||||||
|
Coreflow also exposes Cloudly-triggered backup handlers over its TypedSocket connection. `executeServiceBackup` snapshots corestore volumes plus provisioned smartdb/smartstorage resources, and `executeServiceRestore` restores those snapshots back into the service's corestore resources.
|
||||||
|
|
||||||
## Coretraffic Integration
|
## Coretraffic Integration
|
||||||
|
|
||||||
Coreflow starts an internal SmartServe/TypedSocket server on port `3000`. Coretraffic is expected to connect to that server and tag its connection as `coretraffic`.
|
Coreflow starts an internal SmartServe/TypedSocket server on port `3000`. Coretraffic is expected to connect to that server and tag its connection as `coretraffic`.
|
||||||
|
|||||||
@@ -0,0 +1,272 @@
|
|||||||
|
import type { Coreflow } from './coreflow.classes.coreflow.js';
|
||||||
|
import * as plugins from './coreflow.plugins.js';
|
||||||
|
import * as crypto from 'node:crypto';
|
||||||
|
|
||||||
|
type TBackupSnapshotType = 'volume' | 'database' | 'objectstorage';
|
||||||
|
|
||||||
|
type TBackupSnapshot = {
|
||||||
|
type: TBackupSnapshotType;
|
||||||
|
snapshotId: string;
|
||||||
|
snapshotName?: string;
|
||||||
|
originalSize: number;
|
||||||
|
storedSize: number;
|
||||||
|
createdAt: number;
|
||||||
|
tags?: Record<string, string>;
|
||||||
|
volumeName?: string;
|
||||||
|
mountPath?: string;
|
||||||
|
resourceName?: string;
|
||||||
|
databaseName?: string;
|
||||||
|
bucketName?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
type TServiceVolumeConfig = {
|
||||||
|
name?: string;
|
||||||
|
source?: string;
|
||||||
|
mountPath?: string;
|
||||||
|
target?: string;
|
||||||
|
containerFsPath?: string;
|
||||||
|
driver?: string;
|
||||||
|
readOnly?: boolean;
|
||||||
|
backup?: boolean;
|
||||||
|
options?: Record<string, string>;
|
||||||
|
};
|
||||||
|
|
||||||
|
export class CoreflowBackupManager {
|
||||||
|
public coreflowRef: Coreflow;
|
||||||
|
|
||||||
|
constructor(coreflowRefArg: Coreflow) {
|
||||||
|
this.coreflowRef = coreflowRefArg;
|
||||||
|
this.coreflowRef.typedrouter.addTypedHandler(
|
||||||
|
new plugins.typedrequest.TypedHandler<any>('executeServiceBackup', async (requestArg) => {
|
||||||
|
return await this.executeServiceBackup(requestArg);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
this.coreflowRef.typedrouter.addTypedHandler(
|
||||||
|
new plugins.typedrequest.TypedHandler<any>('executeServiceRestore', async (requestArg) => {
|
||||||
|
return await this.executeServiceRestore(requestArg);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async start() {}
|
||||||
|
|
||||||
|
public async stop() {}
|
||||||
|
|
||||||
|
public async executeServiceBackup(requestArg: {
|
||||||
|
backupId: string;
|
||||||
|
service: plugins.servezoneInterfaces.data.IService;
|
||||||
|
tags?: Record<string, string>;
|
||||||
|
}) {
|
||||||
|
await this.provisionCorestoreBindingsIfAvailable(requestArg.service);
|
||||||
|
|
||||||
|
const snapshots: TBackupSnapshot[] = [];
|
||||||
|
snapshots.push(...await this.snapshotServiceVolumes(requestArg));
|
||||||
|
snapshots.push(...await this.snapshotServiceResources(requestArg));
|
||||||
|
|
||||||
|
return { snapshots };
|
||||||
|
}
|
||||||
|
|
||||||
|
public async executeServiceRestore(requestArg: {
|
||||||
|
backupId: string;
|
||||||
|
service: plugins.servezoneInterfaces.data.IService;
|
||||||
|
snapshots: TBackupSnapshot[];
|
||||||
|
clear?: boolean;
|
||||||
|
resourceTypes?: TBackupSnapshotType[];
|
||||||
|
}) {
|
||||||
|
await this.provisionCorestoreBindingsIfAvailable(requestArg.service);
|
||||||
|
|
||||||
|
const selectedSnapshots = (requestArg.snapshots || []).filter((snapshotArg) => {
|
||||||
|
return !requestArg.resourceTypes?.length || requestArg.resourceTypes.includes(snapshotArg.type);
|
||||||
|
});
|
||||||
|
const restored: TBackupSnapshot[] = [];
|
||||||
|
|
||||||
|
for (const snapshot of selectedSnapshots.filter((snapshotArg) => snapshotArg.type === 'volume')) {
|
||||||
|
await this.createCorestoreVolume(requestArg.service, {
|
||||||
|
source: snapshot.volumeName,
|
||||||
|
mountPath: snapshot.mountPath || '/data',
|
||||||
|
backup: true,
|
||||||
|
});
|
||||||
|
await this.postCorestore('/volumes/restore', {
|
||||||
|
name: snapshot.volumeName,
|
||||||
|
snapshotId: snapshot.snapshotId,
|
||||||
|
clear: requestArg.clear,
|
||||||
|
});
|
||||||
|
restored.push(snapshot);
|
||||||
|
}
|
||||||
|
|
||||||
|
const resourceSnapshots = selectedSnapshots.filter((snapshotArg) => {
|
||||||
|
return snapshotArg.type === 'database' || snapshotArg.type === 'objectstorage';
|
||||||
|
});
|
||||||
|
if (resourceSnapshots.length > 0) {
|
||||||
|
await this.postCorestore('/resources/restore', {
|
||||||
|
serviceId: requestArg.service.id,
|
||||||
|
snapshots: resourceSnapshots.map((snapshotArg) => ({
|
||||||
|
capability: snapshotArg.type,
|
||||||
|
resourceName: snapshotArg.resourceName || '',
|
||||||
|
snapshotId: snapshotArg.snapshotId,
|
||||||
|
snapshotName: snapshotArg.snapshotName,
|
||||||
|
originalSize: snapshotArg.originalSize,
|
||||||
|
storedSize: snapshotArg.storedSize,
|
||||||
|
createdAt: snapshotArg.createdAt,
|
||||||
|
tags: snapshotArg.tags || {},
|
||||||
|
databaseName: snapshotArg.databaseName,
|
||||||
|
bucketName: snapshotArg.bucketName,
|
||||||
|
})),
|
||||||
|
});
|
||||||
|
restored.push(...resourceSnapshots);
|
||||||
|
}
|
||||||
|
|
||||||
|
return { restored };
|
||||||
|
}
|
||||||
|
|
||||||
|
private async provisionCorestoreBindingsIfAvailable(
|
||||||
|
serviceArg: plugins.servezoneInterfaces.data.IService,
|
||||||
|
) {
|
||||||
|
if (!(this.coreflowRef.cloudlyConnector as any).cloudlyApiClient) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await this.coreflowRef.platformManager.provisionBindingsForService(serviceArg);
|
||||||
|
}
|
||||||
|
|
||||||
|
private getCoreStoreControlUrl() {
|
||||||
|
return (process.env.CORESTORE_CONTROL_URL || 'http://corestore:3000').replace(/\/+$/, '');
|
||||||
|
}
|
||||||
|
|
||||||
|
private getCoreStoreApiToken() {
|
||||||
|
return process.env.CORESTORE_API_TOKEN;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async postCorestore<T = any>(pathArg: string, bodyArg: unknown): Promise<T> {
|
||||||
|
const token = this.getCoreStoreApiToken();
|
||||||
|
const response = await fetch(`${this.getCoreStoreControlUrl()}${pathArg}`, {
|
||||||
|
method: 'POST',
|
||||||
|
headers: {
|
||||||
|
'content-type': 'application/json',
|
||||||
|
...(token ? { authorization: `Bearer ${token}` } : {}),
|
||||||
|
},
|
||||||
|
body: JSON.stringify(bodyArg),
|
||||||
|
});
|
||||||
|
const responseText = await response.text();
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`CoreStore request failed ${response.status}: ${responseText}`);
|
||||||
|
}
|
||||||
|
return responseText ? JSON.parse(responseText) as T : ({} as T);
|
||||||
|
}
|
||||||
|
|
||||||
|
private getDockerSafeName(valueArg: string, maxLengthArg = 64) {
|
||||||
|
const safeName = valueArg
|
||||||
|
.replace(/[^a-zA-Z0-9_.-]+/g, '-')
|
||||||
|
.replace(/^[^a-zA-Z0-9]+|[^a-zA-Z0-9]+$/g, '')
|
||||||
|
.slice(0, maxLengthArg)
|
||||||
|
.replace(/[^a-zA-Z0-9]+$/g, '');
|
||||||
|
return safeName || 'resource';
|
||||||
|
}
|
||||||
|
|
||||||
|
private getServiceVolumeConfigs(serviceArg: plugins.servezoneInterfaces.data.IService) {
|
||||||
|
const serviceData = serviceArg.data as plugins.servezoneInterfaces.data.IService['data'] & {
|
||||||
|
volumes?: TServiceVolumeConfig[];
|
||||||
|
};
|
||||||
|
return (serviceData.volumes || []).filter((volumeArg) => {
|
||||||
|
return Boolean(volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private getCoreStoreVolumeName(
|
||||||
|
serviceArg: plugins.servezoneInterfaces.data.IService,
|
||||||
|
volumeArg: TServiceVolumeConfig,
|
||||||
|
) {
|
||||||
|
const requestedName = volumeArg.source || volumeArg.name;
|
||||||
|
if (requestedName) {
|
||||||
|
return this.getDockerSafeName(requestedName, 120);
|
||||||
|
}
|
||||||
|
const mountPath = volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath || 'data';
|
||||||
|
const serviceName = this.getDockerSafeName(serviceArg.data.name, 36);
|
||||||
|
const mountName = this.getDockerSafeName(mountPath.replace(/^\/+/, '').replace(/\/+$/g, ''), 28);
|
||||||
|
const hash = crypto.createHash('sha1').update(`${serviceArg.id}:${mountPath}`).digest('hex').slice(0, 12);
|
||||||
|
return this.getDockerSafeName(`sz-${serviceName}-${mountName}-${hash}`, 120);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async createCorestoreVolume(
|
||||||
|
serviceArg: plugins.servezoneInterfaces.data.IService,
|
||||||
|
volumeArg: TServiceVolumeConfig,
|
||||||
|
) {
|
||||||
|
const mountPath = volumeArg.mountPath || volumeArg.target || volumeArg.containerFsPath || '/data';
|
||||||
|
const volumeName = this.getCoreStoreVolumeName(serviceArg, volumeArg);
|
||||||
|
await this.postCorestore('/volumes/create', {
|
||||||
|
name: volumeName,
|
||||||
|
serviceId: serviceArg.id,
|
||||||
|
serviceName: serviceArg.data.name,
|
||||||
|
mountPath,
|
||||||
|
backup: volumeArg.backup !== false,
|
||||||
|
options: volumeArg.options,
|
||||||
|
});
|
||||||
|
return { volumeName, mountPath };
|
||||||
|
}
|
||||||
|
|
||||||
|
private async snapshotServiceVolumes(requestArg: {
|
||||||
|
backupId: string;
|
||||||
|
service: plugins.servezoneInterfaces.data.IService;
|
||||||
|
tags?: Record<string, string>;
|
||||||
|
}) {
|
||||||
|
const snapshots: TBackupSnapshot[] = [];
|
||||||
|
const volumes = this.getServiceVolumeConfigs(requestArg.service).filter((volumeArg) => {
|
||||||
|
return volumeArg.driver === undefined || volumeArg.driver === 'corestore';
|
||||||
|
}).filter((volumeArg) => volumeArg.backup !== false);
|
||||||
|
|
||||||
|
for (const volume of volumes) {
|
||||||
|
const { volumeName, mountPath } = await this.createCorestoreVolume(requestArg.service, volume);
|
||||||
|
const response = await this.postCorestore<any>('/volumes/snapshot', {
|
||||||
|
name: volumeName,
|
||||||
|
snapshotName: requestArg.backupId,
|
||||||
|
tags: {
|
||||||
|
backupId: requestArg.backupId,
|
||||||
|
serviceId: requestArg.service.id,
|
||||||
|
serviceName: requestArg.service.data.name,
|
||||||
|
mountPath,
|
||||||
|
...(requestArg.tags || {}),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
snapshots.push({
|
||||||
|
type: 'volume',
|
||||||
|
volumeName,
|
||||||
|
mountPath,
|
||||||
|
snapshotId: response.snapshot.id,
|
||||||
|
snapshotName: requestArg.backupId,
|
||||||
|
originalSize: response.snapshot.originalSize,
|
||||||
|
storedSize: response.snapshot.storedSize,
|
||||||
|
createdAt: Date.now(),
|
||||||
|
tags: response.snapshot.tags,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return snapshots;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async snapshotServiceResources(requestArg: {
|
||||||
|
backupId: string;
|
||||||
|
service: plugins.servezoneInterfaces.data.IService;
|
||||||
|
tags?: Record<string, string>;
|
||||||
|
}) {
|
||||||
|
const response = await this.postCorestore<any>('/resources/snapshot', {
|
||||||
|
serviceId: requestArg.service.id,
|
||||||
|
snapshotName: requestArg.backupId,
|
||||||
|
tags: {
|
||||||
|
backupId: requestArg.backupId,
|
||||||
|
serviceId: requestArg.service.id,
|
||||||
|
serviceName: requestArg.service.data.name,
|
||||||
|
...(requestArg.tags || {}),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return (response.snapshots || []).map((snapshotArg: any) => ({
|
||||||
|
type: snapshotArg.capability,
|
||||||
|
resourceName: snapshotArg.resourceName,
|
||||||
|
databaseName: snapshotArg.databaseName,
|
||||||
|
bucketName: snapshotArg.bucketName,
|
||||||
|
snapshotId: snapshotArg.snapshotId,
|
||||||
|
snapshotName: snapshotArg.snapshotName,
|
||||||
|
originalSize: snapshotArg.originalSize,
|
||||||
|
storedSize: snapshotArg.storedSize,
|
||||||
|
createdAt: snapshotArg.createdAt,
|
||||||
|
tags: snapshotArg.tags,
|
||||||
|
})) as TBackupSnapshot[];
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import { CoretrafficConnector } from './coreflow.connector.coretrafficconnector.
|
|||||||
import { ExternalGatewayConnector } from './coreflow.connector.externalgateway.js';
|
import { ExternalGatewayConnector } from './coreflow.connector.externalgateway.js';
|
||||||
import { InternalServer } from './coreflow.classes.internalserver.js';
|
import { InternalServer } from './coreflow.classes.internalserver.js';
|
||||||
import { PlatformManager } from './coreflow.classes.platformmanager.js';
|
import { PlatformManager } from './coreflow.classes.platformmanager.js';
|
||||||
|
import { CoreflowBackupManager } from './coreflow.classes.backupmanager.js';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* the main Coreflow class
|
* the main Coreflow class
|
||||||
@@ -22,6 +23,7 @@ export class Coreflow {
|
|||||||
public externalGatewayConnector: ExternalGatewayConnector;
|
public externalGatewayConnector: ExternalGatewayConnector;
|
||||||
public clusterManager: ClusterManager;
|
public clusterManager: ClusterManager;
|
||||||
public platformManager: PlatformManager;
|
public platformManager: PlatformManager;
|
||||||
|
public backupManager: CoreflowBackupManager;
|
||||||
public taskManager: CoreflowTaskmanager;
|
public taskManager: CoreflowTaskmanager;
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
@@ -33,6 +35,7 @@ export class Coreflow {
|
|||||||
this.externalGatewayConnector = new ExternalGatewayConnector(this);
|
this.externalGatewayConnector = new ExternalGatewayConnector(this);
|
||||||
this.clusterManager = new ClusterManager(this);
|
this.clusterManager = new ClusterManager(this);
|
||||||
this.platformManager = new PlatformManager(this);
|
this.platformManager = new PlatformManager(this);
|
||||||
|
this.backupManager = new CoreflowBackupManager(this);
|
||||||
this.taskManager = new CoreflowTaskmanager(this);
|
this.taskManager = new CoreflowTaskmanager(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,6 +70,8 @@ export class Coreflow {
|
|||||||
console.log('cluster manager started!');
|
console.log('cluster manager started!');
|
||||||
await this.platformManager.start();
|
await this.platformManager.start();
|
||||||
console.log('platform manager started!');
|
console.log('platform manager started!');
|
||||||
|
await this.backupManager.start();
|
||||||
|
console.log('backup manager started!');
|
||||||
await this.taskManager.start();
|
await this.taskManager.start();
|
||||||
console.log('task manager started!');
|
console.log('task manager started!');
|
||||||
}
|
}
|
||||||
@@ -78,6 +83,7 @@ export class Coreflow {
|
|||||||
await this.cloudlyConnector.stop();
|
await this.cloudlyConnector.stop();
|
||||||
await this.clusterManager.stop();
|
await this.clusterManager.stop();
|
||||||
await this.platformManager.stop();
|
await this.platformManager.stop();
|
||||||
|
await this.backupManager.stop();
|
||||||
await this.taskManager.stop();
|
await this.taskManager.stop();
|
||||||
await this.internalServer.stop();
|
await this.internalServer.stop();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user