440 lines
15 KiB
TypeScript
440 lines
15 KiB
TypeScript
import * as plugins from '../plugins.ts';
|
|
import { type IObjectStorageConfig, defaultConfig } from '../types.ts';
|
|
import type * as interfaces from '../../ts_interfaces/index.ts';
|
|
import { OpsServer } from '../opsserver/index.ts';
|
|
import { PolicyManager } from './policymanager.ts';
|
|
|
|
export class ObjectStorageContainer {
|
|
public config: IObjectStorageConfig;
|
|
public smartstorageInstance!: plugins.smartstorage.SmartStorage;
|
|
public s3Client!: plugins.S3Client;
|
|
public opsServer: OpsServer;
|
|
public policyManager: PolicyManager;
|
|
public startedAt: number = 0;
|
|
|
|
constructor(configArg?: Partial<IObjectStorageConfig>) {
|
|
this.config = { ...defaultConfig, ...configArg };
|
|
|
|
// Read environment variables (override config)
|
|
const envPort = Deno.env.get('OBJST_PORT');
|
|
if (envPort) this.config.objstPort = parseInt(envPort, 10);
|
|
|
|
const envUiPort = Deno.env.get('UI_PORT');
|
|
if (envUiPort) this.config.uiPort = parseInt(envUiPort, 10);
|
|
|
|
const envStorageDir = Deno.env.get('OBJST_STORAGE_DIR');
|
|
if (envStorageDir) this.config.storageDirectory = envStorageDir;
|
|
|
|
const envAccessKey = Deno.env.get('OBJST_ACCESS_KEY');
|
|
const envSecretKey = Deno.env.get('OBJST_SECRET_KEY');
|
|
if (envAccessKey && envSecretKey) {
|
|
this.config.accessCredentials = [
|
|
{ accessKeyId: envAccessKey, secretAccessKey: envSecretKey },
|
|
];
|
|
}
|
|
|
|
const envAdminPassword = Deno.env.get('OBJST_ADMIN_PASSWORD');
|
|
if (envAdminPassword) this.config.adminPassword = envAdminPassword;
|
|
|
|
const envRegion = Deno.env.get('OBJST_REGION');
|
|
if (envRegion) this.config.region = envRegion;
|
|
|
|
// Cluster environment variables
|
|
const envClusterEnabled = Deno.env.get('OBJST_CLUSTER_ENABLED');
|
|
if (envClusterEnabled) this.config.clusterEnabled = envClusterEnabled === 'true' || envClusterEnabled === '1';
|
|
|
|
const envClusterNodeId = Deno.env.get('OBJST_CLUSTER_NODE_ID');
|
|
if (envClusterNodeId) this.config.clusterNodeId = envClusterNodeId;
|
|
|
|
const envClusterQuicPort = Deno.env.get('OBJST_CLUSTER_QUIC_PORT');
|
|
if (envClusterQuicPort) this.config.clusterQuicPort = parseInt(envClusterQuicPort, 10);
|
|
|
|
const envClusterSeedNodes = Deno.env.get('OBJST_CLUSTER_SEED_NODES');
|
|
if (envClusterSeedNodes) this.config.clusterSeedNodes = envClusterSeedNodes.split(',').map(s => s.trim()).filter(Boolean);
|
|
|
|
const envDrivePaths = Deno.env.get('OBJST_DRIVE_PATHS');
|
|
if (envDrivePaths) this.config.drivePaths = envDrivePaths.split(',').map(s => s.trim()).filter(Boolean);
|
|
|
|
const envErasureDataShards = Deno.env.get('OBJST_ERASURE_DATA_SHARDS');
|
|
if (envErasureDataShards) this.config.erasureDataShards = parseInt(envErasureDataShards, 10);
|
|
|
|
const envErasureParityShards = Deno.env.get('OBJST_ERASURE_PARITY_SHARDS');
|
|
if (envErasureParityShards) this.config.erasureParityShards = parseInt(envErasureParityShards, 10);
|
|
|
|
const envErasureChunkSize = Deno.env.get('OBJST_ERASURE_CHUNK_SIZE');
|
|
if (envErasureChunkSize) this.config.erasureChunkSizeBytes = parseInt(envErasureChunkSize, 10);
|
|
|
|
const envHeartbeatInterval = Deno.env.get('OBJST_HEARTBEAT_INTERVAL_MS');
|
|
if (envHeartbeatInterval) this.config.clusterHeartbeatIntervalMs = parseInt(envHeartbeatInterval, 10);
|
|
|
|
const envHeartbeatTimeout = Deno.env.get('OBJST_HEARTBEAT_TIMEOUT_MS');
|
|
if (envHeartbeatTimeout) this.config.clusterHeartbeatTimeoutMs = parseInt(envHeartbeatTimeout, 10);
|
|
|
|
this.opsServer = new OpsServer(this);
|
|
this.policyManager = new PolicyManager(this);
|
|
}
|
|
|
|
public async start(): Promise<void> {
|
|
console.log(`Starting ObjectStorage...`);
|
|
console.log(` Storage port: ${this.config.objstPort}`);
|
|
console.log(` UI port: ${this.config.uiPort}`);
|
|
console.log(` Storage: ${this.config.storageDirectory}`);
|
|
console.log(` Region: ${this.config.region}`);
|
|
console.log(` Cluster: ${this.config.clusterEnabled ? 'enabled' : 'disabled'}`);
|
|
if (this.config.clusterEnabled) {
|
|
console.log(` Node ID: ${this.config.clusterNodeId || '(auto-generated)'}`);
|
|
console.log(` QUIC Port: ${this.config.clusterQuicPort}`);
|
|
console.log(` Seed Nodes: ${this.config.clusterSeedNodes.join(', ') || '(none)'}`);
|
|
console.log(` Drives: ${this.config.drivePaths.length > 0 ? this.config.drivePaths.join(', ') : this.config.storageDirectory}`);
|
|
console.log(` Erasure: ${this.config.erasureDataShards}+${this.config.erasureParityShards}`);
|
|
}
|
|
|
|
// Build smartstorage config
|
|
const smartstorageConfig: any = {
|
|
server: {
|
|
port: this.config.objstPort,
|
|
address: '0.0.0.0',
|
|
region: this.config.region,
|
|
},
|
|
storage: {
|
|
directory: this.config.storageDirectory,
|
|
},
|
|
auth: {
|
|
enabled: true,
|
|
credentials: this.config.accessCredentials,
|
|
},
|
|
};
|
|
|
|
if (this.config.clusterEnabled) {
|
|
smartstorageConfig.cluster = {
|
|
enabled: true,
|
|
nodeId: this.config.clusterNodeId || crypto.randomUUID().slice(0, 8),
|
|
quicPort: this.config.clusterQuicPort,
|
|
seedNodes: this.config.clusterSeedNodes,
|
|
erasure: {
|
|
dataShards: this.config.erasureDataShards,
|
|
parityShards: this.config.erasureParityShards,
|
|
chunkSizeBytes: this.config.erasureChunkSizeBytes,
|
|
},
|
|
drives: {
|
|
paths: this.config.drivePaths.length > 0
|
|
? this.config.drivePaths
|
|
: [this.config.storageDirectory],
|
|
},
|
|
heartbeatIntervalMs: this.config.clusterHeartbeatIntervalMs,
|
|
heartbeatTimeoutMs: this.config.clusterHeartbeatTimeoutMs,
|
|
};
|
|
}
|
|
|
|
// Start smartstorage
|
|
this.smartstorageInstance = await plugins.smartstorage.SmartStorage.createAndStart(smartstorageConfig);
|
|
|
|
this.startedAt = Date.now();
|
|
console.log(`Storage server started on port ${this.config.objstPort}`);
|
|
|
|
// Create S3 client for management operations
|
|
const descriptor = await this.smartstorageInstance.getStorageDescriptor();
|
|
this.s3Client = new plugins.S3Client({
|
|
endpoint: `http://${descriptor.endpoint}:${descriptor.port}`,
|
|
region: this.config.region,
|
|
credentials: {
|
|
accessKeyId: descriptor.accessKey,
|
|
secretAccessKey: descriptor.accessSecret,
|
|
},
|
|
forcePathStyle: true,
|
|
});
|
|
|
|
// Load named policies
|
|
await this.policyManager.load();
|
|
|
|
// Start UI server
|
|
await this.opsServer.start(this.config.uiPort);
|
|
console.log(`Management UI started on port ${this.config.uiPort}`);
|
|
}
|
|
|
|
public async stop(): Promise<void> {
|
|
console.log('Stopping ObjectStorage...');
|
|
await this.opsServer.stop();
|
|
await this.smartstorageInstance.stop();
|
|
console.log('ObjectStorage stopped.');
|
|
}
|
|
|
|
// ── Management methods ──
|
|
|
|
public async listBuckets(): Promise<interfaces.data.IBucketInfo[]> {
|
|
const response = await this.s3Client.send(new plugins.ListBucketsCommand({}));
|
|
const buckets: interfaces.data.IBucketInfo[] = [];
|
|
|
|
for (const bucket of response.Buckets || []) {
|
|
const name = bucket.Name || '';
|
|
const creationDate = bucket.CreationDate?.getTime() || 0;
|
|
|
|
// Get object count and size for each bucket
|
|
let objectCount = 0;
|
|
let totalSizeBytes = 0;
|
|
let continuationToken: string | undefined;
|
|
|
|
do {
|
|
const listResp = await this.s3Client.send(
|
|
new plugins.ListObjectsV2Command({
|
|
Bucket: name,
|
|
ContinuationToken: continuationToken,
|
|
}),
|
|
);
|
|
|
|
for (const obj of listResp.Contents || []) {
|
|
objectCount++;
|
|
totalSizeBytes += obj.Size || 0;
|
|
}
|
|
|
|
continuationToken = listResp.IsTruncated ? listResp.NextContinuationToken : undefined;
|
|
} while (continuationToken);
|
|
|
|
buckets.push({ name, creationDate, objectCount, totalSizeBytes });
|
|
}
|
|
|
|
return buckets;
|
|
}
|
|
|
|
public async createBucket(bucketName: string): Promise<void> {
|
|
await this.smartstorageInstance.createBucket(bucketName);
|
|
}
|
|
|
|
public async deleteBucket(bucketName: string): Promise<void> {
|
|
await this.s3Client.send(new plugins.DeleteBucketCommand({ Bucket: bucketName }));
|
|
}
|
|
|
|
public async listObjects(
|
|
bucketName: string,
|
|
prefix?: string,
|
|
delimiter?: string,
|
|
maxKeys?: number,
|
|
): Promise<interfaces.data.IObjectListResult> {
|
|
const response = await this.s3Client.send(
|
|
new plugins.ListObjectsV2Command({
|
|
Bucket: bucketName,
|
|
Prefix: prefix || '',
|
|
Delimiter: delimiter || '/',
|
|
MaxKeys: maxKeys || 1000,
|
|
}),
|
|
);
|
|
|
|
const objects: interfaces.data.IObjectInfo[] = (response.Contents || []).map((obj) => ({
|
|
key: obj.Key || '',
|
|
size: obj.Size || 0,
|
|
lastModified: obj.LastModified?.getTime() || 0,
|
|
etag: obj.ETag || '',
|
|
contentType: '',
|
|
}));
|
|
|
|
const commonPrefixes = (response.CommonPrefixes || []).map((p) => p.Prefix || '');
|
|
|
|
return {
|
|
objects,
|
|
commonPrefixes,
|
|
isTruncated: response.IsTruncated || false,
|
|
currentPrefix: prefix || '',
|
|
};
|
|
}
|
|
|
|
public async deleteObject(bucketName: string, key: string): Promise<void> {
|
|
await this.s3Client.send(
|
|
new plugins.DeleteObjectCommand({ Bucket: bucketName, Key: key }),
|
|
);
|
|
}
|
|
|
|
public async getObject(bucketName: string, key: string): Promise<{
|
|
content: string;
|
|
contentType: string;
|
|
size: number;
|
|
lastModified: string;
|
|
}> {
|
|
const response = await this.s3Client.send(
|
|
new plugins.GetObjectCommand({ Bucket: bucketName, Key: key }),
|
|
);
|
|
const bodyBytes = await response.Body!.transformToByteArray();
|
|
// Convert to base64
|
|
let binary = '';
|
|
for (const byte of bodyBytes) {
|
|
binary += String.fromCharCode(byte);
|
|
}
|
|
const base64Content = btoa(binary);
|
|
|
|
return {
|
|
content: base64Content,
|
|
contentType: response.ContentType || 'application/octet-stream',
|
|
size: response.ContentLength || 0,
|
|
lastModified: response.LastModified?.toISOString() || new Date().toISOString(),
|
|
};
|
|
}
|
|
|
|
public async putObject(
|
|
bucketName: string,
|
|
key: string,
|
|
base64Content: string,
|
|
contentType: string,
|
|
): Promise<void> {
|
|
const binaryString = atob(base64Content);
|
|
const bytes = new Uint8Array(binaryString.length);
|
|
for (let i = 0; i < binaryString.length; i++) {
|
|
bytes[i] = binaryString.charCodeAt(i);
|
|
}
|
|
await this.s3Client.send(
|
|
new plugins.PutObjectCommand({
|
|
Bucket: bucketName,
|
|
Key: key,
|
|
Body: bytes,
|
|
ContentType: contentType,
|
|
}),
|
|
);
|
|
}
|
|
|
|
public async deletePrefix(bucketName: string, prefix: string): Promise<void> {
|
|
let continuationToken: string | undefined;
|
|
do {
|
|
const listResp = await this.s3Client.send(
|
|
new plugins.ListObjectsV2Command({
|
|
Bucket: bucketName,
|
|
Prefix: prefix,
|
|
ContinuationToken: continuationToken,
|
|
}),
|
|
);
|
|
for (const obj of listResp.Contents || []) {
|
|
if (obj.Key) {
|
|
await this.s3Client.send(
|
|
new plugins.DeleteObjectCommand({ Bucket: bucketName, Key: obj.Key }),
|
|
);
|
|
}
|
|
}
|
|
continuationToken = listResp.IsTruncated ? listResp.NextContinuationToken : undefined;
|
|
} while (continuationToken);
|
|
}
|
|
|
|
public async getObjectUrl(bucketName: string, key: string): Promise<string> {
|
|
const descriptor = await this.smartstorageInstance.getStorageDescriptor();
|
|
return `http://${descriptor.endpoint}:${descriptor.port}/${bucketName}/${key}`;
|
|
}
|
|
|
|
public async moveObject(
|
|
bucketName: string,
|
|
sourceKey: string,
|
|
destKey: string,
|
|
): Promise<{ success: boolean; error?: string }> {
|
|
try {
|
|
await this.s3Client.send(
|
|
new plugins.CopyObjectCommand({
|
|
Bucket: bucketName,
|
|
CopySource: `${bucketName}/${sourceKey}`,
|
|
Key: destKey,
|
|
}),
|
|
);
|
|
await this.s3Client.send(
|
|
new plugins.DeleteObjectCommand({ Bucket: bucketName, Key: sourceKey }),
|
|
);
|
|
return { success: true };
|
|
} catch (err: any) {
|
|
return { success: false, error: err.message };
|
|
}
|
|
}
|
|
|
|
public async movePrefix(
|
|
bucketName: string,
|
|
sourcePrefix: string,
|
|
destPrefix: string,
|
|
): Promise<{ success: boolean; movedCount?: number; error?: string }> {
|
|
try {
|
|
let movedCount = 0;
|
|
let continuationToken: string | undefined;
|
|
do {
|
|
const listResp = await this.s3Client.send(
|
|
new plugins.ListObjectsV2Command({
|
|
Bucket: bucketName,
|
|
Prefix: sourcePrefix,
|
|
ContinuationToken: continuationToken,
|
|
}),
|
|
);
|
|
for (const obj of listResp.Contents || []) {
|
|
if (!obj.Key) continue;
|
|
const newKey = destPrefix + obj.Key.slice(sourcePrefix.length);
|
|
await this.s3Client.send(
|
|
new plugins.CopyObjectCommand({
|
|
Bucket: bucketName,
|
|
CopySource: `${bucketName}/${obj.Key}`,
|
|
Key: newKey,
|
|
}),
|
|
);
|
|
await this.s3Client.send(
|
|
new plugins.DeleteObjectCommand({ Bucket: bucketName, Key: obj.Key }),
|
|
);
|
|
movedCount++;
|
|
}
|
|
continuationToken = listResp.IsTruncated ? listResp.NextContinuationToken : undefined;
|
|
} while (continuationToken);
|
|
return { success: true, movedCount };
|
|
} catch (err: any) {
|
|
return { success: false, error: err.message };
|
|
}
|
|
}
|
|
|
|
public async getServerStats(): Promise<interfaces.data.IServerStatus> {
|
|
const buckets = await this.listBuckets();
|
|
let totalObjectCount = 0;
|
|
let totalStorageBytes = 0;
|
|
for (const b of buckets) {
|
|
totalObjectCount += b.objectCount;
|
|
totalStorageBytes += b.totalSizeBytes;
|
|
}
|
|
|
|
return {
|
|
running: true,
|
|
objstPort: this.config.objstPort,
|
|
uiPort: this.config.uiPort,
|
|
uptime: Math.floor((Date.now() - this.startedAt) / 1000),
|
|
startedAt: this.startedAt,
|
|
bucketCount: buckets.length,
|
|
totalObjectCount,
|
|
totalStorageBytes,
|
|
storageDirectory: this.config.storageDirectory,
|
|
region: this.config.region,
|
|
authEnabled: true,
|
|
};
|
|
}
|
|
|
|
public async getBucketPolicy(bucketName: string): Promise<string | null> {
|
|
try {
|
|
const response = await this.s3Client.send(
|
|
new plugins.GetBucketPolicyCommand({ Bucket: bucketName }),
|
|
);
|
|
return response.Policy || null;
|
|
} catch (err: any) {
|
|
if (err.name === 'NoSuchBucketPolicy' || err.Code === 'NoSuchBucketPolicy') {
|
|
return null;
|
|
}
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
public async putBucketPolicy(bucketName: string, policyJson: string): Promise<void> {
|
|
await this.s3Client.send(
|
|
new plugins.PutBucketPolicyCommand({ Bucket: bucketName, Policy: policyJson }),
|
|
);
|
|
}
|
|
|
|
public async deleteBucketPolicy(bucketName: string): Promise<void> {
|
|
await this.s3Client.send(
|
|
new plugins.DeleteBucketPolicyCommand({ Bucket: bucketName }),
|
|
);
|
|
}
|
|
|
|
public async getConnectionInfo(): Promise<interfaces.data.IConnectionInfo> {
|
|
const descriptor = await this.smartstorageInstance.getStorageDescriptor();
|
|
return {
|
|
endpoint: descriptor.endpoint,
|
|
port: Number(descriptor.port ?? this.config.objstPort),
|
|
useSsl: descriptor.useSsl ?? false,
|
|
accessKey: descriptor.accessKey,
|
|
region: this.config.region,
|
|
};
|
|
}
|
|
}
|