Files
objectstorage/ts/classes/objectstoragecontainer.ts
T

619 lines
21 KiB
TypeScript
Raw Normal View History

import * as plugins from '../plugins.ts';
import { defaultConfig, type IObjectStorageConfig } from '../types.ts';
import type * as interfaces from '../../ts_interfaces/index.ts';
import { OpsServer } from '../opsserver/index.ts';
import { PolicyManager } from './policymanager.ts';
import { AuditLogger } from './auditlogger.ts';
interface IPersistedAdminConfig {
accessCredentials?: Array<{ accessKeyId: string; secretAccessKey: string }>;
}
export class ObjectStorageContainer {
public config: IObjectStorageConfig;
public smartstorageInstance!: plugins.smartstorage.SmartStorage;
public s3Client!: plugins.S3Client;
public opsServer: OpsServer;
public policyManager: PolicyManager;
public auditLogger: AuditLogger;
public startedAt: number = 0;
private envAccessCredentialsProvided = false;
constructor(configArg?: Partial<IObjectStorageConfig>) {
this.config = { ...defaultConfig, ...configArg };
// Read environment variables (override config)
const envPort = Deno.env.get('OBJST_PORT');
if (envPort) this.config.objstPort = parseInt(envPort, 10);
const envUiPort = Deno.env.get('UI_PORT');
if (envUiPort) this.config.uiPort = parseInt(envUiPort, 10);
const envStorageDir = Deno.env.get('OBJST_STORAGE_DIR');
if (envStorageDir) this.config.storageDirectory = envStorageDir;
const envAccessKey = Deno.env.get('OBJST_ACCESS_KEY');
const envSecretKey = Deno.env.get('OBJST_SECRET_KEY');
if (envAccessKey && envSecretKey) {
this.envAccessCredentialsProvided = true;
this.config.accessCredentials = [
{ accessKeyId: envAccessKey, secretAccessKey: envSecretKey },
];
}
const envAdminPassword = Deno.env.get('OBJST_ADMIN_PASSWORD');
if (envAdminPassword) this.config.adminPassword = envAdminPassword;
const envRegion = Deno.env.get('OBJST_REGION');
if (envRegion) this.config.region = envRegion;
// Cluster environment variables
const envClusterEnabled = Deno.env.get('OBJST_CLUSTER_ENABLED');
if (envClusterEnabled) {
this.config.clusterEnabled = envClusterEnabled === 'true' || envClusterEnabled === '1';
}
const envClusterNodeId = Deno.env.get('OBJST_CLUSTER_NODE_ID');
if (envClusterNodeId) this.config.clusterNodeId = envClusterNodeId;
const envClusterQuicPort = Deno.env.get('OBJST_CLUSTER_QUIC_PORT');
if (envClusterQuicPort) this.config.clusterQuicPort = parseInt(envClusterQuicPort, 10);
const envClusterSeedNodes = Deno.env.get('OBJST_CLUSTER_SEED_NODES');
if (envClusterSeedNodes) {
this.config.clusterSeedNodes = envClusterSeedNodes.split(',').map((s) => s.trim()).filter(
Boolean,
);
}
const envDrivePaths = Deno.env.get('OBJST_DRIVE_PATHS');
if (envDrivePaths) {
this.config.drivePaths = envDrivePaths.split(',').map((s) => s.trim()).filter(Boolean);
}
const envErasureDataShards = Deno.env.get('OBJST_ERASURE_DATA_SHARDS');
if (envErasureDataShards) this.config.erasureDataShards = parseInt(envErasureDataShards, 10);
const envErasureParityShards = Deno.env.get('OBJST_ERASURE_PARITY_SHARDS');
if (envErasureParityShards) {
this.config.erasureParityShards = parseInt(envErasureParityShards, 10);
}
const envErasureChunkSize = Deno.env.get('OBJST_ERASURE_CHUNK_SIZE');
if (envErasureChunkSize) this.config.erasureChunkSizeBytes = parseInt(envErasureChunkSize, 10);
const envHeartbeatInterval = Deno.env.get('OBJST_HEARTBEAT_INTERVAL_MS');
if (envHeartbeatInterval) {
this.config.clusterHeartbeatIntervalMs = parseInt(envHeartbeatInterval, 10);
}
const envHeartbeatTimeout = Deno.env.get('OBJST_HEARTBEAT_TIMEOUT_MS');
if (envHeartbeatTimeout) {
this.config.clusterHeartbeatTimeoutMs = parseInt(envHeartbeatTimeout, 10);
}
this.opsServer = new OpsServer(this);
this.policyManager = new PolicyManager(this);
this.auditLogger = new AuditLogger(this.config.storageDirectory);
}
public async start(): Promise<void> {
this.assertSecureStartupConfig();
await this.loadPersistedAdminConfig();
console.log(`Starting ObjectStorage...`);
console.log(` Storage port: ${this.config.objstPort}`);
console.log(` UI port: ${this.config.uiPort}`);
console.log(` Storage: ${this.config.storageDirectory}`);
console.log(` Region: ${this.config.region}`);
console.log(` Cluster: ${this.config.clusterEnabled ? 'enabled' : 'disabled'}`);
if (this.config.clusterEnabled) {
console.log(` Node ID: ${this.config.clusterNodeId || '(auto-generated)'}`);
console.log(` QUIC Port: ${this.config.clusterQuicPort}`);
console.log(` Seed Nodes: ${this.config.clusterSeedNodes.join(', ') || '(none)'}`);
console.log(
` Drives: ${
this.config.drivePaths.length > 0
? this.config.drivePaths.join(', ')
: this.config.storageDirectory
}`,
);
console.log(` Erasure: ${this.config.erasureDataShards}+${this.config.erasureParityShards}`);
}
this.smartstorageInstance = await plugins.smartstorage.SmartStorage.createAndStart(
this.buildSmartstorageConfig(),
);
this.startedAt = Date.now();
console.log(`Storage server started on port ${this.config.objstPort}`);
await this.refreshManagementClient();
// Load named policies
await this.policyManager.load();
// Start UI server
await this.opsServer.start(this.config.uiPort);
console.log(`Management UI started on port ${this.config.uiPort}`);
}
public async stop(): Promise<void> {
console.log('Stopping ObjectStorage...');
await this.opsServer.stop();
await this.smartstorageInstance.stop();
console.log('ObjectStorage stopped.');
}
public async replaceAccessCredentials(
credentials: Array<{ accessKeyId: string; secretAccessKey: string }>,
): Promise<void> {
const nextCredentials = credentials.map((credential) => ({ ...credential }));
const previousCredentials = this.config.accessCredentials.map((credential) => ({
...credential,
}));
if (this.smartstorageInstance) {
await this.smartstorageInstance.replaceCredentials(nextCredentials);
}
this.config.accessCredentials = nextCredentials;
try {
await this.savePersistedAdminConfig();
} catch (error) {
this.config.accessCredentials = previousCredentials;
if (this.smartstorageInstance) {
await this.smartstorageInstance.replaceCredentials(previousCredentials);
await this.refreshManagementClient();
}
throw error;
}
if (this.smartstorageInstance) {
await this.refreshManagementClient();
}
}
public async listAccessCredentials(): Promise<Array<{ accessKeyId: string }>> {
if (!this.smartstorageInstance) {
return this.config.accessCredentials.map((credential) => ({
accessKeyId: credential.accessKeyId,
}));
}
return await this.smartstorageInstance.listCredentials();
}
// ── Management methods ──
public async listBuckets(): Promise<interfaces.data.IBucketInfo[]> {
const summaries = await this.smartstorageInstance.listBucketSummaries();
return summaries.map((bucket) => ({
name: bucket.name,
creationDate: bucket.creationDate || 0,
objectCount: bucket.objectCount,
totalSizeBytes: bucket.totalSizeBytes,
}));
}
public async createBucket(bucketName: string): Promise<void> {
await this.smartstorageInstance.createBucket(bucketName);
}
public async deleteBucket(bucketName: string): Promise<void> {
await this.s3Client.send(new plugins.DeleteBucketCommand({ Bucket: bucketName }));
}
public async listObjects(
bucketName: string,
prefix?: string,
delimiter?: string,
maxKeys?: number,
): Promise<interfaces.data.IObjectListResult> {
const response = await this.s3Client.send(
new plugins.ListObjectsV2Command({
Bucket: bucketName,
Prefix: prefix || '',
Delimiter: delimiter || '/',
MaxKeys: maxKeys || 1000,
}),
);
const objects: interfaces.data.IObjectInfo[] = (response.Contents || []).map((obj) => ({
key: obj.Key || '',
size: obj.Size || 0,
lastModified: obj.LastModified?.getTime() || 0,
etag: obj.ETag || '',
contentType: '',
}));
const commonPrefixes = (response.CommonPrefixes || []).map((p) => p.Prefix || '');
return {
objects,
commonPrefixes,
isTruncated: response.IsTruncated || false,
currentPrefix: prefix || '',
};
}
public async deleteObject(bucketName: string, key: string): Promise<void> {
await this.s3Client.send(
new plugins.DeleteObjectCommand({ Bucket: bucketName, Key: key }),
);
}
public async getObject(bucketName: string, key: string): Promise<{
content: string;
contentType: string;
size: number;
lastModified: string;
}> {
const response = await this.s3Client.send(
new plugins.GetObjectCommand({ Bucket: bucketName, Key: key }),
);
const bodyBytes = await response.Body!.transformToByteArray();
// Convert to base64
let binary = '';
for (const byte of bodyBytes) {
binary += String.fromCharCode(byte);
}
const base64Content = btoa(binary);
return {
content: base64Content,
contentType: response.ContentType || 'application/octet-stream',
size: response.ContentLength || 0,
lastModified: response.LastModified?.toISOString() || new Date().toISOString(),
};
}
public async putObject(
bucketName: string,
key: string,
base64Content: string,
contentType: string,
): Promise<void> {
const binaryString = atob(base64Content);
const bytes = new Uint8Array(binaryString.length);
for (let i = 0; i < binaryString.length; i++) {
bytes[i] = binaryString.charCodeAt(i);
}
await this.s3Client.send(
new plugins.PutObjectCommand({
Bucket: bucketName,
Key: key,
Body: bytes,
ContentType: contentType,
}),
);
}
public async deletePrefix(bucketName: string, prefix: string): Promise<void> {
let continuationToken: string | undefined;
do {
const listResp = await this.s3Client.send(
new plugins.ListObjectsV2Command({
Bucket: bucketName,
Prefix: prefix,
ContinuationToken: continuationToken,
}),
);
for (const obj of listResp.Contents || []) {
if (obj.Key) {
await this.s3Client.send(
new plugins.DeleteObjectCommand({ Bucket: bucketName, Key: obj.Key }),
);
}
}
continuationToken = listResp.IsTruncated ? listResp.NextContinuationToken : undefined;
} while (continuationToken);
}
public async getObjectUrl(bucketName: string, key: string): Promise<string> {
const descriptor = await this.smartstorageInstance.getStorageDescriptor();
return `http://${descriptor.endpoint}:${descriptor.port}/${bucketName}/${key}`;
}
public async moveObject(
bucketName: string,
sourceKey: string,
destKey: string,
): Promise<{ success: boolean; error?: string }> {
try {
await this.s3Client.send(
new plugins.CopyObjectCommand({
Bucket: bucketName,
CopySource: `${bucketName}/${sourceKey}`,
Key: destKey,
}),
);
await this.s3Client.send(
new plugins.DeleteObjectCommand({ Bucket: bucketName, Key: sourceKey }),
);
return { success: true };
} catch (err: any) {
return { success: false, error: err.message };
}
}
public async movePrefix(
bucketName: string,
sourcePrefix: string,
destPrefix: string,
): Promise<{ success: boolean; movedCount?: number; error?: string }> {
try {
let movedCount = 0;
let continuationToken: string | undefined;
do {
const listResp = await this.s3Client.send(
new plugins.ListObjectsV2Command({
Bucket: bucketName,
Prefix: sourcePrefix,
ContinuationToken: continuationToken,
}),
);
for (const obj of listResp.Contents || []) {
if (!obj.Key) continue;
const newKey = destPrefix + obj.Key.slice(sourcePrefix.length);
await this.s3Client.send(
new plugins.CopyObjectCommand({
Bucket: bucketName,
CopySource: `${bucketName}/${obj.Key}`,
Key: newKey,
}),
);
await this.s3Client.send(
new plugins.DeleteObjectCommand({ Bucket: bucketName, Key: obj.Key }),
);
movedCount++;
}
continuationToken = listResp.IsTruncated ? listResp.NextContinuationToken : undefined;
} while (continuationToken);
return { success: true, movedCount };
} catch (err: any) {
return { success: false, error: err.message };
}
}
public async getServerStats(): Promise<interfaces.data.IServerStatus> {
const stats = await this.smartstorageInstance.getStorageStats();
return {
running: true,
objstPort: this.config.objstPort,
uiPort: this.config.uiPort,
uptime: Math.floor((Date.now() - this.startedAt) / 1000),
startedAt: this.startedAt,
bucketCount: stats.bucketCount,
totalObjectCount: stats.totalObjectCount,
totalStorageBytes: stats.totalStorageBytes,
storageDirectory: stats.storageDirectory,
region: this.config.region,
authEnabled: true,
};
}
public async getClusterHealth(): Promise<interfaces.data.IClusterHealth> {
return await this.smartstorageInstance.getClusterHealth();
}
public async getBucketPolicy(bucketName: string): Promise<string | null> {
try {
const response = await this.s3Client.send(
new plugins.GetBucketPolicyCommand({ Bucket: bucketName }),
);
return response.Policy || null;
} catch (err: any) {
if (err.name === 'NoSuchBucketPolicy' || err.Code === 'NoSuchBucketPolicy') {
return null;
}
throw err;
}
}
public async putBucketPolicy(bucketName: string, policyJson: string): Promise<void> {
await this.s3Client.send(
new plugins.PutBucketPolicyCommand({ Bucket: bucketName, Policy: policyJson }),
);
}
public async deleteBucketPolicy(bucketName: string): Promise<void> {
await this.s3Client.send(
new plugins.DeleteBucketPolicyCommand({ Bucket: bucketName }),
);
}
public async getConnectionInfo(): Promise<interfaces.data.IConnectionInfo> {
const descriptor = await this.smartstorageInstance.getStorageDescriptor();
return {
endpoint: descriptor.endpoint,
port: Number(descriptor.port ?? this.config.objstPort),
useSsl: descriptor.useSsl ?? false,
accessKey: descriptor.accessKey,
region: this.config.region,
};
}
public isReady(): boolean {
return Boolean(this.smartstorageInstance && this.s3Client && this.startedAt);
}
public async getOperationalHealth(): Promise<Record<string, unknown>> {
const clusterHealth = this.smartstorageInstance
? await this.getClusterHealth()
: { enabled: false };
const stats = this.smartstorageInstance
? await this.getServerStats()
: null;
const ready = this.isReady();
return {
ok: ready,
status: ready ? 'healthy' : 'starting',
startedAt: this.startedAt || null,
uptimeSeconds: this.startedAt ? Math.floor((Date.now() - this.startedAt) / 1000) : 0,
storageDirectory: this.config.storageDirectory,
stats,
cluster: clusterHealth,
};
}
public async getOperationalMetrics(): Promise<string> {
const stats = this.smartstorageInstance
? await this.smartstorageInstance.getStorageStats()
: null;
const clusterHealth = this.smartstorageInstance
? await this.smartstorageInstance.getClusterHealth()
: { enabled: false };
return [
'# HELP objectstorage_ready ObjectStorage readiness state.',
'# TYPE objectstorage_ready gauge',
`objectstorage_ready ${this.isReady() ? 1 : 0}`,
'# HELP objectstorage_buckets_total Runtime bucket count.',
'# TYPE objectstorage_buckets_total gauge',
`objectstorage_buckets_total ${stats?.bucketCount ?? 0}`,
'# HELP objectstorage_objects_total Runtime object count.',
'# TYPE objectstorage_objects_total gauge',
`objectstorage_objects_total ${stats?.totalObjectCount ?? 0}`,
'# HELP objectstorage_cluster_enabled Cluster mode enabled.',
'# TYPE objectstorage_cluster_enabled gauge',
`objectstorage_cluster_enabled ${clusterHealth.enabled ? 1 : 0}`,
'',
].join('\n');
}
private get persistedAdminConfigPath(): string {
return `${this.config.storageDirectory}/.objectstorage/admin-config.json`;
}
private async loadPersistedAdminConfig(): Promise<void> {
if (this.envAccessCredentialsProvided) {
return;
}
try {
const content = await Deno.readTextFile(this.persistedAdminConfigPath);
const persistedConfig = JSON.parse(content) as IPersistedAdminConfig;
const persistedCredentials = persistedConfig.accessCredentials;
if (!Array.isArray(persistedCredentials) || persistedCredentials.length === 0) {
return;
}
const validCredentials = persistedCredentials
.filter((credential) => credential?.accessKeyId && credential?.secretAccessKey)
.map((credential) => ({
accessKeyId: credential.accessKeyId,
secretAccessKey: credential.secretAccessKey,
}));
if (validCredentials.length === 0) {
return;
}
this.config.accessCredentials = validCredentials;
} catch (error) {
if (error instanceof Deno.errors.NotFound) {
return;
}
throw error;
}
}
private async savePersistedAdminConfig(): Promise<void> {
const dirPath = this.persistedAdminConfigPath.substring(
0,
this.persistedAdminConfigPath.lastIndexOf('/'),
);
await Deno.mkdir(dirPath, { recursive: true });
const persistedConfig: IPersistedAdminConfig = {
accessCredentials: this.config.accessCredentials,
};
await Deno.writeTextFile(
this.persistedAdminConfigPath,
JSON.stringify(persistedConfig, null, 2),
{ mode: 0o600 },
);
await this.restrictPersistedAdminConfigPermissions();
}
private async refreshManagementClient(): Promise<void> {
const descriptor = await this.smartstorageInstance.getStorageDescriptor();
this.s3Client = new plugins.S3Client({
endpoint: `http://${descriptor.endpoint}:${descriptor.port}`,
region: this.config.region,
credentials: {
accessKeyId: descriptor.accessKey,
secretAccessKey: descriptor.accessSecret,
},
forcePathStyle: true,
});
}
private buildSmartstorageConfig(): any {
const smartstorageConfig: any = {
server: {
port: this.config.objstPort,
address: '0.0.0.0',
region: this.config.region,
},
storage: {
directory: this.config.storageDirectory,
},
auth: {
enabled: true,
credentials: this.config.accessCredentials,
},
};
if (this.config.clusterEnabled) {
smartstorageConfig.cluster = {
enabled: true,
nodeId: this.config.clusterNodeId || crypto.randomUUID().slice(0, 8),
quicPort: this.config.clusterQuicPort,
seedNodes: this.config.clusterSeedNodes,
erasure: {
dataShards: this.config.erasureDataShards,
parityShards: this.config.erasureParityShards,
chunkSizeBytes: this.config.erasureChunkSizeBytes,
},
drives: {
paths: this.config.drivePaths.length > 0
? this.config.drivePaths
: [this.config.storageDirectory],
},
heartbeatIntervalMs: this.config.clusterHeartbeatIntervalMs,
heartbeatTimeoutMs: this.config.clusterHeartbeatTimeoutMs,
};
}
return smartstorageConfig;
}
private assertSecureStartupConfig(): void {
const allowInsecureDefaults = Deno.env.get('OBJST_ALLOW_INSECURE_DEFAULTS') === 'true';
const usesDefaultAdminPassword = this.config.adminPassword === 'admin';
const usesDefaultAccessCredential = this.config.accessCredentials.some((credential) => {
return credential.accessKeyId === 'admin' && credential.secretAccessKey === 'admin';
});
const looksLikePersistentProductionStorage = this.config.storageDirectory === '/data';
if (
looksLikePersistentProductionStorage &&
!allowInsecureDefaults &&
(usesDefaultAdminPassword || usesDefaultAccessCredential)
) {
throw new Error(
'Refusing to start with default admin credentials on persistent /data storage. Set OBJST_ADMIN_PASSWORD and OBJST_ACCESS_KEY/OBJST_SECRET_KEY, or set OBJST_ALLOW_INSECURE_DEFAULTS=true for disposable development.',
);
}
}
private async restrictPersistedAdminConfigPermissions(): Promise<void> {
try {
await Deno.chmod(this.persistedAdminConfigPath, 0o600);
} catch {
// chmod is not available on every platform Deno supports.
}
}
}