From 02d1b77ae88a51ad63ba267feebbda9a8cae76d7 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 2 May 2026 18:58:21 +0000 Subject: [PATCH] feat: add corestore volume driver --- package.json | 3 +- pnpm-lock.yaml | 13 + readme.md | 42 ++- ts/corestore.classes.corestore.ts | 533 +++++++++++++++++++++++++++++- ts/corestore.interfaces.ts | 51 +++ ts/corestore.plugins.ts | 9 +- 6 files changed, 644 insertions(+), 7 deletions(-) diff --git a/package.json b/package.json index 0c26318..908f2bc 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,8 @@ "@push.rocks/projectinfo": "^5.1.0", "@push.rocks/smartdb": "^2.10.0", "@push.rocks/smartpath": "^6.0.0", - "@push.rocks/smartstorage": "^6.5.1" + "@push.rocks/smartstorage": "^6.5.1", + "@serve.zone/containerarchive": "^0.1.3" }, "private": false, "files": [ diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4472256..74850c6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -20,6 +20,9 @@ importers: '@push.rocks/smartstorage': specifier: ^6.5.1 version: 6.5.1 + '@serve.zone/containerarchive': + specifier: ^0.1.3 + version: 0.1.3 devDependencies: '@git.zone/tsbuild': specifier: ^4.4.0 @@ -1422,6 +1425,9 @@ packages: '@sec-ant/readable-stream@0.4.1': resolution: {integrity: sha512-831qok9r2t8AlxLko40y2ebgSDhenenCatLVeW/uBtnHPyhHOvG0C7TvfgecV+wHzIm5KUICgzmVpWS+IMEAeg==} + '@serve.zone/containerarchive@0.1.3': + resolution: {integrity: sha512-tCy7jrgoZxUX2wxin87PXq5YrIiZ/2evh5OtQhzshfS5mEod5OBrai5wxgNzicGFeg+uZRPCMtOD/ocTakSDZg==} + '@smithy/chunked-blob-reader-native@4.2.3': resolution: {integrity: sha512-jA5k5Udn7Y5717L86h4EIv06wIr3xn8GM1qHRi/Nf31annXcXHJjBKvgztnbn2TxH3xWrPBfgwHsOwZf0UmQWw==} engines: {node: '>=18.0.0'} @@ -6278,6 +6284,13 @@ snapshots: '@sec-ant/readable-stream@0.4.1': {} + '@serve.zone/containerarchive@0.1.3': + dependencies: + '@push.rocks/lik': 6.4.1 + '@push.rocks/smartpromise': 4.2.4 + '@push.rocks/smartrust': 1.4.0 + '@push.rocks/smartrx': 3.0.10 + '@smithy/chunked-blob-reader-native@4.2.3': dependencies: '@smithy/util-base64': 4.3.2 diff --git a/readme.md b/readme.md index c631ec0..cd1b924 100644 --- a/readme.md +++ b/readme.md @@ -5,12 +5,11 @@ - `@push.rocks/smartdb` as a MongoDB-compatible database endpoint on port `27017`. - `@push.rocks/smartstorage` as an S3-compatible object-storage endpoint on port `9000`. - A small control API on port `3000` for Coreflow provisioning. +- A Docker VolumeDriver plugin on `/run/docker/plugins/corestore.sock`. ## Purpose -Coreflow can run `corestore` on every node and provision per-service resources on the node that hosts a workload requiring `database` or `objectstorage`. - -The first implementation exposes the provider container and provisioning API. Coreflow should call the control API when reconciling platform bindings, then inject the returned environment variables into the workload secret. +Coreflow can run `corestore` on every node and provision per-service resources on the node that hosts a workload requiring `database`, `objectstorage`, or persistent volumes. ## Runtime @@ -43,6 +42,8 @@ Default data directory: `/data/corestore`. | `CORESTORE_REGION` | `us-east-1` | S3 region | | `CORESTORE_API_TOKEN` | unset | Optional bearer token for mutating/read-sensitive control APIs | | `CORESTORE_MASTER_SECRET` | generated and persisted | Seed for deterministic tenant credentials | +| `CORESTORE_VOLUME_PLUGIN_SOCKET` | `/run/docker/plugins/corestore.sock` | Docker VolumeDriver socket path | +| `CORESTORE_ARCHIVE_PASSPHRASE` | unset | Optional encryption passphrase for volume snapshots | When Coreflow creates the global `corestore` service, it forwards its own `CORESTORE_API_TOKEN` environment variable into the service. Set the same value on Coreflow to protect provisioning APIs from workload containers on the same overlay network. @@ -74,6 +75,39 @@ curl -X POST http://corestore:3000/resources/deprovision \ -d '{"serviceId":"svc-123"}' ``` +List managed volumes: + +```bash +curl http://corestore:3000/volumes \ + -H 'authorization: Bearer ' +``` + +Snapshot a volume into the local `containerarchive` repository: + +```bash +curl -X POST http://corestore:3000/volumes/snapshot \ + -H 'content-type: application/json' \ + -H 'authorization: Bearer ' \ + -d '{"name":"sz-api-data-abc123","snapshotName":"before-deploy"}' +``` + +Restore a snapshot into an existing volume: + +```bash +curl -X POST http://corestore:3000/volumes/restore \ + -H 'content-type: application/json' \ + -H 'authorization: Bearer ' \ + -d '{"name":"sz-api-data-abc123","snapshotId":""}' +``` + +## Docker Volume Driver + +Corestore implements Docker's legacy VolumeDriver API over a Unix socket. The `corestore` service must bind mount `/run/docker/plugins` from the host so Docker can discover `/run/docker/plugins/corestore.sock`. + +Docker calls `corestore` for `Create`, `Mount`, `Unmount`, `Remove`, `Path`, `Get`, `List`, and `Capabilities`. Mountpoints are real host paths under `/data/corestore/volumes//data`; Docker bind-mounts those paths into workload containers. + +The driver reports `Scope: local`, because volume data is node-local. Backup orchestration should snapshot volumes through the control API before destructive changes or restores. + ## Docker ```bash @@ -88,6 +122,8 @@ The intended cluster behavior is: - deploy `corestore` as a node-local/global service so every workload node has a local storage provider; - provision `database` and `objectstorage` bindings through `/resources/provision`; +- mount service volumes through Docker `DriverConfig.Name = corestore`; +- snapshot and restore service volumes through `/volumes/snapshot` and `/volumes/restore`; - merge the returned env vars into the workload Docker secret before service creation; - mark Cloudly platform bindings `ready` with endpoint metadata and credential env refs; - deprovision resources when the service binding or workload is deleted. diff --git a/ts/corestore.classes.corestore.ts b/ts/corestore.classes.corestore.ts index 2e2c229..37fa507 100644 --- a/ts/corestore.classes.corestore.ts +++ b/ts/corestore.classes.corestore.ts @@ -10,6 +10,7 @@ export interface ICoreStoreOptions { dbPort?: number; region?: string; apiToken?: string; + volumePluginSocketPath?: string; } type TResolvedCoreStoreOptions = Required> & { @@ -21,7 +22,9 @@ export class CoreStore { private smartStorage: plugins.smartstorage.SmartStorage | null = null; private smartDb: plugins.smartdb.SmartdbServer | null = null; private controlServer: plugins.http.Server | null = null; - private manifest: interfaces.ICoreStoreManifest = { version: 1, services: {} }; + private volumePluginServer: plugins.http.Server | null = null; + private volumeArchive: plugins.containerarchive.ContainerArchive | null = null; + private manifest: interfaces.ICoreStoreManifest = { version: 1, services: {}, volumes: {} }; private secretFile: interfaces.ICoreStoreSecretFile | null = null; constructor(optionsArg: ICoreStoreOptions = {}) { @@ -33,6 +36,10 @@ export class CoreStore { s3Port: optionsArg.s3Port || this.getNumberEnv('CORESTORE_S3_PORT', 9000), dbPort: optionsArg.dbPort || this.getNumberEnv('CORESTORE_DB_PORT', 27017), region: optionsArg.region || process.env.CORESTORE_REGION || 'us-east-1', + volumePluginSocketPath: + optionsArg.volumePluginSocketPath || + process.env.CORESTORE_VOLUME_PLUGIN_SOCKET || + '/run/docker/plugins/corestore.sock', ...(optionsArg.apiToken || process.env.CORESTORE_API_TOKEN ? { apiToken: optionsArg.apiToken || process.env.CORESTORE_API_TOKEN } : {}), @@ -46,15 +53,27 @@ export class CoreStore { await this.startSmartStorage(); await this.startSmartDb(); await this.startControlApi(); + await this.startVolumePlugin(); } public async stop() { + if (this.volumePluginServer) { + await new Promise((resolve, reject) => { + this.volumePluginServer!.close((errorArg) => (errorArg ? reject(errorArg) : resolve())); + }); + this.volumePluginServer = null; + await plugins.fs.unlink(this.options.volumePluginSocketPath).catch(() => {}); + } if (this.controlServer) { await new Promise((resolve, reject) => { this.controlServer!.close((errorArg) => (errorArg ? reject(errorArg) : resolve())); }); this.controlServer = null; } + if (this.volumeArchive) { + await this.volumeArchive.close(); + this.volumeArchive = null; + } if (this.smartStorage) { await this.smartStorage.stop(); this.smartStorage = null; @@ -79,6 +98,14 @@ export class CoreStore { return plugins.path.join(this.options.dataDir, 'smartdb'); } + private getVolumesDir() { + return plugins.path.join(this.options.dataDir, 'volumes'); + } + + private getVolumeArchiveDir() { + return plugins.path.join(this.options.dataDir, 'volume-archive'); + } + private getManifestPath() { return plugins.path.join(this.options.dataDir, 'corestore-manifest.json'); } @@ -91,6 +118,8 @@ export class CoreStore { await plugins.fs.mkdir(this.options.dataDir, { recursive: true }); await plugins.fs.mkdir(this.getStorageDir(), { recursive: true }); await plugins.fs.mkdir(this.getDbDir(), { recursive: true }); + await plugins.fs.mkdir(this.getVolumesDir(), { recursive: true }); + await plugins.fs.mkdir(this.getVolumeArchiveDir(), { recursive: true }); } private async loadOrCreateSecretFile(): Promise { @@ -117,10 +146,18 @@ export class CoreStore { } private async loadManifest(): Promise { - return (await this.readJsonFile(this.getManifestPath())) || { + const manifest = (await this.readJsonFile(this.getManifestPath())) || { version: 1, services: {}, + volumes: {}, }; + manifest.services = manifest.services || {}; + manifest.volumes = manifest.volumes || {}; + for (const volume of Object.values(manifest.volumes)) { + volume.mountIds = volume.mountIds || []; + volume.snapshots = volume.snapshots || []; + } + return manifest; } private async saveManifest() { @@ -220,6 +257,34 @@ export class CoreStore { }); } + private async startVolumePlugin() { + const socketPath = this.options.volumePluginSocketPath; + await plugins.fs.mkdir(plugins.path.dirname(socketPath), { recursive: true }); + await this.removeStaleSocket(socketPath); + + this.volumePluginServer = plugins.http.createServer(async (reqArg, resArg) => { + await this.handleVolumePluginRequest(reqArg, resArg); + }); + await new Promise((resolve) => { + this.volumePluginServer!.listen(socketPath, resolve); + }); + await plugins.fs.chmod(socketPath, 0o660).catch(() => {}); + } + + private async removeStaleSocket(socketPathArg: string) { + try { + const stat = await plugins.fs.stat(socketPathArg); + if (!stat.isSocket()) { + throw new Error(`${socketPathArg} exists and is not a socket`); + } + await plugins.fs.unlink(socketPathArg); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + throw error; + } + } + } + private async handleControlRequest( reqArg: plugins.http.IncomingMessage, resArg: plugins.http.ServerResponse, @@ -242,6 +307,19 @@ export class CoreStore { return; } + if (method === 'GET' && url.pathname === '/volumes') { + this.sendJson(resArg, 200, { + volumes: Object.values(this.manifest.volumes).map((volumeArg) => this.getVolumeInfo(volumeArg)), + }); + return; + } + + if (method === 'GET' && url.pathname === '/volumes/snapshots') { + const volumeName = url.searchParams.get('name'); + this.sendJson(resArg, 200, await this.listVolumeSnapshots(volumeName || undefined)); + return; + } + if (method === 'GET' && url.pathname === '/resources') { this.sendJson(resArg, 200, { services: Object.values(this.manifest.services).map((serviceArg) => ({ @@ -259,6 +337,31 @@ export class CoreStore { return; } + if (method === 'POST' && url.pathname === '/volumes/create') { + const body = await this.readRequestBody(reqArg); + const volume = await this.createOrUpdateVolume(body); + this.sendJson(resArg, 200, { volume: this.getVolumeInfo(volume) }); + return; + } + + if (method === 'POST' && url.pathname === '/volumes/remove') { + const body = await this.readRequestBody(reqArg); + this.sendJson(resArg, 200, await this.removeVolume(body.name)); + return; + } + + if (method === 'POST' && url.pathname === '/volumes/snapshot') { + const body = await this.readRequestBody(reqArg); + this.sendJson(resArg, 200, await this.snapshotVolume(body)); + return; + } + + if (method === 'POST' && url.pathname === '/volumes/restore') { + const body = await this.readRequestBody(reqArg); + this.sendJson(resArg, 200, await this.restoreVolume(body)); + return; + } + if (method === 'POST' && url.pathname === '/resources/provision') { const body = await this.readRequestBody(reqArg); this.sendJson(resArg, 200, await this.provisionForService(body)); @@ -308,6 +411,421 @@ export class CoreStore { resArg.end(body); } + private async handleVolumePluginRequest( + reqArg: plugins.http.IncomingMessage, + resArg: plugins.http.ServerResponse, + ) { + const url = new URL(reqArg.url || '/', 'http://docker-plugin.local'); + const pathName = url.pathname; + + if (reqArg.method !== 'POST') { + this.sendJson(resArg, 404, { Err: 'not found' }); + return; + } + + try { + if (pathName === '/Plugin.Activate') { + this.sendJson(resArg, 200, { Implements: ['VolumeDriver'] }); + return; + } + + if (pathName === '/VolumeDriver.Capabilities') { + this.sendJson(resArg, 200, { Capabilities: { Scope: 'local' } }); + return; + } + + const body = await this.readRequestBody>(reqArg); + + if (pathName === '/VolumeDriver.Create') { + await this.createOrUpdateVolume({ + name: body.Name, + opts: body.Opts || {}, + }); + this.sendJson(resArg, 200, { Err: '' }); + return; + } + + if (pathName === '/VolumeDriver.Remove') { + await this.removeVolume(body.Name); + this.sendJson(resArg, 200, { Err: '' }); + return; + } + + if (pathName === '/VolumeDriver.Mount') { + const volume = await this.mountVolume(body.Name, body.ID, body.Opts || {}); + this.sendJson(resArg, 200, { Mountpoint: volume.mountpoint, Err: '' }); + return; + } + + if (pathName === '/VolumeDriver.Unmount') { + await this.unmountVolume(body.Name, body.ID); + this.sendJson(resArg, 200, { Err: '' }); + return; + } + + if (pathName === '/VolumeDriver.Path') { + const volume = this.getExistingVolume(body.Name); + this.sendJson(resArg, 200, { Mountpoint: volume.mountpoint, Err: '' }); + return; + } + + if (pathName === '/VolumeDriver.Get') { + const volume = this.getExistingVolume(body.Name); + this.sendJson(resArg, 200, { Volume: this.getVolumeInfo(volume), Err: '' }); + return; + } + + if (pathName === '/VolumeDriver.List') { + this.sendJson(resArg, 200, { + Volumes: Object.values(this.manifest.volumes).map((volumeArg) => this.getVolumeInfo(volumeArg)), + Err: '', + }); + return; + } + + this.sendJson(resArg, 404, { Err: 'not found' }); + } catch (error) { + this.sendJson(resArg, 200, { Err: (error as Error).message }); + } + } + + private normalizeVolumeName(nameArg: string) { + const name = String(nameArg || '').trim(); + if (!name) { + throw new Error('volume name is required'); + } + if (name.startsWith('/') || name.includes('\0')) { + throw new Error(`invalid volume name ${name}`); + } + return name; + } + + private normalizeVolumeOptions(optionsArg: Record = {}) { + const options: Record = {}; + for (const [key, value] of Object.entries(optionsArg)) { + if (value === undefined || value === null) { + continue; + } + options[key] = String(value); + } + return options; + } + + private getVolumeDirectoryName(volumeNameArg: string) { + const safeName = volumeNameArg + .toLowerCase() + .replace(/[^a-z0-9_.-]+/g, '-') + .replace(/^-+|-+$/g, '') + .slice(0, 64) + .replace(/[-_.]+$/g, '') || 'volume'; + const hash = plugins.crypto.createHash('sha1').update(volumeNameArg).digest('hex').slice(0, 12); + return `${safeName}-${hash}`; + } + + private parseBooleanOption(valueArg: unknown, defaultArg: boolean) { + if (typeof valueArg === 'boolean') { + return valueArg; + } + if (typeof valueArg !== 'string') { + return defaultArg; + } + if (['true', '1', 'yes', 'on'].includes(valueArg.toLowerCase())) { + return true; + } + if (['false', '0', 'no', 'off'].includes(valueArg.toLowerCase())) { + return false; + } + return defaultArg; + } + + private async createOrUpdateVolume( + requestArg: interfaces.ICoreStoreVolumeCreateRequest, + ): Promise { + const name = this.normalizeVolumeName(requestArg.name); + const options = this.normalizeVolumeOptions({ + ...(requestArg.opts || {}), + ...(requestArg.options || {}), + }); + const now = Date.now(); + let volume = this.manifest.volumes[name]; + + if (!volume) { + const directoryName = this.getVolumeDirectoryName(name); + volume = { + name, + directoryName, + mountpoint: plugins.path.join(this.getVolumesDir(), directoryName, 'data'), + options, + serviceId: requestArg.serviceId || options.serviceId, + serviceName: requestArg.serviceName || options.serviceName, + mountPath: requestArg.mountPath || options.mountPath, + backup: requestArg.backup ?? this.parseBooleanOption(options.backup, true), + mountIds: [], + snapshots: [], + createdAt: now, + updatedAt: now, + }; + this.manifest.volumes[name] = volume; + } else { + volume.options = { + ...volume.options, + ...options, + }; + volume.serviceId = requestArg.serviceId || options.serviceId || volume.serviceId; + volume.serviceName = requestArg.serviceName || options.serviceName || volume.serviceName; + volume.mountPath = requestArg.mountPath || options.mountPath || volume.mountPath; + volume.backup = requestArg.backup ?? this.parseBooleanOption(options.backup, volume.backup); + volume.mountIds = volume.mountIds || []; + volume.snapshots = volume.snapshots || []; + volume.updatedAt = now; + } + + await plugins.fs.mkdir(volume.mountpoint, { recursive: true }); + await this.saveManifest(); + return volume; + } + + private getExistingVolume(nameArg: string) { + const name = this.normalizeVolumeName(nameArg); + const volume = this.manifest.volumes[name]; + if (!volume) { + throw new Error(`volume ${name} does not exist`); + } + return volume; + } + + private async mountVolume( + nameArg: string, + mountIdArg?: string, + optionsArg: Record = {}, + ) { + const volume = await this.createOrUpdateVolume({ + name: nameArg, + opts: optionsArg, + }); + if (mountIdArg && !volume.mountIds.includes(mountIdArg)) { + volume.mountIds.push(mountIdArg); + volume.updatedAt = Date.now(); + await this.saveManifest(); + } + await plugins.fs.mkdir(volume.mountpoint, { recursive: true }); + return volume; + } + + private async unmountVolume(nameArg: string, mountIdArg?: string) { + const volume = this.getExistingVolume(nameArg); + if (mountIdArg && volume.mountIds.includes(mountIdArg)) { + volume.mountIds = volume.mountIds.filter((mountId) => mountId !== mountIdArg); + volume.updatedAt = Date.now(); + await this.saveManifest(); + } + return { ok: true }; + } + + private async removeVolume(nameArg: string) { + const volume = this.getExistingVolume(nameArg); + if (volume.mountIds.length > 0) { + throw new Error(`volume ${volume.name} is still mounted`); + } + await plugins.fs.rm(plugins.path.dirname(volume.mountpoint), { recursive: true, force: true }); + delete this.manifest.volumes[volume.name]; + await this.saveManifest(); + return { ok: true, removed: volume.name }; + } + + private getVolumeInfo(volumeArg: interfaces.ICoreStoreVolumeManifestEntry) { + return { + Name: volumeArg.name, + Mountpoint: volumeArg.mountpoint, + Status: { + driver: 'corestore', + serviceId: volumeArg.serviceId || '', + serviceName: volumeArg.serviceName || '', + mountPath: volumeArg.mountPath || '', + backup: String(volumeArg.backup), + mountCount: String(volumeArg.mountIds.length), + snapshotCount: String(volumeArg.snapshots.length), + createdAt: String(volumeArg.createdAt), + updatedAt: String(volumeArg.updatedAt), + }, + }; + } + + private async listVolumeSnapshots(volumeNameArg?: string) { + if (volumeNameArg) { + const volume = this.getExistingVolume(volumeNameArg); + return { + volumeName: volume.name, + snapshots: volume.snapshots, + }; + } + return { + snapshots: Object.values(this.manifest.volumes).flatMap((volumeArg) => { + return volumeArg.snapshots.map((snapshotArg) => ({ + ...snapshotArg, + volumeName: volumeArg.name, + })); + }), + }; + } + + private async getVolumeArchive() { + if (this.volumeArchive) { + return this.volumeArchive; + } + const archiveDir = this.getVolumeArchiveDir(); + await plugins.fs.mkdir(archiveDir, { recursive: true }); + const passphrase = process.env.CORESTORE_ARCHIVE_PASSPHRASE; + const options = passphrase ? { passphrase } : undefined; + const configPath = plugins.path.join(archiveDir, 'config.json'); + try { + await plugins.fs.stat(configPath); + this.volumeArchive = await plugins.containerarchive.ContainerArchive.open(archiveDir, options); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + throw error; + } + this.volumeArchive = await plugins.containerarchive.ContainerArchive.init(archiveDir, options); + } + return this.volumeArchive; + } + + private createDirectoryTarStream(directoryArg: string) { + const tarProcess = plugins.childProcess.spawn('tar', ['-C', directoryArg, '-cf', '-', '.'], { + stdio: ['ignore', 'pipe', 'pipe'], + }); + const stderrChunks: Buffer[] = []; + tarProcess.stderr?.on('data', (chunkArg) => { + stderrChunks.push(Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg)); + }); + const completion = new Promise((resolve, reject) => { + tarProcess.on('error', reject); + tarProcess.on('close', (codeArg) => { + if (codeArg === 0) { + resolve(); + } else { + reject(new Error(`tar create failed: ${Buffer.concat(stderrChunks).toString('utf8').trim()}`)); + } + }); + }); + if (!tarProcess.stdout) { + throw new Error('tar stdout is unavailable'); + } + return { stream: tarProcess.stdout, completion }; + } + + private async extractTarStreamToDirectory( + inputStreamArg: NodeJS.ReadableStream, + directoryArg: string, + ) { + await plugins.fs.mkdir(directoryArg, { recursive: true }); + const tarProcess = plugins.childProcess.spawn('tar', ['-C', directoryArg, '-xf', '-'], { + stdio: ['pipe', 'ignore', 'pipe'], + }); + const stderrChunks: Buffer[] = []; + tarProcess.stderr?.on('data', (chunkArg) => { + stderrChunks.push(Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg)); + }); + const completion = new Promise((resolve, reject) => { + tarProcess.on('error', reject); + tarProcess.on('close', (codeArg) => { + if (codeArg === 0) { + resolve(); + } else { + reject(new Error(`tar extract failed: ${Buffer.concat(stderrChunks).toString('utf8').trim()}`)); + } + }); + }); + if (!tarProcess.stdin) { + throw new Error('tar stdin is unavailable'); + } + await plugins.streamPromises.pipeline(inputStreamArg as any, tarProcess.stdin as any); + await completion; + } + + private async clearDirectoryContents(directoryArg: string) { + await plugins.fs.mkdir(directoryArg, { recursive: true }); + const entries = await plugins.fs.readdir(directoryArg); + await Promise.all(entries.map((entryArg) => { + return plugins.fs.rm(plugins.path.join(directoryArg, entryArg), { recursive: true, force: true }); + })); + } + + private async moveDirectoryContents(sourceDirArg: string, targetDirArg: string) { + await plugins.fs.mkdir(targetDirArg, { recursive: true }); + const entries = await plugins.fs.readdir(sourceDirArg); + for (const entry of entries) { + await plugins.fs.rm(plugins.path.join(targetDirArg, entry), { recursive: true, force: true }); + await plugins.fs.rename( + plugins.path.join(sourceDirArg, entry), + plugins.path.join(targetDirArg, entry), + ); + } + } + + private async snapshotVolume(requestArg: interfaces.ICoreStoreVolumeSnapshotRequest) { + const volume = this.getExistingVolume(requestArg.name); + await plugins.fs.mkdir(volume.mountpoint, { recursive: true }); + const archive = await this.getVolumeArchive(); + const tags = { + corestore: 'volume', + volumeName: volume.name, + serviceId: volume.serviceId || '', + serviceName: volume.serviceName || '', + mountPath: volume.mountPath || '', + ...(requestArg.snapshotName ? { snapshotName: requestArg.snapshotName } : {}), + ...(requestArg.tags || {}), + }; + const tarStream = this.createDirectoryTarStream(volume.mountpoint); + const snapshot = await archive.ingest(tarStream.stream, { + tags, + items: [{ name: 'volume.tar', type: 'volume-tar' }], + }); + await tarStream.completion; + const snapshotEntry: interfaces.ICoreStoreVolumeSnapshotEntry = { + snapshotId: snapshot.id, + snapshotName: requestArg.snapshotName, + createdAt: Date.now(), + originalSize: snapshot.originalSize, + storedSize: snapshot.storedSize, + tags, + }; + volume.snapshots.push(snapshotEntry); + volume.updatedAt = Date.now(); + await this.saveManifest(); + return { + volume: this.getVolumeInfo(volume), + snapshot, + }; + } + + private async restoreVolume(requestArg: interfaces.ICoreStoreVolumeRestoreRequest) { + const volume = this.getExistingVolume(requestArg.name); + const archive = await this.getVolumeArchive(); + const restoreStream = await archive.restore(requestArg.snapshotId, { item: 'volume.tar' }); + const restoreDir = plugins.path.join( + plugins.path.dirname(volume.mountpoint), + `.restore-${Date.now()}-${plugins.crypto.randomBytes(4).toString('hex')}`, + ); + + try { + await this.extractTarStreamToDirectory(restoreStream, restoreDir); + if (requestArg.clear !== false) { + await this.clearDirectoryContents(volume.mountpoint); + } + await this.moveDirectoryContents(restoreDir, volume.mountpoint); + volume.updatedAt = Date.now(); + await this.saveManifest(); + return { + ok: true, + volume: this.getVolumeInfo(volume), + snapshotId: requestArg.snapshotId, + }; + } finally { + await plugins.fs.rm(restoreDir, { recursive: true, force: true }).catch(() => {}); + } + } + private async getHealth() { const [dbHealth, storageHealth] = await Promise.all([ this.smartDb?.getHealth(), @@ -328,6 +846,11 @@ export class CoreStore { port: this.options.s3Port, health: storageHealth, }, + volumes: { + pluginSocketPath: this.options.volumePluginSocketPath, + running: Boolean(this.volumePluginServer), + count: Object.keys(this.manifest.volumes).length, + }, }; } @@ -339,6 +862,12 @@ export class CoreStore { return { database: dbMetrics, objectstorage: storageMetrics, + volumes: { + count: Object.keys(this.manifest.volumes).length, + snapshots: Object.values(this.manifest.volumes).reduce((sumArg, volumeArg) => { + return sumArg + volumeArg.snapshots.length; + }, 0), + }, }; } diff --git a/ts/corestore.interfaces.ts b/ts/corestore.interfaces.ts index e2e933a..025b12a 100644 --- a/ts/corestore.interfaces.ts +++ b/ts/corestore.interfaces.ts @@ -1,5 +1,31 @@ export type TCoreStoreCapability = 'database' | 'objectstorage'; +export interface ICoreStoreVolumeCreateRequest { + name: string; + opts?: Record; + options?: Record; + serviceId?: string; + serviceName?: string; + mountPath?: string; + backup?: boolean; +} + +export interface ICoreStoreVolumeRemoveRequest { + name: string; +} + +export interface ICoreStoreVolumeSnapshotRequest { + name: string; + tags?: Record; + snapshotName?: string; +} + +export interface ICoreStoreVolumeRestoreRequest { + name: string; + snapshotId: string; + clear?: boolean; +} + export interface ICoreStoreProvisionRequest { serviceId: string; serviceName?: string; @@ -47,9 +73,34 @@ export interface ICoreStoreServiceManifestEntry { updatedAt: number; } +export interface ICoreStoreVolumeSnapshotEntry { + snapshotId: string; + snapshotName?: string; + createdAt: number; + originalSize: number; + storedSize: number; + tags: Record; +} + +export interface ICoreStoreVolumeManifestEntry { + name: string; + directoryName: string; + mountpoint: string; + options: Record; + serviceId?: string; + serviceName?: string; + mountPath?: string; + backup: boolean; + mountIds: string[]; + snapshots: ICoreStoreVolumeSnapshotEntry[]; + createdAt: number; + updatedAt: number; +} + export interface ICoreStoreManifest { version: 1; services: Record; + volumes: Record; } export interface ICoreStoreProvisionResponse { diff --git a/ts/corestore.plugins.ts b/ts/corestore.plugins.ts index a3c4a2e..c588410 100644 --- a/ts/corestore.plugins.ts +++ b/ts/corestore.plugins.ts @@ -1,10 +1,12 @@ // native +import * as childProcess from 'node:child_process'; import * as crypto from 'node:crypto'; import * as fs from 'node:fs/promises'; import * as http from 'node:http'; import * as path from 'node:path'; +import * as streamPromises from 'node:stream/promises'; -export { crypto, fs, http, path }; +export { childProcess, crypto, fs, http, path, streamPromises }; // @push.rocks scope import * as projectinfo from '@push.rocks/projectinfo'; @@ -13,3 +15,8 @@ import * as smartstorage from '@push.rocks/smartstorage'; import * as smartdb from '@push.rocks/smartdb'; export { projectinfo, smartpath, smartstorage, smartdb }; + +// @serve.zone scope +import * as containerarchive from '@serve.zone/containerarchive'; + +export { containerarchive };