// Source: testing/scenarios/corestore-volume-driver/scenario.ts (TypeScript; listed as 455 lines, 17 KiB)

import * as http from 'node:http';
import * as net from 'node:net';
import { access, mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { dirname, join } from 'node:path';
import { CoreStore } from '../../../corestore/ts/index.js';
import { Coreflow } from '../../../coreflow/ts/coreflow.classes.coreflow.js';
import type { IService } from '../../../interfaces/ts/data/service.js';
// Scenario identity and per-run scratch values.
// `smokeId` embeds the current timestamp in base 36, so the temp directory and
// API token derived from it differ between runs.
const scenarioName = 'corestore-volume-driver';
const smokeId = 'corestore-volume-' + Date.now().toString(36);
const buildDir = join(tmpdir(), smokeId);
const apiToken = smokeId + '-token';
/**
 * Minimal assertion helper for this scenario.
 *
 * Returns normally when `conditionArg` is truthy (and narrows it for the type
 * checker); otherwise throws an `Error` whose message is `messageArg`.
 */
function assert(conditionArg: unknown, messageArg: string): asserts conditionArg {
  if (conditionArg) {
    return;
  }
  throw new Error(messageArg);
}
/**
 * Asks the OS for an unused TCP port on 127.0.0.1.
 *
 * A throwaway server listens on port 0, the assigned port is read back, and the
 * server is closed before the promise settles. The port is released again at
 * that point, so nothing reserves it for the caller.
 *
 * @returns the port number that was assigned to the temporary listener
 * @throws if the listener errors or does not report a socket address
 */
const getFreePort = async () => {
  const probe = net.createServer();
  return await new Promise<number>((resolveArg, rejectArg) => {
    const onListening = () => {
      const bound = probe.address();
      if (bound !== null && typeof bound === 'object') {
        const { port } = bound;
        probe.close(() => resolveArg(port));
      } else {
        probe.close(() => rejectArg(new Error('Could not allocate a port')));
      }
    };
    probe.on('error', rejectArg);
    probe.listen(0, '127.0.0.1', onListening);
  });
};
/**
 * Sends a single request with `node:http` and resolves with the JSON-decoded
 * response body.
 *
 * - When `bodyArg` is provided it is serialized with `JSON.stringify` and sent
 *   with `content-type` / `content-length` headers; headers supplied in
 *   `optionsArg.headers` are merged afterwards and therefore win on conflict.
 * - A 30 s timeout is applied unless `optionsArg` carries its own `timeout`.
 * - An empty response body resolves to `{}`.
 *
 * @param optionsArg request options passed through to `http.request`
 * @param bodyArg optional JSON-serializable request payload
 * @returns the parsed response body, typed as `T` (not validated at runtime)
 * @throws (as a rejection) on status >= 400, on timeout, on request or
 *   response stream errors, and when the response body is not valid JSON
 */
const requestJson = async <T>(optionsArg: http.RequestOptions, bodyArg?: unknown) => {
  return await new Promise<T>((resolveArg, rejectArg) => {
    const payload = bodyArg === undefined ? '' : JSON.stringify(bodyArg);
    const headers = {
      ...(bodyArg === undefined
        ? {}
        : {
            'content-type': 'application/json',
            'content-length': Buffer.byteLength(payload),
          }),
      ...(optionsArg.headers || {}),
    };
    const request = http.request(
      {
        timeout: 30000,
        ...optionsArg,
        headers,
      },
      (response) => {
        const chunks: Buffer[] = [];
        // Without a listener, an 'error' on the response stream (e.g. the
        // connection dropping mid-body) would surface as an uncaught exception.
        response.on('error', rejectArg);
        response.on('data', (chunkArg) => {
          chunks.push(Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg));
        });
        response.on('end', () => {
          const responseText = Buffer.concat(chunks).toString('utf8');
          if ((response.statusCode ?? 500) >= 400) {
            rejectArg(new Error(`HTTP ${response.statusCode}: ${responseText}`));
            return;
          }
          if (!responseText) {
            resolveArg({} as T);
            return;
          }
          // JSON.parse runs inside an event listener: a throw here would bypass
          // the promise entirely, so convert it into a rejection.
          let parsed: T;
          try {
            parsed = JSON.parse(responseText) as T;
          } catch (errorArg) {
            const reason = errorArg instanceof Error ? errorArg.message : String(errorArg);
            rejectArg(new Error(`Invalid JSON response from ${optionsArg.path}: ${reason}`));
            return;
          }
          resolveArg(parsed);
        });
      },
    );
    // 'timeout' only signals idleness; destroying the request turns it into an 'error'.
    request.on('timeout', () => request.destroy(new Error(`Request timed out: ${optionsArg.path}`)));
    request.on('error', rejectArg);
    request.end(payload);
  });
};
/**
 * Issues a GET to `pathArg` on 127.0.0.1:`controlPortArg`, authenticating with
 * the scenario's bearer token, and returns the JSON-decoded response.
 */
const controlGet = async <T>(controlPortArg: number, pathArg: string) => {
  const requestOptions: http.RequestOptions = {
    hostname: '127.0.0.1',
    port: controlPortArg,
    path: pathArg,
    method: 'GET',
    headers: { authorization: `Bearer ${apiToken}` },
  };
  return await requestJson<T>(requestOptions);
};
/**
 * Issues a POST with a JSON body to `pathArg` on 127.0.0.1:`controlPortArg`,
 * authenticating with the scenario's bearer token, and returns the
 * JSON-decoded response.
 */
const controlPost = async <T>(controlPortArg: number, pathArg: string, bodyArg: unknown) => {
  const authHeaders = { authorization: `Bearer ${apiToken}` };
  const requestOptions: http.RequestOptions = {
    hostname: '127.0.0.1',
    port: controlPortArg,
    path: pathArg,
    method: 'POST',
    headers: authHeaders,
  };
  return await requestJson<T>(requestOptions, bodyArg);
};
/**
 * Issues a POST with a JSON body to `pathArg` over the Unix socket at
 * `socketPathArg` (no auth header is sent) and returns the JSON-decoded response.
 */
const pluginPost = async <T>(socketPathArg: string, pathArg: string, bodyArg: unknown) => {
  const requestOptions: http.RequestOptions = {
    socketPath: socketPathArg,
    path: pathArg,
    method: 'POST',
  };
  return await requestJson<T>(requestOptions, bodyArg);
};
/**
 * Reports whether `pathArg` is accessible: `true` when `access` succeeds,
 * `false` when it rejects for any reason.
 */
const pathExists = async (pathArg: string) =>
  access(pathArg).then(
    () => true,
    () => false,
  );
/**
 * Verifies how Coreflow's cluster manager translates a service definition into
 * mount specs and a volume hash. Only in-process calls are made.
 *
 * The fixture service declares one volume without an explicit source, one
 * named read-only volume with a custom option, and one legacy bind mount under
 * `resources.volumeMounts`. The assertions check each resulting mount entry
 * and that a service without volumes yields an empty hash.
 *
 * @throws Error with a descriptive message on the first failed expectation
 */
const assertCoreflowVolumeMounts = () => {
  const coreflow = new Coreflow();
  const service: IService = {
    id: 'svc-corestore-volume-test',
    data: {
      name: 'app.volume-test',
      description: 'Corestore volume mount spec test service',
      imageId: 'image-corestore-volume-test',
      imageVersion: 'latest',
      environment: {},
      secretBundleId: 'secret-corestore-volume-test',
      serviceCategory: 'workload',
      deploymentStrategy: 'custom',
      scaleFactor: 1,
      balancingStrategy: 'round-robin',
      ports: {
        web: 80,
      },
      volumes: [
        {
          mountPath: '/data',
          backup: true,
        },
        {
          source: 'shared-assets',
          mountPath: '/assets',
          readOnly: true,
          options: {
            tier: 'gold',
          },
        },
      ],
      resources: {
        memorySizeLimitMB: 128,
        volumeMounts: [
          {
            hostFsPath: '/tmp/legacy-corestore-volume-test',
            containerFsPath: '/legacy',
          },
        ],
      },
      domains: [],
      deploymentIds: [],
    },
  };
  // The two helpers are reached through a structural cast — presumably they
  // are not exposed on the cluster manager's public typings; TODO confirm.
  const clusterManager = coreflow.clusterManager as unknown as {
    getServiceDockerMounts: (serviceArg: IService) => Array<Record<string, any>>;
    getServiceVolumeHash: (serviceArg: IService) => string;
  };
  const mounts = clusterManager.getServiceDockerMounts(service);
  const volumeHash = clusterManager.getServiceVolumeHash(service);
  // The check is on the exact length (64 characters), so the message says so;
  // presumably a hex-encoded 256-bit digest — confirm against the implementation.
  assert(volumeHash.length === 64, `Expected 64-character volume hash, got ${volumeHash}`);
  assert(
    mounts.some((mountArg) => mountArg.Type === 'bind' && mountArg.Target === '/legacy'),
    'Legacy bind mount was not preserved',
  );

  // Volume declared without a source: driver defaults and a generated name are expected.
  const dataMount = mounts.find((mountArg) => mountArg.Type === 'volume' && mountArg.Target === '/data');
  assert(dataMount, `Missing /data volume mount: ${JSON.stringify(mounts)}`);
  const dataDriver = dataMount.VolumeOptions.DriverConfig;
  assert(dataDriver.Name === 'corestore', 'Default volume driver should be corestore');
  const dataOptions = dataDriver.Options;
  assert(dataOptions.serviceId === service.id, 'Missing serviceId driver option');
  assert(dataOptions.serviceName === service.data.name, 'Missing serviceName driver option');
  assert(dataOptions.mountPath === '/data', 'Missing mountPath driver option');
  // Driver options are compared as strings, hence 'true' rather than true.
  assert(dataOptions.backup === 'true', 'Missing backup driver option');
  assert(
    typeof dataMount.Source === 'string' && dataMount.Source.startsWith('sz-app.volume-test-data-'),
    `Unexpected generated volume name: ${dataMount.Source}`,
  );

  // Volume declared with an explicit source, read-only flag and a custom option.
  const sharedMount = mounts.find((mountArg) => mountArg.Type === 'volume' && mountArg.Target === '/assets');
  assert(sharedMount, `Missing /assets volume mount: ${JSON.stringify(mounts)}`);
  assert(sharedMount.Source === 'shared-assets', 'Explicit volume source was not preserved');
  assert(sharedMount.ReadOnly === true, 'Read-only flag was not preserved');
  assert(sharedMount.VolumeOptions.DriverConfig.Options.tier === 'gold', 'Custom driver option was not preserved');

  // Same service with every volume source removed: the hash must be empty.
  const serviceWithoutVolumes: IService = {
    ...service,
    data: {
      ...service.data,
      volumes: [],
      resources: undefined,
    },
  };
  assert(
    clusterManager.getServiceVolumeHash(serviceWithoutVolumes) === '',
    'Services without volumes should not get a volume hash label',
  );
};
/**
 * End-to-end check of a locally started CoreStore instance's volume plugin.
 *
 * Starts CoreStore on freshly allocated ports with a plugin socket under
 * `buildDir`, then walks one volume through the plugin endpoints
 * (`/Plugin.Activate`, `/VolumeDriver.*`) and the control API: create, mount,
 * rejected removal while mounted, path/get/list, snapshot, mutate, restore,
 * unmount, remove. CoreStore is stopped in all cases; a failure to stop is
 * logged, not thrown.
 *
 * @throws Error on the first failed expectation or failed request
 */
const assertCorestoreVolumeDriver = async () => {
  const volumeName = 'smoke-volume';
  const containerId = 'container-smoke';
  const originalContent = 'corestore volume smoke\n';

  const socketPath = join(buildDir, 'plugins', 'corestore.sock');
  const dataDir = join(buildDir, 'data');
  await mkdir(dirname(socketPath), { recursive: true });
  const controlPort = await getFreePort();
  const s3Port = await getFreePort();
  const dbPort = await getFreePort();
  const corestore = new CoreStore({
    dataDir,
    bindAddress: '127.0.0.1',
    publicHost: '127.0.0.1',
    controlPort,
    s3Port,
    dbPort,
    apiToken,
    volumePluginSocketPath: socketPath,
  });

  // Shorthands bound to this instance's socket and to the volume under test.
  const callPlugin = (pathArg: string, bodyArg: unknown) => pluginPost<any>(socketPath, pathArg, bodyArg);
  const isSmokeVolume = (volumeArg: any) => volumeArg.Name === volumeName;

  try {
    await corestore.start();

    const health = await controlGet<any>(controlPort, '/health');
    assert(health.ok === true, `Corestore health was not ok: ${JSON.stringify(health)}`);
    assert(health.volumes?.running === true, `Volume plugin health was not reported: ${JSON.stringify(health)}`);

    // Plugin handshake.
    const activate = await callPlugin('/Plugin.Activate', {});
    assert(activate.Implements?.includes('VolumeDriver'), `Plugin activation failed: ${JSON.stringify(activate)}`);
    const capabilities = await callPlugin('/VolumeDriver.Capabilities', {});
    assert(capabilities.Capabilities?.Scope === 'local', `Unexpected plugin capabilities: ${JSON.stringify(capabilities)}`);

    // Create and mount.
    const create = await callPlugin('/VolumeDriver.Create', {
      Name: volumeName,
      Opts: {
        serviceId: 'svc-smoke',
        serviceName: 'smoke-service',
        mountPath: '/data',
        backup: 'true',
      },
    });
    assert(create.Err === '', `Volume create failed: ${JSON.stringify(create)}`);
    const mount = await callPlugin('/VolumeDriver.Mount', { Name: volumeName, ID: containerId });
    assert(mount.Err === '' && typeof mount.Mountpoint === 'string', `Volume mount failed: ${JSON.stringify(mount)}`);

    // Removal must be refused while the volume is still mounted.
    const removeWhileMounted = await callPlugin('/VolumeDriver.Remove', { Name: volumeName });
    assert(
      typeof removeWhileMounted.Err === 'string' && removeWhileMounted.Err.includes('still mounted'),
      'Mounted volume removal should be rejected',
    );

    // Lookup endpoints on both the plugin socket and the control API.
    const pathResponse = await callPlugin('/VolumeDriver.Path', { Name: volumeName });
    assert(pathResponse.Mountpoint === mount.Mountpoint, `Volume path mismatch: ${JSON.stringify(pathResponse)}`);
    const getResponse = await callPlugin('/VolumeDriver.Get', { Name: volumeName });
    assert(getResponse.Volume?.Status?.serviceId === 'svc-smoke', `Volume metadata missing: ${JSON.stringify(getResponse)}`);
    const pluginList = await callPlugin('/VolumeDriver.List', {});
    assert(pluginList.Volumes?.some(isSmokeVolume), `Plugin list missing volume: ${JSON.stringify(pluginList)}`);
    const controlList = await controlGet<any>(controlPort, '/volumes');
    assert(controlList.volumes?.some(isSmokeVolume), `Control list missing volume: ${JSON.stringify(controlList)}`);

    // Seed content, snapshot it, then mutate so the restore has something to undo.
    const nestedDir = join(mount.Mountpoint, 'nested');
    const helloFile = join(nestedDir, 'hello.txt');
    const staleFile = join(mount.Mountpoint, 'stale.txt');
    await mkdir(nestedDir, { recursive: true });
    await writeFile(helloFile, originalContent);
    const snapshotResponse = await controlPost<any>(controlPort, '/volumes/snapshot', {
      name: volumeName,
      snapshotName: 'before-mutation',
      tags: {
        scenario: scenarioName,
        smokeId,
      },
    });
    assert(snapshotResponse.snapshot?.id, `Snapshot missing id: ${JSON.stringify(snapshotResponse)}`);
    await writeFile(staleFile, 'should be removed on restore\n');
    await writeFile(helloFile, 'mutated\n');

    const snapshotList = await controlGet<any>(controlPort, `/volumes/snapshots?name=${volumeName}`);
    assert(
      snapshotList.snapshots?.some((snapshotArg: any) => snapshotArg.snapshotId === snapshotResponse.snapshot.id),
      `Snapshot list missing snapshot: ${JSON.stringify(snapshotList)}`,
    );

    // Restore must bring back the original file and drop the file added afterwards.
    const restoreResponse = await controlPost<any>(controlPort, '/volumes/restore', {
      name: volumeName,
      snapshotId: snapshotResponse.snapshot.id,
    });
    assert(restoreResponse.ok === true, `Restore failed: ${JSON.stringify(restoreResponse)}`);
    const restoredContent = await readFile(helloFile, 'utf8');
    assert(restoredContent === originalContent, `Restore content mismatch: ${restoredContent}`);
    const staleStillPresent = await pathExists(staleFile);
    assert(!staleStillPresent, 'Restore did not clear stale files');

    // Tear the volume down again.
    const unmount = await callPlugin('/VolumeDriver.Unmount', { Name: volumeName, ID: containerId });
    assert(unmount.Err === '', `Volume unmount failed: ${JSON.stringify(unmount)}`);
    const remove = await callPlugin('/VolumeDriver.Remove', { Name: volumeName });
    assert(remove.Err === '', `Volume remove failed: ${JSON.stringify(remove)}`);
    const finalList = await callPlugin('/VolumeDriver.List', {});
    assert(!finalList.Volumes?.some(isSmokeVolume), `Volume was not removed: ${JSON.stringify(finalList)}`);
  } finally {
    // Best-effort shutdown: report a failed stop without masking the scenario result.
    await corestore.stop().catch((errorArg) => {
      console.log(`[${scenarioName}] Failed to stop Corestore: ${(errorArg as Error).message}`);
    });
  }
};
/**
 * Exercises Coreflow's backup manager against a locally started CoreStore.
 *
 * Starts CoreStore on fresh ports, points `CORESTORE_CONTROL_URL` and
 * `CORESTORE_API_TOKEN` at it, creates a volume and provisions resources for a
 * fixture service, runs a service backup, mutates the volume, restores, and
 * checks that the volume content is back to its pre-backup state. The two
 * environment variables are put back to their previous values and CoreStore
 * is stopped in all cases; a failure to stop is logged, not thrown.
 *
 * @throws Error on the first failed expectation or failed request
 */
const assertCoreflowBackupOrchestration = async () => {
  const socketPath = join(buildDir, 'backup-plugins', 'corestore.sock');
  const dataDir = join(buildDir, 'backup-data');
  await mkdir(dirname(socketPath), { recursive: true });
  const controlPort = await getFreePort();
  const s3Port = await getFreePort();
  const dbPort = await getFreePort();
  const corestore = new CoreStore({
    dataDir,
    bindAddress: '127.0.0.1',
    publicHost: '127.0.0.1',
    controlPort,
    s3Port,
    dbPort,
    apiToken,
    volumePluginSocketPath: socketPath,
  });

  // Remember the ambient values so the finally block can put them back.
  const previousControlUrl = process.env.CORESTORE_CONTROL_URL;
  const previousApiToken = process.env.CORESTORE_API_TOKEN;
  const restoreEnv = (keyArg: string, previousArg: string | undefined) => {
    if (previousArg === undefined) {
      delete process.env[keyArg];
      return;
    }
    process.env[keyArg] = previousArg;
  };

  try {
    await corestore.start();
    process.env.CORESTORE_CONTROL_URL = `http://127.0.0.1:${controlPort}`;
    process.env.CORESTORE_API_TOKEN = apiToken;

    const volumeName = 'backup-orchestration-data';
    const backupId = 'backup-orchestration-smoke';
    const service: IService = {
      id: 'svc-backup-orchestration',
      data: {
        name: 'backup-orchestration',
        description: 'Coreflow backup orchestration service',
        imageId: 'image-backup-orchestration',
        imageVersion: 'latest',
        environment: {},
        secretBundleId: 'secret-backup-orchestration',
        serviceCategory: 'workload',
        deploymentStrategy: 'custom',
        scaleFactor: 1,
        balancingStrategy: 'round-robin',
        ports: {
          web: 80,
        },
        volumes: [
          {
            source: volumeName,
            mountPath: '/data',
            backup: true,
          },
        ],
        domains: [],
        deploymentIds: [],
      },
    };

    // Create the volume through the control API and seed it with known content.
    const createVolumeResponse = await controlPost<any>(controlPort, '/volumes/create', {
      name: volumeName,
      serviceId: service.id,
      serviceName: service.data.name,
      mountPath: '/data',
      backup: true,
    });
    const mountpoint = createVolumeResponse.volume?.Mountpoint;
    assert(typeof mountpoint === 'string', `Could not create backup test volume: ${JSON.stringify(createVolumeResponse)}`);
    const stateFile = join(mountpoint, 'state.txt');
    const staleFile = join(mountpoint, 'stale.txt');
    await mkdir(mountpoint, { recursive: true });
    await writeFile(stateFile, 'before backup\n');

    // Two capabilities are requested, so two provisioned resources are expected.
    const provisionResponse = await controlPost<any>(controlPort, '/resources/provision', {
      serviceId: service.id,
      serviceName: service.data.name,
      capabilities: ['database', 'objectstorage'],
    });
    assert(provisionResponse.resources?.length === 2, `Resource provisioning failed: ${JSON.stringify(provisionResponse)}`);

    const coreflow = new Coreflow();
    const backupResult = await coreflow.backupManager.executeServiceBackup({
      backupId,
      service,
      tags: {
        scenario: scenarioName,
      },
    });
    // Compare the sorted snapshot types so the order they are returned in does not matter.
    const snapshotTypes = backupResult.snapshots.map((snapshotArg: any) => snapshotArg.type).sort();
    const expectedTypes = ['database', 'objectstorage', 'volume'];
    assert(
      JSON.stringify(snapshotTypes) === JSON.stringify(expectedTypes),
      `Unexpected backup snapshots: ${JSON.stringify(backupResult)}`,
    );

    // Diverge from the backed-up state, then restore and verify the divergence is gone.
    await writeFile(stateFile, 'after backup mutation\n');
    await writeFile(staleFile, 'stale\n');
    const restoreResult = await coreflow.backupManager.executeServiceRestore({
      backupId,
      service,
      snapshots: backupResult.snapshots,
      clear: true,
    });
    assert(restoreResult.restored.length === 3, `Unexpected restore result: ${JSON.stringify(restoreResult)}`);
    const restoredState = await readFile(stateFile, 'utf8');
    assert(restoredState === 'before backup\n', 'Coreflow restore did not restore volume data');
    const staleStillPresent = await pathExists(staleFile);
    assert(!staleStillPresent, 'Coreflow restore did not clear stale volume data');
  } finally {
    restoreEnv('CORESTORE_CONTROL_URL', previousControlUrl);
    restoreEnv('CORESTORE_API_TOKEN', previousApiToken);
    // Best-effort shutdown: report a failed stop without masking the scenario result.
    await corestore.stop().catch((errorArg) => {
      console.log(`[${scenarioName}] Failed to stop backup Corestore: ${(errorArg as Error).message}`);
    });
  }
};
/**
 * Scenario entry point: creates the scratch directory, runs the three checks
 * in order, prints a PASS line, and always removes the scratch directory
 * afterwards (also when a check throws).
 */
const main = async () => {
  const removeBuildDir = () => rm(buildDir, { recursive: true, force: true });
  try {
    await mkdir(buildDir, { recursive: true });
    assertCoreflowVolumeMounts();
    await assertCorestoreVolumeDriver();
    await assertCoreflowBackupOrchestration();
    const passLine = `[${scenarioName}] PASS`;
    console.log(passLine);
  } finally {
    await removeBuildDir();
  }
};

// Top-level await: a rejection from main() propagates as a module evaluation failure.
await main();