feat: add corestore volume driver

This commit is contained in:
2026-05-02 18:58:21 +00:00
parent 29f0d94e86
commit 02d1b77ae8
6 changed files with 644 additions and 7 deletions
+531 -2
View File
@@ -10,6 +10,7 @@ export interface ICoreStoreOptions {
dbPort?: number;
region?: string;
apiToken?: string;
volumePluginSocketPath?: string;
}
type TResolvedCoreStoreOptions = Required<Omit<ICoreStoreOptions, 'apiToken'>> & {
@@ -21,7 +22,9 @@ export class CoreStore {
private smartStorage: plugins.smartstorage.SmartStorage | null = null;
private smartDb: plugins.smartdb.SmartdbServer | null = null;
private controlServer: plugins.http.Server | null = null;
private manifest: interfaces.ICoreStoreManifest = { version: 1, services: {} };
private volumePluginServer: plugins.http.Server | null = null;
private volumeArchive: plugins.containerarchive.ContainerArchive | null = null;
private manifest: interfaces.ICoreStoreManifest = { version: 1, services: {}, volumes: {} };
private secretFile: interfaces.ICoreStoreSecretFile | null = null;
constructor(optionsArg: ICoreStoreOptions = {}) {
@@ -33,6 +36,10 @@ export class CoreStore {
s3Port: optionsArg.s3Port || this.getNumberEnv('CORESTORE_S3_PORT', 9000),
dbPort: optionsArg.dbPort || this.getNumberEnv('CORESTORE_DB_PORT', 27017),
region: optionsArg.region || process.env.CORESTORE_REGION || 'us-east-1',
volumePluginSocketPath:
optionsArg.volumePluginSocketPath ||
process.env.CORESTORE_VOLUME_PLUGIN_SOCKET ||
'/run/docker/plugins/corestore.sock',
...(optionsArg.apiToken || process.env.CORESTORE_API_TOKEN
? { apiToken: optionsArg.apiToken || process.env.CORESTORE_API_TOKEN }
: {}),
@@ -46,15 +53,27 @@ export class CoreStore {
await this.startSmartStorage();
await this.startSmartDb();
await this.startControlApi();
await this.startVolumePlugin();
}
public async stop() {
if (this.volumePluginServer) {
await new Promise<void>((resolve, reject) => {
this.volumePluginServer!.close((errorArg) => (errorArg ? reject(errorArg) : resolve()));
});
this.volumePluginServer = null;
await plugins.fs.unlink(this.options.volumePluginSocketPath).catch(() => {});
}
if (this.controlServer) {
await new Promise<void>((resolve, reject) => {
this.controlServer!.close((errorArg) => (errorArg ? reject(errorArg) : resolve()));
});
this.controlServer = null;
}
if (this.volumeArchive) {
await this.volumeArchive.close();
this.volumeArchive = null;
}
if (this.smartStorage) {
await this.smartStorage.stop();
this.smartStorage = null;
@@ -79,6 +98,14 @@ export class CoreStore {
return plugins.path.join(this.options.dataDir, 'smartdb');
}
private getVolumesDir() {
return plugins.path.join(this.options.dataDir, 'volumes');
}
private getVolumeArchiveDir() {
return plugins.path.join(this.options.dataDir, 'volume-archive');
}
private getManifestPath() {
return plugins.path.join(this.options.dataDir, 'corestore-manifest.json');
}
@@ -91,6 +118,8 @@ export class CoreStore {
await plugins.fs.mkdir(this.options.dataDir, { recursive: true });
await plugins.fs.mkdir(this.getStorageDir(), { recursive: true });
await plugins.fs.mkdir(this.getDbDir(), { recursive: true });
await plugins.fs.mkdir(this.getVolumesDir(), { recursive: true });
await plugins.fs.mkdir(this.getVolumeArchiveDir(), { recursive: true });
}
private async loadOrCreateSecretFile(): Promise<interfaces.ICoreStoreSecretFile> {
@@ -117,10 +146,18 @@ export class CoreStore {
}
private async loadManifest(): Promise<interfaces.ICoreStoreManifest> {
return (await this.readJsonFile<interfaces.ICoreStoreManifest>(this.getManifestPath())) || {
const manifest = (await this.readJsonFile<interfaces.ICoreStoreManifest>(this.getManifestPath())) || {
version: 1,
services: {},
volumes: {},
};
manifest.services = manifest.services || {};
manifest.volumes = manifest.volumes || {};
for (const volume of Object.values(manifest.volumes)) {
volume.mountIds = volume.mountIds || [];
volume.snapshots = volume.snapshots || [];
}
return manifest;
}
private async saveManifest() {
@@ -220,6 +257,34 @@ export class CoreStore {
});
}
private async startVolumePlugin() {
const socketPath = this.options.volumePluginSocketPath;
await plugins.fs.mkdir(plugins.path.dirname(socketPath), { recursive: true });
await this.removeStaleSocket(socketPath);
this.volumePluginServer = plugins.http.createServer(async (reqArg, resArg) => {
await this.handleVolumePluginRequest(reqArg, resArg);
});
await new Promise<void>((resolve) => {
this.volumePluginServer!.listen(socketPath, resolve);
});
await plugins.fs.chmod(socketPath, 0o660).catch(() => {});
}
private async removeStaleSocket(socketPathArg: string) {
try {
const stat = await plugins.fs.stat(socketPathArg);
if (!stat.isSocket()) {
throw new Error(`${socketPathArg} exists and is not a socket`);
}
await plugins.fs.unlink(socketPathArg);
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
throw error;
}
}
}
private async handleControlRequest(
reqArg: plugins.http.IncomingMessage,
resArg: plugins.http.ServerResponse,
@@ -242,6 +307,19 @@ export class CoreStore {
return;
}
if (method === 'GET' && url.pathname === '/volumes') {
this.sendJson(resArg, 200, {
volumes: Object.values(this.manifest.volumes).map((volumeArg) => this.getVolumeInfo(volumeArg)),
});
return;
}
if (method === 'GET' && url.pathname === '/volumes/snapshots') {
const volumeName = url.searchParams.get('name');
this.sendJson(resArg, 200, await this.listVolumeSnapshots(volumeName || undefined));
return;
}
if (method === 'GET' && url.pathname === '/resources') {
this.sendJson(resArg, 200, {
services: Object.values(this.manifest.services).map((serviceArg) => ({
@@ -259,6 +337,31 @@ export class CoreStore {
return;
}
if (method === 'POST' && url.pathname === '/volumes/create') {
const body = await this.readRequestBody<interfaces.ICoreStoreVolumeCreateRequest>(reqArg);
const volume = await this.createOrUpdateVolume(body);
this.sendJson(resArg, 200, { volume: this.getVolumeInfo(volume) });
return;
}
if (method === 'POST' && url.pathname === '/volumes/remove') {
const body = await this.readRequestBody<interfaces.ICoreStoreVolumeRemoveRequest>(reqArg);
this.sendJson(resArg, 200, await this.removeVolume(body.name));
return;
}
if (method === 'POST' && url.pathname === '/volumes/snapshot') {
const body = await this.readRequestBody<interfaces.ICoreStoreVolumeSnapshotRequest>(reqArg);
this.sendJson(resArg, 200, await this.snapshotVolume(body));
return;
}
if (method === 'POST' && url.pathname === '/volumes/restore') {
const body = await this.readRequestBody<interfaces.ICoreStoreVolumeRestoreRequest>(reqArg);
this.sendJson(resArg, 200, await this.restoreVolume(body));
return;
}
if (method === 'POST' && url.pathname === '/resources/provision') {
const body = await this.readRequestBody<interfaces.ICoreStoreProvisionRequest>(reqArg);
this.sendJson(resArg, 200, await this.provisionForService(body));
@@ -308,6 +411,421 @@ export class CoreStore {
resArg.end(body);
}
private async handleVolumePluginRequest(
reqArg: plugins.http.IncomingMessage,
resArg: plugins.http.ServerResponse,
) {
const url = new URL(reqArg.url || '/', 'http://docker-plugin.local');
const pathName = url.pathname;
if (reqArg.method !== 'POST') {
this.sendJson(resArg, 404, { Err: 'not found' });
return;
}
try {
if (pathName === '/Plugin.Activate') {
this.sendJson(resArg, 200, { Implements: ['VolumeDriver'] });
return;
}
if (pathName === '/VolumeDriver.Capabilities') {
this.sendJson(resArg, 200, { Capabilities: { Scope: 'local' } });
return;
}
const body = await this.readRequestBody<Record<string, any>>(reqArg);
if (pathName === '/VolumeDriver.Create') {
await this.createOrUpdateVolume({
name: body.Name,
opts: body.Opts || {},
});
this.sendJson(resArg, 200, { Err: '' });
return;
}
if (pathName === '/VolumeDriver.Remove') {
await this.removeVolume(body.Name);
this.sendJson(resArg, 200, { Err: '' });
return;
}
if (pathName === '/VolumeDriver.Mount') {
const volume = await this.mountVolume(body.Name, body.ID, body.Opts || {});
this.sendJson(resArg, 200, { Mountpoint: volume.mountpoint, Err: '' });
return;
}
if (pathName === '/VolumeDriver.Unmount') {
await this.unmountVolume(body.Name, body.ID);
this.sendJson(resArg, 200, { Err: '' });
return;
}
if (pathName === '/VolumeDriver.Path') {
const volume = this.getExistingVolume(body.Name);
this.sendJson(resArg, 200, { Mountpoint: volume.mountpoint, Err: '' });
return;
}
if (pathName === '/VolumeDriver.Get') {
const volume = this.getExistingVolume(body.Name);
this.sendJson(resArg, 200, { Volume: this.getVolumeInfo(volume), Err: '' });
return;
}
if (pathName === '/VolumeDriver.List') {
this.sendJson(resArg, 200, {
Volumes: Object.values(this.manifest.volumes).map((volumeArg) => this.getVolumeInfo(volumeArg)),
Err: '',
});
return;
}
this.sendJson(resArg, 404, { Err: 'not found' });
} catch (error) {
this.sendJson(resArg, 200, { Err: (error as Error).message });
}
}
private normalizeVolumeName(nameArg: string) {
const name = String(nameArg || '').trim();
if (!name) {
throw new Error('volume name is required');
}
if (name.startsWith('/') || name.includes('\0')) {
throw new Error(`invalid volume name ${name}`);
}
return name;
}
private normalizeVolumeOptions(optionsArg: Record<string, unknown> = {}) {
const options: Record<string, string> = {};
for (const [key, value] of Object.entries(optionsArg)) {
if (value === undefined || value === null) {
continue;
}
options[key] = String(value);
}
return options;
}
private getVolumeDirectoryName(volumeNameArg: string) {
const safeName = volumeNameArg
.toLowerCase()
.replace(/[^a-z0-9_.-]+/g, '-')
.replace(/^-+|-+$/g, '')
.slice(0, 64)
.replace(/[-_.]+$/g, '') || 'volume';
const hash = plugins.crypto.createHash('sha1').update(volumeNameArg).digest('hex').slice(0, 12);
return `${safeName}-${hash}`;
}
private parseBooleanOption(valueArg: unknown, defaultArg: boolean) {
if (typeof valueArg === 'boolean') {
return valueArg;
}
if (typeof valueArg !== 'string') {
return defaultArg;
}
if (['true', '1', 'yes', 'on'].includes(valueArg.toLowerCase())) {
return true;
}
if (['false', '0', 'no', 'off'].includes(valueArg.toLowerCase())) {
return false;
}
return defaultArg;
}
private async createOrUpdateVolume(
requestArg: interfaces.ICoreStoreVolumeCreateRequest,
): Promise<interfaces.ICoreStoreVolumeManifestEntry> {
const name = this.normalizeVolumeName(requestArg.name);
const options = this.normalizeVolumeOptions({
...(requestArg.opts || {}),
...(requestArg.options || {}),
});
const now = Date.now();
let volume = this.manifest.volumes[name];
if (!volume) {
const directoryName = this.getVolumeDirectoryName(name);
volume = {
name,
directoryName,
mountpoint: plugins.path.join(this.getVolumesDir(), directoryName, 'data'),
options,
serviceId: requestArg.serviceId || options.serviceId,
serviceName: requestArg.serviceName || options.serviceName,
mountPath: requestArg.mountPath || options.mountPath,
backup: requestArg.backup ?? this.parseBooleanOption(options.backup, true),
mountIds: [],
snapshots: [],
createdAt: now,
updatedAt: now,
};
this.manifest.volumes[name] = volume;
} else {
volume.options = {
...volume.options,
...options,
};
volume.serviceId = requestArg.serviceId || options.serviceId || volume.serviceId;
volume.serviceName = requestArg.serviceName || options.serviceName || volume.serviceName;
volume.mountPath = requestArg.mountPath || options.mountPath || volume.mountPath;
volume.backup = requestArg.backup ?? this.parseBooleanOption(options.backup, volume.backup);
volume.mountIds = volume.mountIds || [];
volume.snapshots = volume.snapshots || [];
volume.updatedAt = now;
}
await plugins.fs.mkdir(volume.mountpoint, { recursive: true });
await this.saveManifest();
return volume;
}
private getExistingVolume(nameArg: string) {
const name = this.normalizeVolumeName(nameArg);
const volume = this.manifest.volumes[name];
if (!volume) {
throw new Error(`volume ${name} does not exist`);
}
return volume;
}
private async mountVolume(
nameArg: string,
mountIdArg?: string,
optionsArg: Record<string, string> = {},
) {
const volume = await this.createOrUpdateVolume({
name: nameArg,
opts: optionsArg,
});
if (mountIdArg && !volume.mountIds.includes(mountIdArg)) {
volume.mountIds.push(mountIdArg);
volume.updatedAt = Date.now();
await this.saveManifest();
}
await plugins.fs.mkdir(volume.mountpoint, { recursive: true });
return volume;
}
private async unmountVolume(nameArg: string, mountIdArg?: string) {
const volume = this.getExistingVolume(nameArg);
if (mountIdArg && volume.mountIds.includes(mountIdArg)) {
volume.mountIds = volume.mountIds.filter((mountId) => mountId !== mountIdArg);
volume.updatedAt = Date.now();
await this.saveManifest();
}
return { ok: true };
}
private async removeVolume(nameArg: string) {
const volume = this.getExistingVolume(nameArg);
if (volume.mountIds.length > 0) {
throw new Error(`volume ${volume.name} is still mounted`);
}
await plugins.fs.rm(plugins.path.dirname(volume.mountpoint), { recursive: true, force: true });
delete this.manifest.volumes[volume.name];
await this.saveManifest();
return { ok: true, removed: volume.name };
}
private getVolumeInfo(volumeArg: interfaces.ICoreStoreVolumeManifestEntry) {
return {
Name: volumeArg.name,
Mountpoint: volumeArg.mountpoint,
Status: {
driver: 'corestore',
serviceId: volumeArg.serviceId || '',
serviceName: volumeArg.serviceName || '',
mountPath: volumeArg.mountPath || '',
backup: String(volumeArg.backup),
mountCount: String(volumeArg.mountIds.length),
snapshotCount: String(volumeArg.snapshots.length),
createdAt: String(volumeArg.createdAt),
updatedAt: String(volumeArg.updatedAt),
},
};
}
private async listVolumeSnapshots(volumeNameArg?: string) {
if (volumeNameArg) {
const volume = this.getExistingVolume(volumeNameArg);
return {
volumeName: volume.name,
snapshots: volume.snapshots,
};
}
return {
snapshots: Object.values(this.manifest.volumes).flatMap((volumeArg) => {
return volumeArg.snapshots.map((snapshotArg) => ({
...snapshotArg,
volumeName: volumeArg.name,
}));
}),
};
}
private async getVolumeArchive() {
if (this.volumeArchive) {
return this.volumeArchive;
}
const archiveDir = this.getVolumeArchiveDir();
await plugins.fs.mkdir(archiveDir, { recursive: true });
const passphrase = process.env.CORESTORE_ARCHIVE_PASSPHRASE;
const options = passphrase ? { passphrase } : undefined;
const configPath = plugins.path.join(archiveDir, 'config.json');
try {
await plugins.fs.stat(configPath);
this.volumeArchive = await plugins.containerarchive.ContainerArchive.open(archiveDir, options);
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
throw error;
}
this.volumeArchive = await plugins.containerarchive.ContainerArchive.init(archiveDir, options);
}
return this.volumeArchive;
}
private createDirectoryTarStream(directoryArg: string) {
const tarProcess = plugins.childProcess.spawn('tar', ['-C', directoryArg, '-cf', '-', '.'], {
stdio: ['ignore', 'pipe', 'pipe'],
});
const stderrChunks: Buffer[] = [];
tarProcess.stderr?.on('data', (chunkArg) => {
stderrChunks.push(Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg));
});
const completion = new Promise<void>((resolve, reject) => {
tarProcess.on('error', reject);
tarProcess.on('close', (codeArg) => {
if (codeArg === 0) {
resolve();
} else {
reject(new Error(`tar create failed: ${Buffer.concat(stderrChunks).toString('utf8').trim()}`));
}
});
});
if (!tarProcess.stdout) {
throw new Error('tar stdout is unavailable');
}
return { stream: tarProcess.stdout, completion };
}
private async extractTarStreamToDirectory(
inputStreamArg: NodeJS.ReadableStream,
directoryArg: string,
) {
await plugins.fs.mkdir(directoryArg, { recursive: true });
const tarProcess = plugins.childProcess.spawn('tar', ['-C', directoryArg, '-xf', '-'], {
stdio: ['pipe', 'ignore', 'pipe'],
});
const stderrChunks: Buffer[] = [];
tarProcess.stderr?.on('data', (chunkArg) => {
stderrChunks.push(Buffer.isBuffer(chunkArg) ? chunkArg : Buffer.from(chunkArg));
});
const completion = new Promise<void>((resolve, reject) => {
tarProcess.on('error', reject);
tarProcess.on('close', (codeArg) => {
if (codeArg === 0) {
resolve();
} else {
reject(new Error(`tar extract failed: ${Buffer.concat(stderrChunks).toString('utf8').trim()}`));
}
});
});
if (!tarProcess.stdin) {
throw new Error('tar stdin is unavailable');
}
await plugins.streamPromises.pipeline(inputStreamArg as any, tarProcess.stdin as any);
await completion;
}
private async clearDirectoryContents(directoryArg: string) {
await plugins.fs.mkdir(directoryArg, { recursive: true });
const entries = await plugins.fs.readdir(directoryArg);
await Promise.all(entries.map((entryArg) => {
return plugins.fs.rm(plugins.path.join(directoryArg, entryArg), { recursive: true, force: true });
}));
}
private async moveDirectoryContents(sourceDirArg: string, targetDirArg: string) {
await plugins.fs.mkdir(targetDirArg, { recursive: true });
const entries = await plugins.fs.readdir(sourceDirArg);
for (const entry of entries) {
await plugins.fs.rm(plugins.path.join(targetDirArg, entry), { recursive: true, force: true });
await plugins.fs.rename(
plugins.path.join(sourceDirArg, entry),
plugins.path.join(targetDirArg, entry),
);
}
}
private async snapshotVolume(requestArg: interfaces.ICoreStoreVolumeSnapshotRequest) {
const volume = this.getExistingVolume(requestArg.name);
await plugins.fs.mkdir(volume.mountpoint, { recursive: true });
const archive = await this.getVolumeArchive();
const tags = {
corestore: 'volume',
volumeName: volume.name,
serviceId: volume.serviceId || '',
serviceName: volume.serviceName || '',
mountPath: volume.mountPath || '',
...(requestArg.snapshotName ? { snapshotName: requestArg.snapshotName } : {}),
...(requestArg.tags || {}),
};
const tarStream = this.createDirectoryTarStream(volume.mountpoint);
const snapshot = await archive.ingest(tarStream.stream, {
tags,
items: [{ name: 'volume.tar', type: 'volume-tar' }],
});
await tarStream.completion;
const snapshotEntry: interfaces.ICoreStoreVolumeSnapshotEntry = {
snapshotId: snapshot.id,
snapshotName: requestArg.snapshotName,
createdAt: Date.now(),
originalSize: snapshot.originalSize,
storedSize: snapshot.storedSize,
tags,
};
volume.snapshots.push(snapshotEntry);
volume.updatedAt = Date.now();
await this.saveManifest();
return {
volume: this.getVolumeInfo(volume),
snapshot,
};
}
private async restoreVolume(requestArg: interfaces.ICoreStoreVolumeRestoreRequest) {
const volume = this.getExistingVolume(requestArg.name);
const archive = await this.getVolumeArchive();
const restoreStream = await archive.restore(requestArg.snapshotId, { item: 'volume.tar' });
const restoreDir = plugins.path.join(
plugins.path.dirname(volume.mountpoint),
`.restore-${Date.now()}-${plugins.crypto.randomBytes(4).toString('hex')}`,
);
try {
await this.extractTarStreamToDirectory(restoreStream, restoreDir);
if (requestArg.clear !== false) {
await this.clearDirectoryContents(volume.mountpoint);
}
await this.moveDirectoryContents(restoreDir, volume.mountpoint);
volume.updatedAt = Date.now();
await this.saveManifest();
return {
ok: true,
volume: this.getVolumeInfo(volume),
snapshotId: requestArg.snapshotId,
};
} finally {
await plugins.fs.rm(restoreDir, { recursive: true, force: true }).catch(() => {});
}
}
private async getHealth() {
const [dbHealth, storageHealth] = await Promise.all([
this.smartDb?.getHealth(),
@@ -328,6 +846,11 @@ export class CoreStore {
port: this.options.s3Port,
health: storageHealth,
},
volumes: {
pluginSocketPath: this.options.volumePluginSocketPath,
running: Boolean(this.volumePluginServer),
count: Object.keys(this.manifest.volumes).length,
},
};
}
@@ -339,6 +862,12 @@ export class CoreStore {
return {
database: dbMetrics,
objectstorage: storageMetrics,
volumes: {
count: Object.keys(this.manifest.volumes).length,
snapshots: Object.values(this.manifest.volumes).reduce((sumArg, volumeArg) => {
return sumArg + volumeArg.snapshots.length;
}, 0),
},
};
}
+51
View File
@@ -1,5 +1,31 @@
export type TCoreStoreCapability = 'database' | 'objectstorage';
export interface ICoreStoreVolumeCreateRequest {
name: string;
opts?: Record<string, string>;
options?: Record<string, string>;
serviceId?: string;
serviceName?: string;
mountPath?: string;
backup?: boolean;
}
export interface ICoreStoreVolumeRemoveRequest {
name: string;
}
export interface ICoreStoreVolumeSnapshotRequest {
name: string;
tags?: Record<string, string>;
snapshotName?: string;
}
export interface ICoreStoreVolumeRestoreRequest {
name: string;
snapshotId: string;
clear?: boolean;
}
export interface ICoreStoreProvisionRequest {
serviceId: string;
serviceName?: string;
@@ -47,9 +73,34 @@ export interface ICoreStoreServiceManifestEntry {
updatedAt: number;
}
export interface ICoreStoreVolumeSnapshotEntry {
snapshotId: string;
snapshotName?: string;
createdAt: number;
originalSize: number;
storedSize: number;
tags: Record<string, string>;
}
export interface ICoreStoreVolumeManifestEntry {
name: string;
directoryName: string;
mountpoint: string;
options: Record<string, string>;
serviceId?: string;
serviceName?: string;
mountPath?: string;
backup: boolean;
mountIds: string[];
snapshots: ICoreStoreVolumeSnapshotEntry[];
createdAt: number;
updatedAt: number;
}
export interface ICoreStoreManifest {
version: 1;
services: Record<string, ICoreStoreServiceManifestEntry>;
volumes: Record<string, ICoreStoreVolumeManifestEntry>;
}
export interface ICoreStoreProvisionResponse {
+8 -1
View File
@@ -1,10 +1,12 @@
// native
import * as childProcess from 'node:child_process';
import * as crypto from 'node:crypto';
import * as fs from 'node:fs/promises';
import * as http from 'node:http';
import * as path from 'node:path';
import * as streamPromises from 'node:stream/promises';
export { crypto, fs, http, path };
export { childProcess, crypto, fs, http, path, streamPromises };
// @push.rocks scope
import * as projectinfo from '@push.rocks/projectinfo';
@@ -13,3 +15,8 @@ import * as smartstorage from '@push.rocks/smartstorage';
import * as smartdb from '@push.rocks/smartdb';
export { projectinfo, smartpath, smartstorage, smartdb };
// @serve.zone scope
import * as containerarchive from '@serve.zone/containerarchive';
export { containerarchive };