import * as http from 'node:http';
import * as net from 'node:net';
import { access, mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { dirname, join } from 'node:path';
import { CoreStore } from '../../../corestore/ts/index.js';
import { Coreflow } from '../../../coreflow/ts/coreflow.classes.coreflow.js';
import type { IService } from '../../../interfaces/ts/data/service.js';

/** Name used as a prefix in log output and as a snapshot tag. */
const scenarioName = 'corestore-volume-driver';
/** Per-run identifier; also names the temporary build directory and the API token. */
const smokeId = `corestore-volume-${Date.now().toString(36)}`;
/** Scratch directory for this run; removed again by `main`. */
const buildDir = join(tmpdir(), smokeId);
/** Bearer token handed to CoreStore and sent on every control API request. */
const apiToken = `${smokeId}-token`;

/**
 * Loosely typed JSON payload. The response schemas of the services under test
 * are not declared in this file, so fields are accessed dynamically.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type JsonRecord = Record<string, any>;

/**
 * Throws an `Error` carrying `messageArg` when `conditionArg` is falsy and
 * narrows the condition for the compiler otherwise.
 */
function assert(conditionArg: unknown, messageArg: string): asserts conditionArg {
  if (!conditionArg) {
    throw new Error(messageArg);
  }
}

/**
 * Asks the OS for an unused TCP port on 127.0.0.1 by listening on port 0,
 * reading the assigned port and closing the probe server again.
 *
 * @returns the port number that was assigned to the probe server.
 */
const getFreePort = async (): Promise<number> => {
  return await new Promise<number>((resolveArg, rejectArg) => {
    const server = net.createServer();
    server.on('error', rejectArg);
    server.listen(0, '127.0.0.1', () => {
      const address = server.address();
      if (typeof address !== 'object' || !address) {
        server.close(() => rejectArg(new Error('Could not allocate a port')));
        return;
      }
      // Resolve only after the probe server released the port.
      server.close(() => resolveArg(address.port));
    });
  });
};

/**
 * Performs an HTTP request and parses the response body as JSON.
 *
 * @param optionsArg request options; a 30s timeout is applied unless overridden.
 * @param bodyArg optional payload, serialized as JSON with matching
 *   `content-type`/`content-length` headers (caller headers take precedence).
 * @returns the parsed body, or an empty object when the body is empty.
 * @throws when the status code is >= 400, the request times out or errors,
 *   or the response body is not valid JSON.
 */
const requestJson = async <T>(optionsArg: http.RequestOptions, bodyArg?: unknown): Promise<T> => {
  return await new Promise<T>((resolveArg, rejectArg) => {
    const payload = bodyArg === undefined ? '' : JSON.stringify(bodyArg);
    const headers = {
      ...(bodyArg === undefined
        ? {}
        : {
            'content-type': 'application/json',
            'content-length': Buffer.byteLength(payload),
          }),
      ...(optionsArg.headers || {}),
    };
    const request = http.request(
      {
        timeout: 30000,
        ...optionsArg,
        headers,
      },
      (response) => {
        const chunks: Buffer[] = [];
        response.on('data', (chunkArg) => {
          chunks.push(Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg));
        });
        // A failing response stream must settle the promise as well.
        response.on('error', rejectArg);
        response.on('end', () => {
          const responseText = Buffer.concat(chunks).toString('utf8');
          if ((response.statusCode || 500) >= 400) {
            rejectArg(new Error(`HTTP ${response.statusCode}: ${responseText}`));
            return;
          }
          // JSON.parse runs inside an event handler: a throw here would be an
          // uncaught exception rather than a rejection, so convert it explicitly.
          try {
            resolveArg(responseText ? (JSON.parse(responseText) as T) : ({} as T));
          } catch (errorArg) {
            const reason = errorArg instanceof Error ? errorArg.message : String(errorArg);
            rejectArg(new Error(`Invalid JSON response from ${optionsArg.path}: ${reason}`));
          }
        });
      },
    );
    request.on('timeout', () => request.destroy(new Error(`Request timed out: ${optionsArg.path}`)));
    request.on('error', rejectArg);
    request.end(payload);
  });
};

/** Sends an authenticated GET to the control API on 127.0.0.1. */
const controlGet = async (controlPortArg: number, pathArg: string): Promise<JsonRecord> => {
  return await requestJson<JsonRecord>({
    method: 'GET',
    hostname: '127.0.0.1',
    port: controlPortArg,
    path: pathArg,
    headers: {
      authorization: `Bearer ${apiToken}`,
    },
  });
};

/** Sends an authenticated POST with a JSON body to the control API on 127.0.0.1. */
const controlPost = async (
  controlPortArg: number,
  pathArg: string,
  bodyArg: unknown,
): Promise<JsonRecord> => {
  return await requestJson<JsonRecord>(
    {
      method: 'POST',
      hostname: '127.0.0.1',
      port: controlPortArg,
      path: pathArg,
      headers: {
        authorization: `Bearer ${apiToken}`,
      },
    },
    bodyArg,
  );
};

/** Sends a POST with a JSON body to the volume plugin over its Unix socket (no auth header). */
const pluginPost = async (
  socketPathArg: string,
  pathArg: string,
  bodyArg: unknown,
): Promise<JsonRecord> => {
  return await requestJson<JsonRecord>(
    {
      method: 'POST',
      socketPath: socketPathArg,
      path: pathArg,
    },
    bodyArg,
  );
};

/** Returns `true` when `access` succeeds for the path and `false` when it rejects. */
const pathExists = async (pathArg: string): Promise<boolean> => {
  try {
    await access(pathArg);
    return true;
  } catch {
    return false;
  }
};

/**
 * Checks how Coreflow's cluster manager translates a service's volume
 * definitions into mount specs: the legacy bind mount, a generated volume for
 * `/data`, an explicitly named read-only volume for `/assets`, and the volume
 * hash (empty for a service without volumes).
 */
const assertCoreflowVolumeMounts = (): void => {
  const coreflow = new Coreflow();
  const service: IService = {
    id: 'svc-corestore-volume-test',
    data: {
      name: 'app.volume-test',
      description: 'Corestore volume mount spec test service',
      imageId: 'image-corestore-volume-test',
      imageVersion: 'latest',
      environment: {},
      secretBundleId: 'secret-corestore-volume-test',
      serviceCategory: 'workload',
      deploymentStrategy: 'custom',
      scaleFactor: 1,
      balancingStrategy: 'round-robin',
      ports: {
        web: 80,
      },
      volumes: [
        {
          mountPath: '/data',
          backup: true,
        },
        {
          source: 'shared-assets',
          mountPath: '/assets',
          readOnly: true,
          options: {
            tier: 'gold',
          },
        },
      ],
      resources: {
        memorySizeLimitMB: 128,
        volumeMounts: [
          {
            hostFsPath: '/tmp/legacy-corestore-volume-test',
            containerFsPath: '/legacy',
          },
        ],
      },
      domains: [],
      deploymentIds: [],
    },
  };

  // NOTE(review): the cast through `unknown` suggests these two methods are not
  // part of the cluster manager's declared public type — confirm.
  const clusterManager = coreflow.clusterManager as unknown as {
    getServiceDockerMounts: (serviceArg: IService) => Array<JsonRecord>;
    getServiceVolumeHash: (serviceArg: IService) => string;
  };
  const mounts = clusterManager.getServiceDockerMounts(service);
  const volumeHash = clusterManager.getServiceVolumeHash(service);
  assert(volumeHash.length === 64, `Expected a 64-character volume hash, got ${volumeHash}`);
  assert(
    mounts.some((mountArg) => mountArg.Type === 'bind' && mountArg.Target === '/legacy'),
    'Legacy bind mount was not preserved',
  );

  const dataMount = mounts.find(
    (mountArg) => mountArg.Type === 'volume' && mountArg.Target === '/data',
  );
  assert(dataMount, `Missing /data volume mount: ${JSON.stringify(mounts)}`);
  assert(
    dataMount.VolumeOptions.DriverConfig.Name === 'corestore',
    'Default volume driver should be corestore',
  );
  assert(
    dataMount.VolumeOptions.DriverConfig.Options.serviceId === service.id,
    'Missing serviceId driver option',
  );
  assert(
    dataMount.VolumeOptions.DriverConfig.Options.serviceName === service.data.name,
    'Missing serviceName driver option',
  );
  assert(
    dataMount.VolumeOptions.DriverConfig.Options.mountPath === '/data',
    'Missing mountPath driver option',
  );
  assert(
    dataMount.VolumeOptions.DriverConfig.Options.backup === 'true',
    'Missing backup driver option',
  );
  assert(
    typeof dataMount.Source === 'string' && dataMount.Source.startsWith('sz-app.volume-test-data-'),
    `Unexpected generated volume name: ${dataMount.Source}`,
  );

  const sharedMount = mounts.find(
    (mountArg) => mountArg.Type === 'volume' && mountArg.Target === '/assets',
  );
  assert(sharedMount, `Missing /assets volume mount: ${JSON.stringify(mounts)}`);
  assert(sharedMount.Source === 'shared-assets', 'Explicit volume source was not preserved');
  assert(sharedMount.ReadOnly === true, 'Read-only flag was not preserved');
  assert(
    sharedMount.VolumeOptions.DriverConfig.Options.tier === 'gold',
    'Custom driver option was not preserved',
  );

  const serviceWithoutVolumes: IService = {
    ...service,
    data: {
      ...service.data,
      volumes: [],
      resources: undefined,
    },
  };
  assert(
    clusterManager.getServiceVolumeHash(serviceWithoutVolumes) === '',
    'Services without volumes should not get a volume hash label',
  );
};

/**
 * Starts a CoreStore instance on freshly allocated ports and walks one volume
 * through its lifecycle via the plugin socket and the control API:
 * activate, capabilities, create, mount, rejected remove while mounted,
 * path/get/list, snapshot, mutate, restore, unmount, remove.
 * CoreStore is stopped afterwards regardless of the outcome.
 */
const assertCorestoreVolumeDriver = async (): Promise<void> => {
  const socketPath = join(buildDir, 'plugins', 'corestore.sock');
  const dataDir = join(buildDir, 'data');
  await mkdir(dirname(socketPath), { recursive: true });
  const controlPort = await getFreePort();
  const s3Port = await getFreePort();
  const dbPort = await getFreePort();
  const corestore = new CoreStore({
    dataDir,
    bindAddress: '127.0.0.1',
    publicHost: '127.0.0.1',
    controlPort,
    s3Port,
    dbPort,
    apiToken,
    volumePluginSocketPath: socketPath,
  });

  try {
    await corestore.start();

    const health = await controlGet(controlPort, '/health');
    assert(health.ok === true, `Corestore health was not ok: ${JSON.stringify(health)}`);
    assert(
      health.volumes?.running === true,
      `Volume plugin health was not reported: ${JSON.stringify(health)}`,
    );

    const activate = await pluginPost(socketPath, '/Plugin.Activate', {});
    assert(
      activate.Implements?.includes('VolumeDriver'),
      `Plugin activation failed: ${JSON.stringify(activate)}`,
    );

    const capabilities = await pluginPost(socketPath, '/VolumeDriver.Capabilities', {});
    assert(
      capabilities.Capabilities?.Scope === 'local',
      `Unexpected plugin capabilities: ${JSON.stringify(capabilities)}`,
    );

    const create = await pluginPost(socketPath, '/VolumeDriver.Create', {
      Name: 'smoke-volume',
      Opts: {
        serviceId: 'svc-smoke',
        serviceName: 'smoke-service',
        mountPath: '/data',
        backup: 'true',
      },
    });
    assert(create.Err === '', `Volume create failed: ${JSON.stringify(create)}`);

    const mount = await pluginPost(socketPath, '/VolumeDriver.Mount', {
      Name: 'smoke-volume',
      ID: 'container-smoke',
    });
    assert(
      mount.Err === '' && typeof mount.Mountpoint === 'string',
      `Volume mount failed: ${JSON.stringify(mount)}`,
    );

    // Removing a volume that is still mounted must be refused.
    const removeWhileMounted = await pluginPost(socketPath, '/VolumeDriver.Remove', {
      Name: 'smoke-volume',
    });
    assert(
      typeof removeWhileMounted.Err === 'string' && removeWhileMounted.Err.includes('still mounted'),
      'Mounted volume removal should be rejected',
    );

    const pathResponse = await pluginPost(socketPath, '/VolumeDriver.Path', {
      Name: 'smoke-volume',
    });
    assert(
      pathResponse.Mountpoint === mount.Mountpoint,
      `Volume path mismatch: ${JSON.stringify(pathResponse)}`,
    );

    const getResponse = await pluginPost(socketPath, '/VolumeDriver.Get', {
      Name: 'smoke-volume',
    });
    assert(
      getResponse.Volume?.Status?.serviceId === 'svc-smoke',
      `Volume metadata missing: ${JSON.stringify(getResponse)}`,
    );

    const pluginList = await pluginPost(socketPath, '/VolumeDriver.List', {});
    assert(
      pluginList.Volumes?.some((volumeArg: JsonRecord) => volumeArg.Name === 'smoke-volume'),
      `Plugin list missing volume: ${JSON.stringify(pluginList)}`,
    );

    const controlList = await controlGet(controlPort, '/volumes');
    assert(
      controlList.volumes?.some((volumeArg: JsonRecord) => volumeArg.Name === 'smoke-volume'),
      `Control list missing volume: ${JSON.stringify(controlList)}`,
    );

    // Seed content, snapshot it, then mutate so the restore has something to undo.
    await mkdir(join(mount.Mountpoint, 'nested'), { recursive: true });
    await writeFile(join(mount.Mountpoint, 'nested', 'hello.txt'), 'corestore volume smoke\n');

    const snapshotResponse = await controlPost(controlPort, '/volumes/snapshot', {
      name: 'smoke-volume',
      snapshotName: 'before-mutation',
      tags: {
        scenario: scenarioName,
        smokeId,
      },
    });
    assert(
      snapshotResponse.snapshot?.id,
      `Snapshot missing id: ${JSON.stringify(snapshotResponse)}`,
    );

    await writeFile(join(mount.Mountpoint, 'stale.txt'), 'should be removed on restore\n');
    await writeFile(join(mount.Mountpoint, 'nested', 'hello.txt'), 'mutated\n');

    const snapshotList = await controlGet(controlPort, '/volumes/snapshots?name=smoke-volume');
    assert(
      snapshotList.snapshots?.some(
        (snapshotArg: JsonRecord) => snapshotArg.snapshotId === snapshotResponse.snapshot.id,
      ),
      `Snapshot list missing snapshot: ${JSON.stringify(snapshotList)}`,
    );

    const restoreResponse = await controlPost(controlPort, '/volumes/restore', {
      name: 'smoke-volume',
      snapshotId: snapshotResponse.snapshot.id,
    });
    assert(restoreResponse.ok === true, `Restore failed: ${JSON.stringify(restoreResponse)}`);

    const restoredContent = await readFile(join(mount.Mountpoint, 'nested', 'hello.txt'), 'utf8');
    assert(
      restoredContent === 'corestore volume smoke\n',
      `Restore content mismatch: ${restoredContent}`,
    );
    assert(
      !(await pathExists(join(mount.Mountpoint, 'stale.txt'))),
      'Restore did not clear stale files',
    );

    const unmount = await pluginPost(socketPath, '/VolumeDriver.Unmount', {
      Name: 'smoke-volume',
      ID: 'container-smoke',
    });
    assert(unmount.Err === '', `Volume unmount failed: ${JSON.stringify(unmount)}`);

    const remove = await pluginPost(socketPath, '/VolumeDriver.Remove', {
      Name: 'smoke-volume',
    });
    assert(remove.Err === '', `Volume remove failed: ${JSON.stringify(remove)}`);

    const finalList = await pluginPost(socketPath, '/VolumeDriver.List', {});
    assert(
      !finalList.Volumes?.some((volumeArg: JsonRecord) => volumeArg.Name === 'smoke-volume'),
      `Volume was not removed: ${JSON.stringify(finalList)}`,
    );
  } finally {
    // Best-effort shutdown: a failing stop is logged but must not mask the test result.
    await corestore.stop().catch((errorArg: unknown) => {
      const reason = errorArg instanceof Error ? errorArg.message : String(errorArg);
      console.log(`[${scenarioName}] Failed to stop Corestore: ${reason}`);
    });
  }
};

/**
 * Entry point: creates the scratch directory, runs both checks, prints PASS,
 * and always removes the scratch directory afterwards.
 */
const main = async (): Promise<void> => {
  try {
    await mkdir(buildDir, { recursive: true });
    assertCoreflowVolumeMounts();
    await assertCorestoreVolumeDriver();
    console.log(`[${scenarioName}] PASS`);
  } finally {
    await rm(buildDir, { recursive: true, force: true });
  }
};

await main();