feat: add backup replication targets

This commit is contained in:
2026-05-07 17:44:31 +00:00
parent b0f0963143
commit 1792ea89e1
6 changed files with 496 additions and 6 deletions
+276 -5
View File
@@ -1,9 +1,43 @@
import type { Cloudly } from '../classes.cloudly.js';
import * as plugins from '../plugins.js';
import { BackupRecord } from './classes.backuprecord.js';
import { createBackupTargetWriterFromEnv, type IBackupTargetWriter } from './classes.replicationtarget.js';
/**
 * Lifecycle states of a backup record.
 * 'replicating'/'replicated' track the remote-target replication phase;
 * 'ready' is retained for records created before replication existed.
 */
export type TBackupStatus =
  | 'pending'
  | 'running'
  | 'replicating'
  | 'replicated'
  | 'ready'
  | 'failed'
  | 'restoring'
  | 'restored';
/** Kind of resource a backup snapshot covers. */
export type TBackupResourceType = 'volume' | 'database' | 'objectstorage';
/** Supported remote replication target backends. */
export type TBackupReplicationTargetType = 's3' | 'smb';
/** One file within a replicated backup archive. */
export interface IBackupArchiveObject {
  path: string; // path relative to the archive root
  size: number; // size in bytes
  sha256: string; // hex-encoded SHA-256 of the file contents
}
/** Listing of all objects that make up one backup archive. */
export interface IBackupArchiveManifest {
  version: 1; // manifest schema version
  backupId: string; // id of the backup this manifest belongs to
  createdAt: number; // creation time (epoch ms, by convention elsewhere in this file)
  objects: IBackupArchiveObject[]; // every object in the archive
  totalSize: number; // sum of object sizes in bytes
}
/** Outcome of replicating a backup to the remote target, persisted on the record. */
export interface IBackupReplicationResult {
  targetType: TBackupReplicationTargetType; // which backend received the backup
  targetPath: string; // remote base path of the backup
  manifestPath: string; // remote path of the written manifest.json
  manifestSha256: string; // checksum used to verify the manifest on read-back
  objectCount: number; // number of archive objects replicated
  totalSize: number; // total bytes replicated (from the manifest)
  completedAt: number; // completion time (epoch ms)
}
export interface IBackupSnapshotData {
type: TBackupResourceType;
@@ -28,6 +62,7 @@ export interface IBackupRecordData {
status: TBackupStatus;
trigger: 'manual' | 'scheduled';
snapshots: IBackupSnapshotData[];
replication?: IBackupReplicationResult;
createdAt: number;
updatedAt: number;
completedAt?: number;
@@ -44,6 +79,7 @@ export interface IBackupRecordData {
export class CloudlyBackupManager {
public typedrouter = new plugins.typedrequest.TypedRouter();
public cloudlyRef: Cloudly;
private backupTargetWriter?: IBackupTargetWriter;
get db() {
return this.cloudlyRef.mongodbConnector.smartdataDb;
@@ -93,6 +129,41 @@ export class CloudlyBackupManager {
};
}),
);
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>('prepareBackupReplication', async (requestArg) => {
await this.passClusterIdentity(requestArg);
return await this.prepareBackupReplication(requestArg);
}),
);
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>('uploadBackupArchiveObject', async (requestArg) => {
await this.passClusterIdentity(requestArg);
return await this.uploadBackupArchiveObject(requestArg);
}),
);
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>('completeBackupReplication', async (requestArg) => {
await this.passClusterIdentity(requestArg);
return await this.completeBackupReplication(requestArg);
}),
);
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>('getBackupArchiveManifest', async (requestArg) => {
await this.passClusterIdentity(requestArg);
return await this.getBackupArchiveManifest(requestArg);
}),
);
this.typedrouter.addTypedHandler(
new plugins.typedrequest.TypedHandler<any>('downloadBackupArchiveObject', async (requestArg) => {
await this.passClusterIdentity(requestArg);
return await this.downloadBackupArchiveObject(requestArg);
}),
);
}
public async start() {
@@ -190,9 +261,16 @@ export class CloudlyBackupManager {
backupId: backup.id,
service: await service.createSavableObject(),
tags: requestArg.tags,
replication: {
enabled: true,
},
}, backup.clusterId);
backup.snapshots = result.snapshots || [];
backup.status = 'ready';
if (!result.replication) {
throw new Error('Coreflow did not complete remote backup replication');
}
backup.replication = result.replication;
backup.status = 'replicated';
backup.completedAt = Date.now();
backup.updatedAt = Date.now();
await backup.save();
@@ -218,7 +296,7 @@ export class CloudlyBackupManager {
serviceId: serviceIdArg,
});
const completedBackups = backups
.filter((backupArg) => backupArg.status === 'ready' || backupArg.status === 'restored' || backupArg.status === 'failed')
.filter((backupArg) => backupArg.status === 'replicated' || backupArg.status === 'restored' || backupArg.status === 'failed')
.sort((a, b) => (b.createdAt || 0) - (a.createdAt || 0));
for (const backup of completedBackups.slice(keepLast)) {
await backup.delete();
@@ -235,7 +313,7 @@ export class CloudlyBackupManager {
if (!backup) {
throw new plugins.typedrequest.TypedResponseError(`Backup ${requestArg.backupId} not found`);
}
if (backup.status !== 'ready' && backup.status !== 'restored') {
if (backup.status !== 'replicated' && backup.status !== 'restored') {
throw new plugins.typedrequest.TypedResponseError(`Backup ${backup.id} is not restorable in status ${backup.status}`);
}
const service = await this.cloudlyRef.serviceManager.CService.getInstance({
@@ -245,6 +323,7 @@ export class CloudlyBackupManager {
throw new plugins.typedrequest.TypedResponseError(`Service ${backup.serviceId} not found`);
}
const previousStatus = backup.status;
backup.status = 'restoring';
backup.updatedAt = Date.now();
await backup.save();
@@ -256,6 +335,9 @@ export class CloudlyBackupManager {
snapshots: backup.snapshots || [],
clear: requestArg.clear,
resourceTypes: requestArg.resourceTypes,
replication: {
enabled: true,
},
}, backup.clusterId);
backup.status = 'restored';
backup.restoreHistory = [
@@ -268,7 +350,7 @@ export class CloudlyBackupManager {
backup.updatedAt = Date.now();
await backup.save();
} catch (error) {
backup.status = 'ready';
backup.status = previousStatus;
backup.restoreHistory = [
...(backup.restoreHistory || []),
{
@@ -285,6 +367,188 @@ export class CloudlyBackupManager {
return await backup.createSavableObject();
}
/**
 * Lazily creates and caches the remote backup target writer.
 * The writer is configured from environment variables on first use.
 */
private getBackupTargetWriter() {
  if (this.backupTargetWriter) {
    return this.backupTargetWriter;
  }
  const writer = createBackupTargetWriterFromEnv();
  this.backupTargetWriter = writer;
  return writer;
}
/**
 * Normalizes a remote target path: backslashes become forward slashes,
 * redundant segments are collapsed and leading slashes are stripped.
 * Rejects empty paths and any path that would escape the target root.
 * @throws Error when the resulting path is empty or traverses above the root.
 */
private normalizeTargetPath(pathArg: string) {
  const normalized = plugins.path.posix
    .normalize(String(pathArg || '').replace(/\\/g, '/').trim())
    .replace(/^\/+/, '');
  // After normalize(), any remaining '..' segment can only escape the root
  // (e.g. '..' or '../x'). Checking segments also rejects the bare '..'
  // input that prefix/substring checks (startsWith/includes) would miss.
  if (!normalized || normalized === '.' || normalized.split('/').includes('..')) {
    throw new Error(`Invalid backup target path ${pathArg}`);
  }
  return normalized;
}
/**
 * Builds the remote base path for a backup:
 * <prefix>/clusters/<clusterId>/services/<serviceId>/backups/<backupId>.
 * Empty segments are dropped before joining.
 */
private getBackupTargetPath(backupArg: BackupRecord) {
  const prefix = process.env.CLOUDLY_BACKUP_TARGET_PREFIX || 'serve.zone-backups';
  const segments = [
    prefix,
    'clusters',
    backupArg.clusterId || 'default',
    'services',
    backupArg.serviceId,
    'backups',
    backupArg.id,
  ];
  const joined = segments.filter(Boolean).join('/');
  return this.normalizeTargetPath(joined);
}
/** Resolves the remote path of a single archive object of a backup. */
private getArchiveObjectTargetPath(backupArg: BackupRecord, objectPathArg: string) {
  const basePath = this.getBackupTargetPath(backupArg);
  return this.normalizeTargetPath([basePath, 'archive', objectPathArg].join('/'));
}
/** Resolves the remote path of the manifest.json of a backup. */
private getManifestTargetPath(backupArg: BackupRecord) {
  const basePath = this.getBackupTargetPath(backupArg);
  return this.normalizeTargetPath(`${basePath}/manifest.json`);
}
/** Computes the hex-encoded SHA-256 digest of a buffer. */
private getSha256(contentsArg: Buffer) {
  const hash = plugins.crypto.createHash('sha256');
  hash.update(contentsArg);
  return hash.digest('hex');
}
/**
 * Verifies that buffer contents match an archive object's declared size and
 * SHA-256; throws on any mismatch. Size is checked first so the hash is only
 * computed when lengths agree.
 */
private assertObjectMatches(objectArg: IBackupArchiveObject, contentsArg: Buffer) {
  if (contentsArg.length !== objectArg.size) {
    throw new Error(`Backup archive object checksum mismatch for ${objectArg.path}`);
  }
  if (this.getSha256(contentsArg) !== objectArg.sha256) {
    throw new Error(`Backup archive object checksum mismatch for ${objectArg.path}`);
  }
}
/**
 * Serializes the archive manifest together with backup metadata into a
 * pretty-printed JSON buffer (with trailing newline).
 */
private createManifestBuffer(backupArg: BackupRecord, manifestArg: IBackupArchiveManifest) {
  const payload = {
    version: 1,
    backupId: backupArg.id,
    serviceId: backupArg.serviceId,
    serviceName: backupArg.serviceName,
    clusterId: backupArg.clusterId,
    archive: manifestArg,
  };
  const serialized = JSON.stringify(payload, null, 2);
  return Buffer.from(serialized + '\n');
}
/**
 * Loads a backup record and verifies it belongs to the requesting cluster.
 * Ownership is only enforced when both the record and the identity carry a
 * clusterId.
 * @throws TypedResponseError when the backup is missing or owned by another cluster.
 */
private async getBackupForClusterRequest(backupIdArg: string, identityArg: plugins.servezoneInterfaces.data.IIdentity) {
  const backup = await BackupRecord.getInstance({ id: backupIdArg });
  if (!backup) {
    throw new plugins.typedrequest.TypedResponseError(`Backup ${backupIdArg} not found`);
  }
  // NOTE(review): clusterId is read via `as any`, so it is presumably not on
  // the declared IIdentity type — confirm against servezoneInterfaces.
  const identityClusterId = (identityArg as any).clusterId;
  const ownershipKnown = Boolean(backup.clusterId && identityClusterId);
  if (ownershipKnown && backup.clusterId !== identityClusterId) {
    throw new plugins.typedrequest.TypedResponseError(`Backup ${backupIdArg} does not belong to this cluster`);
  }
  return backup;
}
/**
 * First phase of replication: determines which archive objects are still
 * missing on the remote target and flips the backup into 'replicating'.
 * @returns the manifest objects the cluster still has to upload.
 */
public async prepareBackupReplication(requestArg: {
  identity: plugins.servezoneInterfaces.data.IIdentity;
  backupId: string;
  manifest: IBackupArchiveManifest;
}) {
  const backup = await this.getBackupForClusterRequest(requestArg.backupId, requestArg.identity);
  const targetWriter = this.getBackupTargetWriter();
  const manifestObjects = requestArg.manifest.objects || [];
  const missingObjects: IBackupArchiveObject[] = [];
  // sequential on purpose: checks go against a remote target
  for (const archiveObject of manifestObjects) {
    const targetPath = this.getArchiveObjectTargetPath(backup, archiveObject.path);
    const alreadyPresent = await targetWriter.hasObject(targetPath, archiveObject);
    if (!alreadyPresent) {
      missingObjects.push(archiveObject);
    }
  }
  backup.status = 'replicating';
  backup.updatedAt = Date.now();
  await backup.save();
  return { missingObjects };
}
/**
 * Receives one archive object from the cluster, verifies its checksum and
 * writes it to the remote backup target.
 * @throws Error when the decoded contents do not match the declared object.
 */
public async uploadBackupArchiveObject(requestArg: {
  identity: plugins.servezoneInterfaces.data.IIdentity;
  backupId: string;
  object: IBackupArchiveObject;
  contentsBase64: string;
}) {
  const backup = await this.getBackupForClusterRequest(requestArg.backupId, requestArg.identity);
  const decodedContents = Buffer.from(requestArg.contentsBase64 || '', 'base64');
  this.assertObjectMatches(requestArg.object, decodedContents);
  const targetPath = this.getArchiveObjectTargetPath(backup, requestArg.object.path);
  const targetWriter = this.getBackupTargetWriter();
  await targetWriter.putObject(targetPath, requestArg.object, decodedContents);
  return { accepted: true };
}
/**
 * Final phase of replication: re-checks that every archive object is present
 * on the remote target, writes the manifest, and marks the backup
 * 'replicated' with the replication result persisted on the record.
 * @throws Error when any archive object is missing remotely.
 */
public async completeBackupReplication(requestArg: {
  identity: plugins.servezoneInterfaces.data.IIdentity;
  backupId: string;
  manifest: IBackupArchiveManifest;
}) {
  const backup = await this.getBackupForClusterRequest(requestArg.backupId, requestArg.identity);
  const targetWriter = this.getBackupTargetWriter();
  // verify every object landed before declaring the replication complete
  for (const archiveObject of requestArg.manifest.objects || []) {
    const targetPath = this.getArchiveObjectTargetPath(backup, archiveObject.path);
    const present = await targetWriter.hasObject(targetPath, archiveObject);
    if (!present) {
      throw new Error(`Remote backup target is missing archive object ${archiveObject.path}`);
    }
  }
  const manifestBuffer = this.createManifestBuffer(backup, requestArg.manifest);
  const manifestPath = this.getManifestTargetPath(backup);
  const manifestObject = {
    path: 'manifest.json',
    size: manifestBuffer.length,
    sha256: this.getSha256(manifestBuffer),
  };
  await targetWriter.putObject(manifestPath, manifestObject, manifestBuffer);
  const completedAt = Date.now();
  const replication: IBackupReplicationResult = {
    targetType: targetWriter.targetType,
    targetPath: this.getBackupTargetPath(backup),
    manifestPath,
    manifestSha256: manifestObject.sha256,
    objectCount: requestArg.manifest.objects.length,
    totalSize: requestArg.manifest.totalSize,
    completedAt,
  };
  backup.replication = replication;
  backup.status = 'replicated';
  backup.completedAt = completedAt;
  backup.updatedAt = completedAt;
  await backup.save();
  return { replication };
}
/**
 * Reads the replicated manifest back from the remote target, verifies it
 * against the stored checksum and returns its archive section.
 * @throws TypedResponseError when the backup was never replicated.
 * @throws Error on a manifest checksum mismatch.
 */
public async getBackupArchiveManifest(requestArg: {
  identity: plugins.servezoneInterfaces.data.IIdentity;
  backupId: string;
}) {
  const backup = await this.getBackupForClusterRequest(requestArg.backupId, requestArg.identity);
  const replication = backup.replication;
  if (!replication) {
    throw new plugins.typedrequest.TypedResponseError(`Backup ${backup.id} has not been replicated`);
  }
  const manifestBuffer = await this.getBackupTargetWriter().readObject(replication.manifestPath);
  const actualSha256 = this.getSha256(manifestBuffer);
  if (actualSha256 !== replication.manifestSha256) {
    throw new Error(`Remote manifest checksum mismatch for backup ${backup.id}`);
  }
  // checksum verified above, so parsing the trusted bytes is safe here
  const parsedManifest = JSON.parse(manifestBuffer.toString('utf8'));
  return { manifest: parsedManifest.archive as IBackupArchiveManifest };
}
/**
 * Returns one archive object to the cluster for a restore, verifying its
 * checksum before handing back the base64-encoded contents.
 * @throws TypedResponseError when the backup was never replicated.
 */
public async downloadBackupArchiveObject(requestArg: {
  identity: plugins.servezoneInterfaces.data.IIdentity;
  backupId: string;
  object: IBackupArchiveObject;
}) {
  const backup = await this.getBackupForClusterRequest(requestArg.backupId, requestArg.identity);
  if (!backup.replication) {
    throw new plugins.typedrequest.TypedResponseError(`Backup ${backup.id} has not been replicated`);
  }
  const targetPath = this.getArchiveObjectTargetPath(backup, requestArg.object.path);
  const contents = await this.getBackupTargetWriter().readObject(targetPath);
  this.assertObjectMatches(requestArg.object, contents);
  const contentsBase64 = contents.toString('base64');
  return { object: requestArg.object, contentsBase64 };
}
private async fireCoreflowRequest(methodArg: string, payloadArg: Record<string, unknown>, clusterIdArg?: string) {
const typedsocket = this.cloudlyRef.server.typedServer?.typedsocket;
if (!typedsocket) {
@@ -317,4 +581,11 @@ export class CloudlyBackupManager {
this.cloudlyRef.authManager.adminIdentityGuard,
]);
}
/**
 * Validates the request identity and additionally requires the 'cluster' role.
 * @throws TypedResponseError when the identity is not a cluster identity.
 */
private async passClusterIdentity(requestData: { identity: plugins.servezoneInterfaces.data.IIdentity }) {
  await this.passValidIdentity(requestData);
  const role = requestData.identity.role;
  if (role !== 'cluster') {
    throw new plugins.typedrequest.TypedResponseError('Cluster identity required');
  }
}
}
@@ -28,6 +28,9 @@ export class BackupRecord extends plugins.smartdata.SmartDataDbDoc<
@plugins.smartdata.svDb()
public snapshots!: IBackupRecordData['snapshots'];
@plugins.smartdata.svDb()
public replication?: IBackupRecordData['replication'];
@plugins.smartdata.svDb()
public createdAt!: number;
@@ -0,0 +1,194 @@
import * as plugins from '../plugins.js';
// Metadata describing one file inside a backup archive.
type TArchiveObject = {
  path: string; // path relative to the archive root
  size: number; // size in bytes
  sha256: string; // hex-encoded SHA-256 of the contents
};
// Supported remote backup target backends.
type TTargetType = 's3' | 'smb';
/**
 * Contract every backup target writer implements: existence checks against
 * declared size/checksum, checksum-verified writes, and raw reads.
 */
export interface IBackupTargetWriter {
  targetType: TTargetType;
  hasObject(pathArg: string, objectArg: TArchiveObject): Promise<boolean>;
  putObject(pathArg: string, objectArg: TArchiveObject, contentsArg: Buffer): Promise<void>;
  readObject(pathArg: string): Promise<Buffer>;
}
/**
 * Reads an environment variable and throws when it is unset or empty.
 * @throws Error naming the missing variable.
 */
const requiredEnv = (nameArg: string) => {
  const value = process.env[nameArg];
  if (value) {
    return value;
  }
  throw new Error(`Missing required backup target env ${nameArg}`);
};
/**
 * Normalizes a remote path: backslashes become forward slashes, redundant
 * segments are collapsed and leading slashes are stripped.
 * Rejects empty paths and any path escaping the root via '..' segments.
 * @throws Error for empty or traversing paths.
 */
const normalizeRemotePath = (pathArg: string) => {
  const normalized = plugins.path.posix
    .normalize(String(pathArg || '').replace(/\\/g, '/').trim())
    .replace(/^\/+/, '');
  // After normalize(), any remaining '..' segment can only escape the root
  // (e.g. '..' or '../x'). The segment check also rejects the bare '..'
  // input that the previous prefix/substring tests missed.
  if (!normalized || normalized === '.' || normalized.split('/').includes('..')) {
    throw new Error(`Invalid backup target path ${pathArg}`);
  }
  return normalized;
};
/** Hex-encoded SHA-256 digest of a buffer. */
const getBufferSha256 = (contentsArg: Buffer) => {
  const hash = plugins.crypto.createHash('sha256');
  return hash.update(contentsArg).digest('hex');
};
/**
 * Throws when buffer contents do not match an archive object's declared
 * size and SHA-256; labelArg identifies the object in the error message.
 */
const assertObjectMatches = (objectArg: TArchiveObject, contentsArg: Buffer, labelArg: string) => {
  const sha256 = getBufferSha256(contentsArg);
  const matches = contentsArg.length === objectArg.size && sha256 === objectArg.sha256;
  if (!matches) {
    throw new Error(`Backup target checksum mismatch for ${labelArg}`);
  }
};
/** True when contents match an archive object's declared size and SHA-256. */
const objectMatches = (objectArg: TArchiveObject, contentsArg: Buffer) => {
  if (contentsArg.length !== objectArg.size) {
    return false;
  }
  return getBufferSha256(contentsArg) === objectArg.sha256;
};
/**
 * Backup target writer storing archive objects in an S3-compatible bucket
 * (configured via CLOUDLY_BACKUP_S3_* environment variables).
 * Writes go through a temp key and are checksum-verified before and after
 * the final move so a partial upload can never shadow a good object.
 */
class S3BackupTargetWriter implements IBackupTargetWriter {
  public targetType: TTargetType = 's3';
  // Lazily created bucket handle, shared by all operations.
  private bucketPromise?: Promise<any>;
  /**
   * Creates (or returns the cached) bucket handle; the bucket is created
   * remotely when it does not exist yet.
   * Fix: the cached promise is cleared on rejection, so a transient S3
   * failure no longer poisons the writer permanently — the next call retries.
   */
  private async getBucket() {
    if (!this.bucketPromise) {
      this.bucketPromise = (async () => {
        const smartBucket = new plugins.smartbucket.SmartBucket({
          endpoint: requiredEnv('CLOUDLY_BACKUP_S3_ENDPOINT'),
          accessKey: requiredEnv('CLOUDLY_BACKUP_S3_ACCESS_KEY'),
          accessSecret: requiredEnv('CLOUDLY_BACKUP_S3_SECRET_KEY'),
          region: process.env.CLOUDLY_BACKUP_S3_REGION || 'us-east-1',
          ...(process.env.CLOUDLY_BACKUP_S3_PORT
            ? { port: Number(process.env.CLOUDLY_BACKUP_S3_PORT) }
            : {}),
          ...(process.env.CLOUDLY_BACKUP_S3_USE_SSL
            ? { useSsl: process.env.CLOUDLY_BACKUP_S3_USE_SSL !== 'false' }
            : {}),
        } as any);
        const bucketName = requiredEnv('CLOUDLY_BACKUP_S3_BUCKET');
        return await smartBucket.getBucketByName(bucketName) || await smartBucket.createBucket(bucketName);
      })();
      // do not cache failures: allow the next getBucket() call to retry
      this.bucketPromise.catch(() => {
        this.bucketPromise = undefined;
      });
    }
    return await this.bucketPromise;
  }
  /** True when the object exists remotely with matching size and checksum. */
  public async hasObject(pathArg: string, objectArg: TArchiveObject) {
    try {
      return objectMatches(objectArg, await this.readObject(pathArg));
    } catch {
      // unreadable or missing objects are treated as absent
      return false;
    }
  }
  /**
   * Writes an object atomically: upload to a unique temp key, verify the
   * temp copy, move it over the destination, then verify the final copy.
   * The temp key is removed best-effort in every case.
   */
  public async putObject(pathArg: string, objectArg: TArchiveObject, contentsArg: Buffer) {
    const targetPath = normalizeRemotePath(pathArg);
    assertObjectMatches(objectArg, contentsArg, targetPath);
    const bucket = await this.getBucket();
    const tempPath = `${targetPath}.upload-${Date.now()}-${plugins.smartunique.shortId()}.tmp`;
    try {
      await bucket.fastPut({ path: tempPath, contents: contentsArg, overwrite: true });
      assertObjectMatches(objectArg, await bucket.fastGet({ path: tempPath }), tempPath);
      await bucket.fastMove({ sourcePath: tempPath, destinationPath: targetPath, overwrite: true });
      assertObjectMatches(objectArg, await bucket.fastGet({ path: targetPath }), targetPath);
    } finally {
      // best-effort cleanup; after a successful move the temp key is gone
      await bucket.fastRemove({ path: tempPath }).catch(() => {});
    }
  }
  /** Reads and returns the raw contents of a remote object. */
  public async readObject(pathArg: string) {
    const bucket = await this.getBucket();
    return await bucket.fastGet({ path: normalizeRemotePath(pathArg) });
  }
}
/**
 * Backup target writer storing archive objects on an SMB share
 * (configured via CLOUDLY_BACKUP_SMB_* environment variables).
 * Writes go through a temp file and are checksum-verified before and after
 * the rename so a partial write can never shadow a good object.
 */
class SmbBackupTargetWriter implements IBackupTargetWriter {
  public targetType: TTargetType = 'smb';
  // Lazily started SMB client, shared by all operations.
  private clientPromise?: Promise<plugins.smartsamba.SambaClient>;
  /**
   * Creates (or returns the cached) started SMB client.
   * Fix: the cached promise is cleared on rejection, so a transient
   * connection failure no longer poisons the writer permanently — the next
   * call retries.
   */
  private async getClient() {
    if (!this.clientPromise) {
      this.clientPromise = (async () => {
        const client = new plugins.smartsamba.SambaClient({
          host: requiredEnv('CLOUDLY_BACKUP_SMB_HOST'),
          ...(process.env.CLOUDLY_BACKUP_SMB_PORT
            ? { port: Number(process.env.CLOUDLY_BACKUP_SMB_PORT) }
            : {}),
          auth: {
            ...(process.env.CLOUDLY_BACKUP_SMB_USERNAME
              ? { username: process.env.CLOUDLY_BACKUP_SMB_USERNAME }
              : {}),
            ...(process.env.CLOUDLY_BACKUP_SMB_PASSWORD
              ? { password: process.env.CLOUDLY_BACKUP_SMB_PASSWORD }
              : {}),
            ...(process.env.CLOUDLY_BACKUP_SMB_DOMAIN
              ? { domain: process.env.CLOUDLY_BACKUP_SMB_DOMAIN }
              : {}),
          },
        });
        await client.start();
        return client;
      })();
      // do not cache failures: allow the next getClient() call to retry
      this.clientPromise.catch(() => {
        this.clientPromise = undefined;
      });
    }
    return await this.clientPromise;
  }
  /** The share name is required for every SMB operation. */
  private getShare() {
    return requiredEnv('CLOUDLY_BACKUP_SMB_SHARE');
  }
  /**
   * Creates every directory on the path to pathArg's parent, one level at a
   * time ("mkdir -p"); "already exists" errors are deliberately swallowed.
   */
  private async ensureParentDirectory(pathArg: string) {
    const client = await this.getClient();
    const parent = plugins.path.posix.dirname(pathArg);
    if (!parent || parent === '.') {
      return;
    }
    const parts = parent.split('/').filter(Boolean);
    let current = '';
    for (const part of parts) {
      current = current ? `${current}/${part}` : part;
      // best-effort: the directory may already exist
      await client.createDirectory(this.getShare(), current).catch(() => {});
    }
  }
  /** True when the object exists remotely with matching size and checksum. */
  public async hasObject(pathArg: string, objectArg: TArchiveObject) {
    try {
      return objectMatches(objectArg, await this.readObject(pathArg));
    } catch {
      // unreadable or missing objects are treated as absent
      return false;
    }
  }
  /**
   * Writes an object atomically: upload to a unique temp file, verify the
   * temp copy, replace the destination, then verify the final copy.
   * The temp file is removed best-effort in every case.
   */
  public async putObject(pathArg: string, objectArg: TArchiveObject, contentsArg: Buffer) {
    const targetPath = normalizeRemotePath(pathArg);
    assertObjectMatches(objectArg, contentsArg, targetPath);
    const client = await this.getClient();
    const share = this.getShare();
    const tempPath = `${targetPath}.upload-${Date.now()}-${plugins.smartunique.shortId()}.tmp`;
    await this.ensureParentDirectory(targetPath);
    try {
      await client.writeFile(share, tempPath, contentsArg);
      assertObjectMatches(objectArg, await client.readFile(share, tempPath), tempPath);
      // delete first — presumably rename does not overwrite; verify against SambaClient
      await client.deleteFile(share, targetPath).catch(() => {});
      await client.rename(share, tempPath, targetPath);
      assertObjectMatches(objectArg, await client.readFile(share, targetPath), targetPath);
    } finally {
      // best-effort cleanup; after a successful rename the temp file is gone
      await client.deleteFile(share, tempPath).catch(() => {});
    }
  }
  /** Reads and returns the raw contents of a remote object. */
  public async readObject(pathArg: string) {
    return await (await this.getClient()).readFile(this.getShare(), normalizeRemotePath(pathArg));
  }
}
/**
 * Factory selecting the backup target writer implementation from
 * CLOUDLY_BACKUP_TARGET_TYPE ('s3' or 'smb').
 * @throws Error when no (or an unknown) target type is configured.
 */
export const createBackupTargetWriterFromEnv = (): IBackupTargetWriter => {
  const targetType = process.env.CLOUDLY_BACKUP_TARGET_TYPE as TTargetType | undefined;
  switch (targetType) {
    case 's3':
      return new S3BackupTargetWriter();
    case 'smb':
      return new SmbBackupTargetWriter();
    default:
      throw new Error('No remote backup target configured. Set CLOUDLY_BACKUP_TARGET_TYPE to s3 or smb.');
  }
};