Compare commits


4 Commits

Author SHA1 Message Date
jkunz 8f1d025330 v6.5.1 2026-05-02 12:09:13 +00:00 (tag workflows: security and test failing after 1s; release and metadata skipped)
jkunz b075de1ecd fix(bucket-tenants): make tenant lifecycle and bucket import validation safer 2026-05-02 12:09:13 +00:00
jkunz 7020810b5e v6.5.0 2026-05-02 11:14:15 +00:00 (tag workflows: security and test failing after 1s; release and metadata skipped)
jkunz 7f2546e041 feat(bucket-tenants): add persisted bucket-scoped tenant credentials with bucket export and import APIs 2026-05-02 11:14:15 +00:00
28 changed files with 2031 additions and 221 deletions
+24
@@ -1,5 +1,29 @@
# Changelog
## 2026-05-02 - 6.5.1 - fix(bucket-tenants)
make tenant lifecycle and bucket import validation safer
- validate exported object size and MD5 before creating and importing target buckets
- persist bucket-tenant credential changes with rollback when bucket creation or deletion fails
- require auth-enabled access and matching tenant credentials for tenant list, descriptor, and recursive delete flows
## Next - fix(bucket-tenants)
make tenant lifecycle and bucket import validation safer
- Validate bucket export object size and MD5 before creating/importing into target buckets.
- Persist tenant credentials through snapshot-aware helpers and roll back credential changes if bucket create/delete fails.
- Require matching tenant credentials before `deleteBucketTenant({ bucketName })` can recursively delete a bucket.
- Enforce `auth.enabled` consistently on tenant list/descriptor APIs and document metadata-only credential listings.
## 2026-05-02 - 6.5.0 - feat(bucket-tenants)
add persisted bucket-scoped tenant credentials with bucket export and import APIs
- Adds bucket tenant management APIs for creating, rotating, listing, retrieving, and deleting scoped per-bucket credentials.
- Persists runtime credentials under the storage directory so tenant and replaced credentials survive restarts.
- Enforces tenant bucket isolation in auth, including blocking cross-bucket access and copy operations.
- Adds bucket export/import support using the smartstorage.bucket.v1 JSON format.
- Introduces health and metrics APIs plus test coverage for tenant lifecycle, persistence, policy retention, and AWS SDK compatibility.
## 2026-04-30 - 6.4.1 - fix(build)
tighten TypeScript compiler settings and refresh package metadata
+1 -1
@@ -1,6 +1,6 @@
{
  "name": "@push.rocks/smartstorage",
-  "version": "6.4.1",
+  "version": "6.5.1",
  "private": false,
  "description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
  "main": "dist_ts/index.js",
+15 -3
@@ -1,6 +1,6 @@
# Project Hints for smartstorage

-## Current State (v6.4.0)
+## Current State (v6.5.0)

- **Rust-powered S3-compatible storage server** via `@push.rocks/smartrust` IPC bridge
- High-performance: streaming I/O, zero-copy, backpressure, range seek
@@ -14,6 +14,9 @@
- Runtime bucket summaries and storage stats via the Rust bridge (no S3 list scans)
- Cluster health introspection via the Rust bridge (node membership, local drive probes, quorum, healing state)
- Runtime credential listing and atomic replacement via the Rust bridge
- Runtime credentials persist under `{storage}/.smartstorage/credentials.json`
- Bucket tenant APIs provision scoped per-bucket credentials and enforce the scope before bucket-policy/default-auth authorization
- Per-bucket export/import uses `smartstorage.bucket.v1` JSON with object payloads encoded per object and size/MD5 validation on import
- Cluster identity and topology snapshots persist under `{storage}/.smartstorage/cluster/`
- S3-side operational endpoints are available at `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics`
- Runtime credential listing returns access-key metadata only; secrets are write-only
@@ -44,9 +47,16 @@
| `start` | `{ config: ISmartStorageConfig }` | Init storage + HTTP server |
| `stop` | `{}` | Graceful shutdown |
| `createBucket` | `{ name: string }` | Create bucket directory |
| `createBucketTenant` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Create bucket and scoped persisted credential |
| `deleteBucketTenant` | `{ bucketName, accessKeyId? }` | Revoke scoped credential or delete a verified tenant bucket recursively |
| `rotateBucketTenantCredentials` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Replace scoped credential for one bucket |
| `listBucketTenants` | `{}` | Return scoped credential metadata |
| `getBucketTenantCredential` | `{ bucketName }` | Return one scoped credential including secret for descriptor generation |
| `exportBucket` | `{ bucketName }` | Export one bucket's objects and metadata |
| `importBucket` | `{ bucketName, source }` | Import a `smartstorage.bucket.v1` bucket export |
| `getStorageStats` | `{}` | Return cached bucket/global runtime stats + storage location capacity snapshots |
| `listBucketSummaries` | `{}` | Return cached per-bucket runtime summaries |
-| `listCredentials` | `{}` | Return the active runtime auth credential set |
+| `listCredentials` | `{}` | Return metadata for the active runtime auth credential set |
| `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set |
| `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode |
@@ -65,6 +75,7 @@
- MD5: `{root}/{bucket}/{key}._storage_object.md5`
- Multipart: `{root}/.multipart/{upload_id}/part-{N}`
- Policies: `{root}/.policies/{bucket}.policy.json`
- Runtime credentials: `{root}/.smartstorage/credentials.json`
## Build
@@ -81,7 +92,8 @@
## Testing

-- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337)
+- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health/metrics coverage (20 tests, auth disabled, port 3337)
- `test/test.bucket-tenants.node.ts` - bucket tenant provisioning, per-bucket isolation, restart persistence, export/import integrity, policy persistence, rotation, revoke/delete safeguards, AWS SDK v3 compatibility (15 tests, port 3361)
- `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349)
- `test/test.health-http.node.ts` - unauthenticated operational endpoint coverage (3 tests, port 3353)
- `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348)
+103 -5
@@ -34,6 +34,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
- 🧹 **Clean slate mode** — wipe storage on startup for test isolation
- 📊 **Runtime storage stats** — cheap bucket summaries and global counts without S3 list scans
- 🔑 **Runtime credential rotation** — list and replace active auth credentials without mutating internals
- 🧩 **Bucket tenants** — provision one scoped S3 credential per bucket with restart persistence
- **Test-first design** — start/stop in milliseconds, no port conflicts

### Clustering Features
@@ -225,15 +226,76 @@ await storage.replaceCredentials([
interface IStorageCredential {
accessKeyId: string;
secretAccessKey: string;
bucketName?: string;
region?: string;
}
```
- `listCredentials()` returns the Rust core's current runtime credential set.
-- `replaceCredentials()` swaps the full set atomically. On success, new requests use the new set immediately and the old credentials stop authenticating immediately.
+- `replaceCredentials()` swaps the full set atomically and persists it under the storage root. On success, new requests use the new set immediately and the old credentials stop authenticating immediately.
- Requests that were already authenticated before the replacement keep running; auth is evaluated when each request starts.
-- No restart is required.
+- No restart is required, and runtime-created credentials survive restart unless `storage.cleanSlate` removes the storage directory.
- Replacement input must contain at least one credential, each `accessKeyId` and `secretAccessKey` must be non-empty, and `accessKeyId` values must be unique.
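A minimal rotation sketch built on the semantics above (editorial illustration; the key names and secrets are placeholders): because replacement is full-set and atomic, a zero-downtime rotation can run in two phases.

```typescript
// Phase 1: publish both keys so clients can switch without an auth gap.
// listCredentials() returns metadata only, so the caller must supply secrets.
await storage.replaceCredentials([
  { accessKeyId: 'oldKey', secretAccessKey: 'oldSecret' }, // still accepted
  { accessKeyId: 'newKey', secretAccessKey: 'newSecret' }, // accepted from now on
]);

// ...repoint clients to newKey, then...

// Phase 2: drop the old key; it stops authenticating immediately.
await storage.replaceCredentials([
  { accessKeyId: 'newKey', secretAccessKey: 'newSecret' },
]);
```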
## Bucket Tenants
Bucket tenants are designed for platform services that need one bucket and one scoped S3 credential per app. Tenant credentials are enforced by the auth layer before the normal bucket-policy/default-auth pipeline, so a scoped credential cannot list all buckets or access another bucket even when it has a valid SigV4 signature.
```typescript
const tenant = await storage.createBucketTenant({
bucketName: 'workapp-123',
});
// Directly usable by AWS SDK v3 or env injection
const client = new S3Client({
endpoint: `http://${tenant.endpoint}:${tenant.port}`,
region: tenant.region,
credentials: {
accessKeyId: tenant.accessKeyId,
secretAccessKey: tenant.secretAccessKey,
},
forcePathStyle: true,
});
console.log(tenant.env.S3_BUCKET);
console.log(tenant.env.AWS_ACCESS_KEY_ID);
```
```typescript
await storage.rotateBucketTenantCredentials({ bucketName: 'workapp-123' });
await storage.deleteBucketTenant({ bucketName: 'workapp-123', accessKeyId: tenant.accessKeyId });
const descriptor = await storage.getBucketTenantDescriptor({ bucketName: 'workapp-123' });
const tenants = await storage.listBucketTenants();
```
- `createBucketTenant()` creates the bucket if needed and stores a scoped credential for that bucket.
- `rotateBucketTenantCredentials()` replaces the active scoped credential for the bucket and persists the new credential.
- `deleteBucketTenant({ bucketName, accessKeyId })` revokes one scoped credential and keeps the bucket.
- `deleteBucketTenant({ bucketName })` revokes scoped credentials for an existing tenant bucket and deletes that bucket's contents recursively.
- Tenant credentials can list, read, write, and delete objects in their assigned bucket, but cannot list all buckets, access other buckets, copy from other buckets, delete buckets, or mutate bucket policies.
- Bucket tenant APIs require `auth.enabled: true`.
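To make the isolation rules concrete, a minimal sketch reusing the `client` from the example above (editorial; the exact error surfaced is an assumption, S3 denials typically arrive as `AccessDenied`):

```typescript
import { PutObjectCommand, GetObjectCommand, ListBucketsCommand } from '@aws-sdk/client-s3';

// Allowed: the credential is scoped to 'workapp-123'.
await client.send(new PutObjectCommand({
  Bucket: 'workapp-123',
  Key: 'report.json',
  Body: JSON.stringify({ ok: true }),
}));

// Rejected: account-wide listing is outside the tenant scope.
await client.send(new ListBucketsCommand({}))
  .catch((error) => console.log('denied as expected:', error.name));

// Rejected: reads against another bucket, even with a valid SigV4 signature.
await client.send(new GetObjectCommand({ Bucket: 'other-bucket', Key: 'data.bin' }))
  .catch((error) => console.log('denied as expected:', error.name));
```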
## Bucket Backup/Restore
```typescript
const appBackup = await storage.exportBucket({ bucketName: 'workapp-123' });
await storage.importBucket({ bucketName: 'workapp-123-restore', source: appBackup });
```
- `exportBucket()` returns a self-contained `smartstorage.bucket.v1` JSON export with only the selected bucket's objects and object metadata.
- `importBucket()` validates object payload size and MD5 before creating the target bucket if needed, then restores the exported objects into that bucket.
- Exports do not include credentials, policies, or unrelated tenant data.
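A minimal backup-to-disk sketch (editorial illustration; it assumes the export object is plain JSON-serializable, which the `smartstorage.bucket.v1` JSON format implies, and the file path is arbitrary):

```typescript
import { promises as fs } from 'fs';

// Snapshot one bucket to a local file.
const backup = await storage.exportBucket({ bucketName: 'workapp-123' });
await fs.writeFile('workapp-123.bucket.json', JSON.stringify(backup));

// Later, or on a different instance: restore into a fresh bucket.
// importBucket() re-verifies each object's size and MD5 before writing.
const source = JSON.parse(await fs.readFile('workapp-123.bucket.json', 'utf8'));
await storage.importBucket({ bucketName: 'workapp-123-restore', source });
```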
## Health and Metrics APIs
```typescript
const health = await storage.getHealth();
const metrics = await storage.getMetrics();
```
- `getHealth()` reports running state, storage directory, auth enabled state, credential counts, bucket count, object count, total bytes, and cluster health.
- `getMetrics()` returns numeric counters and a Prometheus text snippet for bucket, object, byte, tenant credential, and cluster-enabled metrics.
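For scraping, the S3-side `/-/metrics` endpoint already serves this text directly; re-exposing it programmatically would look roughly like the sketch below (editorial; the field holding the Prometheus text is not named in this diff, so `prometheus` is a hypothetical placeholder):

```typescript
import { createServer } from 'http';

createServer(async (_req, res) => {
  const metrics = await storage.getMetrics();
  res.setHeader('Content-Type', 'text/plain; version=0.0.4');
  res.end((metrics as any).prometheus); // hypothetical field name
}).listen(9100);
```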
## Runtime Stats

```typescript
@@ -577,6 +639,34 @@ Gracefully stop the server and kill the Rust process.
Create a storage bucket.
#### `createBucketTenant(options): Promise<IBucketTenantDescriptor>`
Create a bucket tenant with a generated or supplied scoped credential. Options: `{ bucketName, accessKeyId?, secretAccessKey?, region? }`.
#### `deleteBucketTenant(options): Promise<void>`
Revoke a tenant credential or delete a bucket that still has tenant credentials. Options: `{ bucketName, accessKeyId? }`.
#### `rotateBucketTenantCredentials(options): Promise<IBucketTenantDescriptor>`
Replace the scoped credential for a bucket tenant. Options: `{ bucketName, accessKeyId?, secretAccessKey?, region? }`.
#### `listBucketTenants(): Promise<IBucketTenantMetadata[]>`
List scoped tenant credential metadata without returning secrets.
#### `getBucketTenantDescriptor(options): Promise<IBucketTenantDescriptor>`
Return endpoint, port, region, bucket, access key, secret key, SSL flag, legacy descriptor fields, and S3/AWS env values for the bucket tenant.
#### `exportBucket(options): Promise<IBucketExport>`
Export one bucket's objects and metadata into a `smartstorage.bucket.v1` JSON object.
#### `importBucket(options): Promise<void>`
Import a `smartstorage.bucket.v1` JSON object into the target bucket after validating object size and MD5. Options: `{ bucketName, source }`.
#### `getStorageDescriptor(options?): Promise<IS3Descriptor>`

Get connection details for S3-compatible clients. Returns:
@@ -597,9 +687,9 @@ Read cached logical bucket and object totals from the Rust runtime without issui
Get per-bucket logical object counts and total payload sizes.

-#### `listCredentials(): Promise<IStorageCredential[]>`
+#### `listCredentials(): Promise<IStorageCredentialMetadata[]>`

-Return the currently active runtime credential set.
+Return metadata for the currently active runtime credential set without `secretAccessKey` values.

#### `replaceCredentials(credentials: IStorageCredential[]): Promise<void>`
@@ -609,6 +699,14 @@ Atomically replace the active runtime credential set without restarting the serv
Read the Rust core's current cluster, drive, quorum, and repair health snapshot. Standalone mode returns `{ enabled: false }`.
#### `getHealth(): Promise<ISmartStorageHealth>`
Return running state, storage directory, auth state, credential counts, bucket count, object count, total bytes, and cluster health.
#### `getMetrics(): Promise<ISmartStorageMetrics>`
Return numeric metrics plus a Prometheus text snippet for operational scraping.
## Architecture

smartstorage uses a **hybrid Rust + TypeScript** architecture:
@@ -642,7 +740,7 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
**Why Rust?** The original TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.

-**IPC Protocol:** TypeScript communicates with the `ruststorage` binary over newline-delimited JSON via stdin/stdout. The current management commands are `start`, `stop`, `createBucket`, `getStorageStats`, `listBucketSummaries`, `listCredentials`, `replaceCredentials`, and `getClusterHealth`.
+**IPC Protocol:** TypeScript communicates with the `ruststorage` binary over newline-delimited JSON via stdin/stdout. The current management commands are `start`, `stop`, `createBucket`, `createBucketTenant`, `deleteBucketTenant`, `rotateBucketTenantCredentials`, `listBucketTenants`, `getBucketTenantCredential`, `exportBucket`, `importBucket`, `getStorageStats`, `listBucketSummaries`, `listCredentials`, `replaceCredentials`, and `getClusterHealth`.
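For orientation, a rough sketch of that framing (editorial; the `--manage` flag and the `id`/`method`/`params` envelope are inferred from the management-loop code later in this diff and may not match the real names exactly):

```typescript
import { spawn } from 'child_process';
import { createInterface } from 'readline';

// Assumed management-mode flag; one JSON document per line in each direction.
const child = spawn('./ruststorage', ['--manage'], { stdio: ['pipe', 'pipe', 'inherit'] });

createInterface({ input: child.stdout! }).on('line', (line) => {
  console.log('response:', JSON.parse(line)); // responses are NDJSON too
});

// One request per line, terminated by '\n'.
child.stdin!.write(
  JSON.stringify({ id: 1, method: 'createBucket', params: { name: 'demo' } }) + '\n',
);
```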
### S3-Compatible Operations
+17
@@ -57,6 +57,7 @@ pub struct RequestContext {
pub action: StorageAction,
pub bucket: Option<String>,
pub key: Option<String>,
pub source_bucket: Option<String>,
}

impl RequestContext {
@@ -90,6 +91,7 @@ pub fn resolve_action(req: &Request<Incoming>) -> RequestContext {
action: StorageAction::ListAllMyBuckets,
bucket: None,
key: None,
source_bucket: None,
}
}
1 => {
@@ -113,6 +115,7 @@ pub fn resolve_action(req: &Request<Incoming>) -> RequestContext {
action,
bucket: Some(bucket),
key: None,
source_bucket: None,
}
}
2 => {
@@ -123,6 +126,18 @@ pub fn resolve_action(req: &Request<Incoming>) -> RequestContext {
let has_part_number = query.contains_key("partNumber");
let has_upload_id = query.contains_key("uploadId");
let has_uploads = query.contains_key("uploads");
let source_bucket = if has_copy_source {
req.headers()
.get("x-amz-copy-source")
.and_then(|value| value.to_str().ok())
.map(|source| {
let source = source.trim_start_matches('/');
let first_slash = source.find('/').unwrap_or(source.len());
percent_decode(&source[..first_slash])
})
} else {
None
};
let action = match &method {
&Method::PUT if has_part_number && has_upload_id => StorageAction::UploadPart,
@@ -141,12 +156,14 @@ pub fn resolve_action(req: &Request<Incoming>) -> RequestContext {
action,
bucket: Some(bucket),
key: Some(key),
source_bucket,
}
}
_ => RequestContext {
action: StorageAction::ListAllMyBuckets,
bucket: None,
key: None,
source_bucket: None,
},
}
}
+242 -21
@@ -3,6 +3,8 @@ use hyper::body::Incoming;
use hyper::Request;
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use tokio::fs;
use tokio::sync::RwLock;

use crate::config::{AuthConfig, Credential};
@@ -14,6 +16,7 @@ type HmacSha256 = Hmac<Sha256>;
#[derive(Debug, Clone)]
pub struct AuthenticatedIdentity {
pub access_key_id: String,
pub bucket_name: Option<String>,
}

/// Parsed components of an AWS4-HMAC-SHA256 Authorization header.
@@ -56,11 +59,7 @@ pub fn verify_request(
.headers()
.get("x-amz-date")
.and_then(|v| v.to_str().ok())
-.or_else(|| {
-req.headers()
-.get("date")
-.and_then(|v| v.to_str().ok())
-})
+.or_else(|| req.headers().get("date").and_then(|v| v.to_str().ok()))
.ok_or_else(|| StorageError::missing_security_header("Missing x-amz-date header"))?;

// Enforce 15-min clock skew
@@ -77,10 +76,7 @@ pub fn verify_request(
let canonical_request = build_canonical_request(req, &parsed.signed_headers, content_sha256);

// Build string to sign
-let scope = format!(
-"{}/{}/s3/aws4_request",
-parsed.date_stamp, parsed.region
-);
+let scope = format!("{}/{}/s3/aws4_request", parsed.date_stamp, parsed.region);
let canonical_hash = hex::encode(Sha256::digest(canonical_request.as_bytes()));
let string_to_sign = format!(
"AWS4-HMAC-SHA256\n{}\n{}\n{}",
@@ -105,6 +101,7 @@ pub fn verify_request(
Ok(AuthenticatedIdentity {
access_key_id: parsed.access_key_id,
bucket_name: credential.bucket_name.clone(),
})
}
@@ -131,10 +128,9 @@ fn parse_auth_header(header: &str) -> Result<SigV4Header, StorageError> {
}
}

-let credential_str = credential_str
-.ok_or_else(StorageError::authorization_header_malformed)?;
-let signed_headers_str = signed_headers_str
-.ok_or_else(StorageError::authorization_header_malformed)?;
+let credential_str = credential_str.ok_or_else(StorageError::authorization_header_malformed)?;
+let signed_headers_str =
+signed_headers_str.ok_or_else(StorageError::authorization_header_malformed)?;
let signature = signature_str
.ok_or_else(StorageError::authorization_header_malformed)?
.to_string();
@@ -164,7 +160,10 @@ fn parse_auth_header(header: &str) -> Result<SigV4Header, StorageError> {
}

/// Find a credential by access key ID.
-fn find_credential<'a>(access_key_id: &str, credentials: &'a [Credential]) -> Option<&'a Credential> {
+fn find_credential<'a>(
+access_key_id: &str,
+credentials: &'a [Credential],
+) -> Option<&'a Credential> {
credentials
.iter()
.find(|c| c.access_key_id == access_key_id)
@@ -174,20 +173,60 @@ fn find_credential<'a>(access_key_id: &str, credentials: &'a [Credential]) -> Op
pub struct RuntimeCredentialStore {
enabled: bool,
credentials: RwLock<Vec<Credential>>,
persistence_path: Option<PathBuf>,
}
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CredentialMetadata {
pub access_key_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub bucket_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub region: Option<String>,
}
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketTenantMetadata {
pub bucket_name: String,
pub access_key_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub region: Option<String>,
}
#[derive(Debug, Clone)]
pub struct CredentialRemoval {
pub previous_credentials: Vec<Credential>,
}
#[derive(Debug, Clone)]
pub struct CredentialReplacement {
pub credential: Credential,
pub previous_credentials: Vec<Credential>,
}
impl RuntimeCredentialStore {
-pub fn new(config: &AuthConfig) -> Self {
-Self {
+pub async fn new(
+config: &AuthConfig,
+persistence_path: Option<PathBuf>,
+) -> anyhow::Result<Self> {
+let credentials = match persistence_path.as_ref() {
+Some(path) if path.exists() => {
+let content = fs::read_to_string(path).await?;
+let credentials: Vec<Credential> = serde_json::from_str(&content)?;
+validate_credentials(&credentials)
+.map_err(|error| anyhow::anyhow!(error.message))?;
+credentials
+}
+_ => config.credentials.clone(),
+};
+Ok(Self {
enabled: config.enabled,
-credentials: RwLock::new(config.credentials.clone()),
-}
+credentials: RwLock::new(credentials),
+persistence_path,
+})
}
pub fn enabled(&self) -> bool {
@@ -201,6 +240,8 @@ impl RuntimeCredentialStore {
.iter()
.map(|credential| CredentialMetadata {
access_key_id: credential.access_key_id.clone(),
bucket_name: credential.bucket_name.clone(),
region: credential.region.clone(),
})
.collect()
}
@@ -209,11 +250,190 @@ impl RuntimeCredentialStore {
self.credentials.read().await.clone()
}
-pub async fn replace_credentials(&self, credentials: Vec<Credential>) -> Result<(), StorageError> {
+pub async fn replace_credentials(
+&self,
+credentials: Vec<Credential>,
+) -> Result<(), StorageError> {
validate_credentials(&credentials)?;
-*self.credentials.write().await = credentials;
+let mut credentials_guard = self.credentials.write().await;
+self.persist_credentials(&credentials).await?;
+*credentials_guard = credentials;
Ok(())
}
pub async fn replace_bucket_tenant_credential(
&self,
bucket_name: &str,
credential: Credential,
) -> Result<Credential, StorageError> {
let replacement = self
.replace_bucket_tenant_credential_with_snapshot(bucket_name, credential)
.await?;
Ok(replacement.credential)
}
pub async fn replace_bucket_tenant_credential_with_snapshot(
&self,
bucket_name: &str,
credential: Credential,
) -> Result<CredentialReplacement, StorageError> {
let mut credentials_guard = self.credentials.write().await;
let previous_credentials = credentials_guard.clone();
let (credential, updated_credentials) =
prepare_bucket_tenant_replacement(bucket_name, credential, &credentials_guard)?;
self.persist_credentials(&updated_credentials).await?;
*credentials_guard = updated_credentials;
Ok(CredentialReplacement {
credential,
previous_credentials,
})
}
pub async fn remove_bucket_tenant_credentials(
&self,
bucket_name: &str,
access_key_id: Option<&str>,
) -> Result<CredentialRemoval, StorageError> {
let mut credentials_guard = self.credentials.write().await;
let previous_credentials = credentials_guard.clone();
let (_removed, updated_credentials) =
prepare_bucket_tenant_removal(bucket_name, access_key_id, &credentials_guard)?;
self.persist_credentials(&updated_credentials).await?;
*credentials_guard = updated_credentials;
Ok(CredentialRemoval {
previous_credentials,
})
}
pub async fn list_bucket_tenants(&self) -> Vec<BucketTenantMetadata> {
let mut tenants: Vec<BucketTenantMetadata> = self
.credentials
.read()
.await
.iter()
.filter_map(|credential| {
credential
.bucket_name
.as_ref()
.map(|bucket_name| BucketTenantMetadata {
bucket_name: bucket_name.clone(),
access_key_id: credential.access_key_id.clone(),
region: credential.region.clone(),
})
})
.collect();
tenants.sort_by(|a, b| {
a.bucket_name
.cmp(&b.bucket_name)
.then_with(|| a.access_key_id.cmp(&b.access_key_id))
});
tenants
}
pub async fn get_bucket_tenant_credential(&self, bucket_name: &str) -> Option<Credential> {
self.credentials
.read()
.await
.iter()
.find(|credential| credential.bucket_name.as_deref() == Some(bucket_name))
.cloned()
}
async fn persist_credentials(&self, credentials: &[Credential]) -> Result<(), StorageError> {
let Some(path) = self.persistence_path.as_ref() else {
return Ok(());
};
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)
.await
.map_err(|error| StorageError::internal_error(&error.to_string()))?;
}
let temp_path = path.with_extension("json.tmp");
let json = serde_json::to_string_pretty(credentials)
.map_err(|error| StorageError::internal_error(&error.to_string()))?;
fs::write(&temp_path, json)
.await
.map_err(|error| StorageError::internal_error(&error.to_string()))?;
fs::rename(&temp_path, path)
.await
.map_err(|error| StorageError::internal_error(&error.to_string()))?;
Ok(())
}
}
fn validate_bucket_scope(bucket_name: &str) -> Result<(), StorageError> {
if bucket_name.trim().is_empty() {
return Err(StorageError::invalid_request(
"Bucket tenant bucketName must not be empty.",
));
}
Ok(())
}
fn prepare_bucket_tenant_replacement(
bucket_name: &str,
mut credential: Credential,
credentials: &[Credential],
) -> Result<(Credential, Vec<Credential>), StorageError> {
validate_bucket_scope(bucket_name)?;
credential.bucket_name = Some(bucket_name.to_string());
if credentials.iter().any(|existing| {
existing.access_key_id == credential.access_key_id
&& existing.bucket_name.as_deref() != Some(bucket_name)
}) {
return Err(StorageError::invalid_request(
"Credential accessKeyId is already assigned to another principal.",
));
}
let mut updated_credentials = credentials.to_vec();
updated_credentials.retain(|existing| existing.bucket_name.as_deref() != Some(bucket_name));
updated_credentials.push(credential.clone());
validate_credentials(&updated_credentials)?;
Ok((credential, updated_credentials))
}
fn prepare_bucket_tenant_removal(
bucket_name: &str,
access_key_id: Option<&str>,
credentials: &[Credential],
) -> Result<(usize, Vec<Credential>), StorageError> {
validate_bucket_scope(bucket_name)?;
let mut updated_credentials = credentials.to_vec();
let before = updated_credentials.len();
updated_credentials.retain(|credential| {
if credential.bucket_name.as_deref() != Some(bucket_name) {
return true;
}
if let Some(access_key_id) = access_key_id {
credential.access_key_id != access_key_id
} else {
false
}
});
let removed = before.saturating_sub(updated_credentials.len());
if removed == 0 {
return Err(StorageError::invalid_request(
"No matching bucket tenant credential exists.",
));
}
if updated_credentials.is_empty() {
return Err(StorageError::invalid_request(
"Cannot remove the last active credential.",
));
}
Ok((removed, updated_credentials))
}
fn validate_credentials(credentials: &[Credential]) -> Result<(), StorageError> {
@@ -253,7 +473,8 @@ fn check_clock_skew(amz_date: &str) -> Result<(), StorageError> {
let parsed = chrono::NaiveDateTime::parse_from_str(amz_date, "%Y%m%dT%H%M%SZ")
.map_err(|_| StorageError::authorization_header_malformed())?;
-let request_time = chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(parsed, chrono::Utc);
+let request_time =
+chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(parsed, chrono::Utc);
let now = chrono::Utc::now();
let diff = (now - request_time).num_seconds().unsigned_abs();
+78 -8
@@ -21,11 +21,10 @@ use super::quic_transport::QuicTransport;
use super::shard_store::{ShardId, ShardStore};
use super::state::{ClusterState, NodeStatus};
use crate::storage::{
-storage_location_summary, BucketInfo, BucketSummary, ClusterDriveHealth,
-ClusterErasureHealth, ClusterHealth, ClusterPeerHealth, ClusterRepairHealth,
-CompleteMultipartResult, CopyResult, GetResult, HeadResult, ListObjectEntry,
-ListObjectsResult, MultipartUploadInfo, PutResult, RuntimeBucketStats,
-RuntimeStatsState, StorageLocationSummary, StorageStats,
+storage_location_summary, BucketInfo, BucketSummary, ClusterDriveHealth, ClusterErasureHealth,
+ClusterHealth, ClusterPeerHealth, ClusterRepairHealth, CompleteMultipartResult, CopyResult,
+GetResult, HeadResult, ListObjectEntry, ListObjectsResult, MultipartUploadInfo, PutResult,
+RuntimeBucketStats, RuntimeStatsState, StorageLocationSummary, StorageStats,
};
use serde::{Deserialize, Serialize};
@@ -170,7 +169,8 @@ impl DistributedStore {
let peers = self.peer_health(&nodes);
let drives = self.drive_health(&drive_states, &erasure_sets);
let repairs = self.repair_health().await;
-let quorum_healthy = majority_healthy && self.quorum_is_healthy(&nodes, &drive_states, &erasure_sets);
+let quorum_healthy =
+majority_healthy && self.quorum_is_healthy(&nodes, &drive_states, &erasure_sets);
Ok(ClusterHealth {
enabled: true,
@@ -291,6 +291,69 @@ impl DistributedStore {
Ok(PutResult { md5: md5_hex })
}
pub async fn put_object_bytes(
&self,
bucket: &str,
key: &str,
data: &[u8],
metadata: HashMap<String, String>,
) -> Result<PutResult> {
if !self.bucket_exists(bucket).await {
return Err(crate::error::StorageError::no_such_bucket().into());
}
let previous_size = self.manifest_size_if_exists(bucket, key).await;
let erasure_set = self
.state
.get_erasure_set_for_object(bucket, key)
.await
.ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?;
let mut chunks = Vec::new();
for (chunk_index, chunk_data) in data
.chunks(self.erasure_config.chunk_size_bytes)
.enumerate()
{
let chunk_manifest = self
.encode_and_distribute_chunk(
&erasure_set,
bucket,
key,
chunk_index as u32,
chunk_data,
)
.await?;
chunks.push(chunk_manifest);
}
let md5_hex = format!("{:x}", Md5::digest(data));
let now = Utc::now().to_rfc3339();
let manifest = ObjectManifest {
bucket: bucket.to_string(),
key: key.to_string(),
version_id: uuid::Uuid::new_v4().to_string(),
size: data.len() as u64,
content_md5: md5_hex.clone(),
content_type: metadata
.get("content-type")
.cloned()
.unwrap_or_else(|| "binary/octet-stream".to_string()),
metadata,
created_at: now.clone(),
last_modified: now,
data_shards: self.erasure_config.data_shards,
parity_shards: self.erasure_config.parity_shards,
chunk_size: self.erasure_config.chunk_size_bytes,
chunks,
};
self.store_manifest(&manifest).await?;
self.track_object_upsert(bucket, previous_size, manifest.size)
.await;
Ok(PutResult { md5: md5_hex })
}
pub async fn get_object(
&self,
bucket: &str,
@@ -1033,7 +1096,11 @@ impl DistributedStore {
peers
}
-fn drive_health(&self, drive_states: &[DriveState], erasure_sets: &[ErasureSet]) -> Vec<ClusterDriveHealth> {
+fn drive_health(
+&self,
+drive_states: &[DriveState],
+erasure_sets: &[ErasureSet],
+) -> Vec<ClusterDriveHealth> {
let local_node_id = self.state.local_node_id();
let mut drive_to_set = HashMap::new();
for erasure_set in erasure_sets {
@@ -1118,7 +1185,10 @@ impl DistributedStore {
.unwrap_or(false);
}
-matches!(node_statuses.get(drive.node_id.as_str()), Some(NodeStatus::Online))
+matches!(
+node_statuses.get(drive.node_id.as_str()),
+Some(NodeStatus::Online)
+)
})
.count();
+3 -6
@@ -80,11 +80,7 @@ impl ErasureCoder {
let total = self.config.total_shards();
if shards.len() != total {
-anyhow::bail!(
-"Expected {} shards, got {}",
-total,
-shards.len()
-);
+anyhow::bail!("Expected {} shards, got {}", total, shards.len());
}

let available = shards.iter().filter(|s| s.is_some()).count();
@@ -159,7 +155,8 @@ mod tests {
#[test]
fn test_decode_with_missing_shards() {
let coder = ErasureCoder::new(&test_config()).unwrap();
-let original = b"Testing reconstruction with missing shards - this should work with 4 of 6.";
+let original =
+b"Testing reconstruction with missing shards - this should work with 4 of 6.";
let shards = coder.encode_chunk(original).unwrap();
+7 -11
@@ -166,12 +166,7 @@ impl HealingService {
Ok(stats)
}

-async fn heal_bucket(
-&self,
-bucket: &str,
-offline_nodes: &[String],
-stats: &mut HealStats,
-) {
+async fn heal_bucket(&self, bucket: &str, offline_nodes: &[String], stats: &mut HealStats) {
let bucket_dir = self.manifest_dir.join(bucket);
let manifests = match self.collect_manifests(&bucket_dir).await {
Ok(m) => m,
@@ -264,10 +259,10 @@ impl HealingService {
}

// Reconstruct all shards
-let reconstructed = match self.erasure_coder.decode_chunk(
-&mut shards,
-chunk.data_size,
-) {
+let reconstructed = match self
+.erasure_coder
+.decode_chunk(&mut shards, chunk.data_size)
+{
Ok(_) => true,
Err(e) => {
tracing::error!(
@@ -361,7 +356,8 @@ impl HealingService {
/// Collect all manifests under a bucket directory.
async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
let mut manifests = Vec::new();
-self.collect_manifests_recursive(dir, &mut manifests).await?;
+self.collect_manifests_recursive(dir, &mut manifests)
+.await?;
Ok(manifests)
}
+11 -12
@@ -7,8 +7,7 @@ use tokio::sync::Mutex;
use super::drive_manager::{DriveManager, DriveStatus};
use super::protocol::{
-ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage,
-NodeInfo,
+ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage, NodeInfo,
};
use super::quic_transport::QuicTransport;
use super::state::ClusterState;
@@ -49,7 +48,11 @@ impl MembershipManager {
/// Join the cluster by contacting seed nodes.
/// Sends a JoinRequest to each seed node until one accepts.
-pub async fn join_cluster(&self, seed_nodes: &[String], allow_bootstrap_on_failure: bool) -> Result<()> {
+pub async fn join_cluster(
+&self,
+seed_nodes: &[String],
+allow_bootstrap_on_failure: bool,
+) -> Result<()> {
if seed_nodes.is_empty() {
tracing::info!("No seed nodes configured, starting as initial cluster node");
self.state.add_node(self.local_node_info.clone()).await;
@@ -84,14 +87,13 @@ impl MembershipManager {
return Ok(());
}

-anyhow::bail!("Could not reach any configured seed nodes; refusing unsafe cluster bootstrap")
+anyhow::bail!(
+"Could not reach any configured seed nodes; refusing unsafe cluster bootstrap"
+)
}

async fn try_join(&self, addr: SocketAddr) -> Result<()> {
-let conn = self
-.transport
-.get_connection("seed", addr)
-.await?;
+let conn = self.transport.get_connection("seed", addr).await?;

let request = ClusterRequest::JoinRequest(JoinRequestMessage {
node_info: self.local_node_info.clone(),
@@ -120,10 +122,7 @@ impl MembershipManager {
}
Ok(())
} else {
-anyhow::bail!(
-"Join rejected: {}",
-join_resp.error.unwrap_or_default()
-)
+anyhow::bail!("Join rejected: {}", join_resp.error.unwrap_or_default())
}
}
ClusterResponse::Error(e) => {
+9 -6
@@ -113,11 +113,17 @@ mod tests {
// Set 0 should interleave across nodes
let set0_nodes: Vec<&str> = sets[0].drives.iter().map(|d| d.node_id.as_str()).collect();
-assert_eq!(set0_nodes, vec!["node1", "node2", "node3", "node1", "node2", "node3"]);
+assert_eq!(
+set0_nodes,
+vec!["node1", "node2", "node3", "node1", "node2", "node3"]
+);

// Set 1 should also interleave
let set1_nodes: Vec<&str> = sets[1].drives.iter().map(|d| d.node_id.as_str()).collect();
-assert_eq!(set1_nodes, vec!["node1", "node2", "node3", "node1", "node2", "node3"]);
+assert_eq!(
+set1_nodes,
+vec!["node1", "node2", "node3", "node1", "node2", "node3"]
+);

// Drive indices should be different between sets
let set0_drives: Vec<u32> = sets[0].drives.iter().map(|d| d.drive_index).collect();
@@ -129,10 +135,7 @@ mod tests {
#[test]
fn test_form_erasure_sets_remainder() {
// 2 nodes, 3 drives each, 4 shards => 1 set (2 drives left over)
-let nodes = vec![
-("a".to_string(), 3),
-("b".to_string(), 3),
-];
+let nodes = vec![("a".to_string(), 3), ("b".to_string(), 3)];
let sets = form_erasure_sets(&nodes, 4);
assert_eq!(sets.len(), 1);
assert_eq!(sets[0].drives.len(), 4);
-4
@@ -13,7 +13,6 @@ pub enum ClusterRequest {
// ============================
// Shard operations
// ============================
/// Write a shard to a specific drive on the target node.
/// Shard data follows after this header on the same stream.
ShardWrite(ShardWriteRequest),
@@ -30,7 +29,6 @@ pub enum ClusterRequest {
// ============================
// Manifest operations
// ============================
/// Store an object manifest on the target node.
ManifestWrite(ManifestWriteRequest),
@@ -46,7 +44,6 @@ pub enum ClusterRequest {
// ============================
// Cluster management
// ============================
/// Periodic heartbeat.
Heartbeat(HeartbeatMessage),
@@ -59,7 +56,6 @@ pub enum ClusterRequest {
// ============================
// Healing
// ============================
/// Request a shard to be reconstructed and placed on a target drive.
HealRequest(HealRequestMessage),
}
+34 -22
@@ -1,14 +1,14 @@
-use super::protocol::{
-self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
-};
-use super::shard_store::{ShardId, ShardStore};
-use super::state::{ClusterState, NodeStatus};
use anyhow::Result;
use dashmap::DashMap;
use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::net::SocketAddr;
use std::sync::Arc;
+use super::protocol::{
+self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
+};
+use super::shard_store::{ShardId, ShardStore};
+use super::state::{ClusterState, NodeStatus};
/// QUIC transport layer for inter-node communication.
///
@@ -54,13 +54,9 @@ impl QuicTransport {
}

// Establish new connection
-let conn = self
-.endpoint
-.connect(addr, "smartstorage")?
-.await?;
+let conn = self.endpoint.connect(addr, "smartstorage")?.await?;

-self.connections
-.insert(node_id.to_string(), conn.clone());
+self.connections.insert(node_id.to_string(), conn.clone());

Ok(conn)
}
@@ -246,7 +242,11 @@ impl QuicTransport {
};

let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
-Ok(store) => store.write_shard(&shard_id, &shard_data, write_req.checksum).await,
+Ok(store) => {
+store
+.write_shard(&shard_id, &shard_data, write_req.checksum)
+.await
+}
Err(error) => Err(error),
};
@@ -272,7 +272,8 @@ impl QuicTransport {
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store,
Err(error) => {
-Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
+Self::send_error_response(&mut send, "InvalidDrive", error.to_string())
+.await?;
return Ok(());
}
};
@@ -286,8 +287,10 @@ impl QuicTransport {
checksum,
};

// Send header
-let header_bytes = bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
-send.write_all(&(header_bytes.len() as u32).to_le_bytes()).await?;
+let header_bytes =
+bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
+send.write_all(&(header_bytes.len() as u32).to_le_bytes())
+.await?;
send.write_all(&header_bytes).await?;
// Send shard data
send.write_all(&data).await?;
@@ -300,8 +303,10 @@ impl QuicTransport {
shard_data_length: 0,
checksum: 0,
};
-let header_bytes = bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
-send.write_all(&(header_bytes.len() as u32).to_le_bytes()).await?;
+let header_bytes =
+bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
+send.write_all(&(header_bytes.len() as u32).to_le_bytes())
+.await?;
send.write_all(&header_bytes).await?;
send.finish()?;
}
@@ -340,7 +345,8 @@ impl QuicTransport {
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store,
Err(error) => {
-Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
+Self::send_error_response(&mut send, "InvalidDrive", error.to_string())
+.await?;
return Ok(());
}
};
@@ -403,7 +409,13 @@ impl QuicTransport {
send.write_all(&response).await?;
send.finish()?;

-self.broadcast_topology(&state, Some(response_topology), None, Some(&joining_node_id)).await;
+self.broadcast_topology(
+&state,
+Some(response_topology),
+None,
+Some(&joining_node_id),
+)
+.await;
}
ClusterRequest::Heartbeat(heartbeat) => {
@@ -434,7 +446,8 @@ impl QuicTransport {
send.finish()?;

if local_topology_version > peer_topology_version {
-self.broadcast_topology(&state, None, Some(&peer_node_id), None).await;
+self.broadcast_topology(&state, None, Some(&peer_node_id), None)
+.await;
}
}
@@ -585,8 +598,7 @@ impl QuicTransport {
/// Close the QUIC endpoint gracefully.
pub fn close(&self) {
-self.endpoint
-.close(quinn::VarInt::from_u32(0), b"shutdown");
+self.endpoint.close(quinn::VarInt::from_u32(0), b"shutdown");
}

/// Get the local node ID.
+2 -11
@@ -40,12 +40,7 @@ impl ShardStore {
}

/// Write a shard to disk atomically (write to temp file, then rename).
-pub async fn write_shard(
-&self,
-shard_id: &ShardId,
-data: &[u8],
-checksum: u32,
-) -> Result<()> {
+pub async fn write_shard(&self, shard_id: &ShardId, data: &[u8], checksum: u32) -> Result<()> {
let shard_path = self.shard_data_path(shard_id);
let meta_path = self.shard_meta_path(shard_id);
@@ -117,11 +112,7 @@ impl ShardStore {
}

/// List all shard IDs for a given bucket and key (across all chunks).
-pub async fn list_shards_for_object(
-&self,
-bucket: &str,
-key: &str,
-) -> Result<Vec<ShardId>> {
+pub async fn list_shards_for_object(&self, bucket: &str, key: &str) -> Result<Vec<ShardId>> {
let key_dir = self.key_dir(bucket, key);
if !key_dir.exists() {
return Ok(Vec::new());
+13 -11
@@ -3,16 +3,16 @@ use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;

-use super::placement::{DriveLocation, ErasureSet};
use super::persistence;
-use super::protocol::{ClusterTopology, ErasureSetInfo, DriveLocationInfo, NodeInfo};
+use super::placement::{DriveLocation, ErasureSet};
+use super::protocol::{ClusterTopology, DriveLocationInfo, ErasureSetInfo, NodeInfo};

/// Node status for tracking liveness.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NodeStatus {
Online,
Suspect, // missed 2+ heartbeats
Offline, // missed 5+ heartbeats
}
/// Tracked state for a peer node.
@@ -162,11 +162,8 @@ impl ClusterState {
if inner.erasure_sets.is_empty() {
return None;
}
-let set_idx = super::placement::erasure_set_for_object(
-bucket,
-key,
-inner.erasure_sets.len() as u32,
-);
+let set_idx =
+super::placement::erasure_set_for_object(bucket, key, inner.erasure_sets.len() as u32);
inner.erasure_sets.get(set_idx as usize).cloned()
}
@@ -284,7 +281,10 @@ impl ClusterState {
let now = chrono::Utc::now();
for node_info in &topology.nodes {
-let existing_status = inner.nodes.get(&node_info.node_id).map(|node| node.status.clone());
+let existing_status = inner
+.nodes
+.get(&node_info.node_id)
+.map(|node| node.status.clone());
let existing_missed_heartbeats = inner
.nodes
.get(&node_info.node_id)
@@ -304,7 +304,9 @@ impl ClusterState {
);
}

-inner.nodes.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));
+inner
+.nodes
+.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));

// Update erasure sets
inner.erasure_sets = topology
+5 -2
@@ -45,11 +45,14 @@ pub struct AuthConfig {
}

#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
pub struct Credential {
-#[serde(rename = "accessKeyId")]
pub access_key_id: String,
-#[serde(rename = "secretAccessKey")]
pub secret_access_key: String,
+#[serde(default, skip_serializing_if = "Option::is_none")]
+pub bucket_name: Option<String>,
+#[serde(default, skip_serializing_if = "Option::is_none")]
+pub region: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
+25 -5
@@ -18,15 +18,27 @@ impl StorageError {
}

pub fn no_such_key() -> Self {
-Self::new("NoSuchKey", "The specified key does not exist.", StatusCode::NOT_FOUND)
+Self::new(
+"NoSuchKey",
+"The specified key does not exist.",
+StatusCode::NOT_FOUND,
+)
}

pub fn no_such_bucket() -> Self {
-Self::new("NoSuchBucket", "The specified bucket does not exist", StatusCode::NOT_FOUND)
+Self::new(
+"NoSuchBucket",
+"The specified bucket does not exist",
+StatusCode::NOT_FOUND,
+)
}

pub fn bucket_not_empty() -> Self {
-Self::new("BucketNotEmpty", "The bucket you tried to delete is not empty", StatusCode::CONFLICT)
+Self::new(
+"BucketNotEmpty",
+"The bucket you tried to delete is not empty",
+StatusCode::CONFLICT,
+)
}
pub fn access_denied() -> Self {
@@ -34,11 +46,19 @@ impl StorageError {
}

pub fn no_such_upload() -> Self {
-Self::new("NoSuchUpload", "The specified upload does not exist", StatusCode::NOT_FOUND)
+Self::new(
+"NoSuchUpload",
+"The specified upload does not exist",
+StatusCode::NOT_FOUND,
+)
}

pub fn invalid_part_number() -> Self {
-Self::new("InvalidPartNumber", "Part number must be between 1 and 10000", StatusCode::BAD_REQUEST)
+Self::new(
+"InvalidPartNumber",
+"Part number must be between 1 and 10000",
+StatusCode::BAD_REQUEST,
+)
}
pub fn internal_error(msg: &str) -> Self {
+5 -2
@@ -2,9 +2,9 @@ mod action;
mod auth;
mod cluster;
mod config;
+mod error;
mod management;
mod policy;
-mod error;
mod server;
mod storage;
mod xml_response;
@@ -12,7 +12,10 @@ mod xml_response;
use clap::Parser;

#[derive(Parser)]
-#[command(name = "ruststorage", about = "High-performance S3-compatible storage server")]
+#[command(
+name = "ruststorage",
+about = "High-performance S3-compatible storage server"
+)]
struct Cli { struct Cli {
/// Run in management mode (IPC via stdin/stdout) /// Run in management mode (IPC via stdin/stdout)
#[arg(long)] #[arg(long)]
+259 -18
@@ -7,6 +7,7 @@ use tokio::io::{AsyncBufReadExt, BufReader};
use crate::config::Credential;
use crate::config::SmartStorageConfig;
use crate::server::StorageServer;
+use crate::storage::BucketExport;

#[derive(Deserialize)]
struct IpcRequest {
@@ -91,17 +92,15 @@ pub async fn management_loop() -> Result<()> {
                    config: SmartStorageConfig,
                }
                match serde_json::from_value::<StartParams>(req.params) {
-                    Ok(params) => {
-                        match StorageServer::start(params.config).await {
-                            Ok(s) => {
-                                server = Some(s);
-                                send_response(id, serde_json::json!({}));
-                            }
-                            Err(e) => {
-                                send_error(id, format!("Failed to start server: {}", e));
-                            }
-                        }
-                    }
+                    Ok(params) => match StorageServer::start(params.config).await {
+                        Ok(s) => {
+                            server = Some(s);
+                            send_response(id, serde_json::json!({}));
+                        }
+                        Err(e) => {
+                            send_error(id, format!("Failed to start server: {}", e));
+                        }
+                    },
                    Err(e) => {
                        send_error(id, format!("Invalid start params: {}", e));
                    }
@@ -126,10 +125,7 @@ pub async fn management_loop() -> Result<()> {
                        send_response(id, serde_json::json!({}));
                    }
                    Err(e) => {
-                        send_error(
-                            id,
-                            format!("Failed to create bucket: {}", e),
-                        );
+                        send_error(id, format!("Failed to create bucket: {}", e));
                    }
                }
            } else {
@@ -141,6 +137,254 @@ pub async fn management_loop() -> Result<()> {
                }
            }
        }
"createBucketTenant" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateBucketTenantParams {
bucket_name: String,
access_key_id: String,
secret_access_key: String,
region: Option<String>,
}
match serde_json::from_value::<CreateBucketTenantParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
let credential = Credential {
access_key_id: params.access_key_id,
secret_access_key: params.secret_access_key,
bucket_name: Some(params.bucket_name.clone()),
region: params.region,
};
match s
.create_bucket_tenant(&params.bucket_name, credential)
.await
{
Ok(credential) => match serde_json::to_value(credential) {
Ok(value) => send_response(id, value),
Err(error) => send_error(
id,
format!("Failed to serialize bucket tenant: {}", error),
),
},
Err(error) => send_error(
id,
format!("Failed to create bucket tenant: {}", error),
),
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(id, format!("Invalid createBucketTenant params: {}", error));
}
}
}
"deleteBucketTenant" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct DeleteBucketTenantParams {
bucket_name: String,
access_key_id: Option<String>,
}
match serde_json::from_value::<DeleteBucketTenantParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
match s
.delete_bucket_tenant(
&params.bucket_name,
params.access_key_id.as_deref(),
)
.await
{
Ok(()) => send_response(id, serde_json::json!({})),
Err(error) => send_error(
id,
format!("Failed to delete bucket tenant: {}", error),
),
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(id, format!("Invalid deleteBucketTenant params: {}", error));
}
}
}
"rotateBucketTenantCredentials" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct RotateBucketTenantCredentialsParams {
bucket_name: String,
access_key_id: String,
secret_access_key: String,
region: Option<String>,
}
match serde_json::from_value::<RotateBucketTenantCredentialsParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
let credential = Credential {
access_key_id: params.access_key_id,
secret_access_key: params.secret_access_key,
bucket_name: Some(params.bucket_name.clone()),
region: params.region,
};
match s
.rotate_bucket_tenant_credentials(&params.bucket_name, credential)
.await
{
Ok(credential) => match serde_json::to_value(credential) {
Ok(value) => send_response(id, value),
Err(error) => send_error(
id,
format!("Failed to serialize bucket tenant: {}", error),
),
},
Err(error) => send_error(
id,
format!(
"Failed to rotate bucket tenant credentials: {}",
error
),
),
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(
id,
format!("Invalid rotateBucketTenantCredentials params: {}", error),
);
}
}
}
"listBucketTenants" => {
if let Some(ref s) = server {
match s.list_bucket_tenants().await {
Ok(tenants) => match serde_json::to_value(tenants) {
Ok(value) => send_response(id, value),
Err(error) => send_error(
id,
format!("Failed to serialize bucket tenants: {}", error),
),
},
Err(error) => {
send_error(id, format!("Failed to list bucket tenants: {}", error))
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"getBucketTenantCredential" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetBucketTenantCredentialParams {
bucket_name: String,
}
match serde_json::from_value::<GetBucketTenantCredentialParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
match s.get_bucket_tenant_credential(&params.bucket_name).await {
Ok(Some(credential)) => match serde_json::to_value(credential) {
Ok(value) => send_response(id, value),
Err(error) => send_error(
id,
format!("Failed to serialize bucket tenant: {}", error),
),
},
Ok(None) => send_error(
id,
format!(
"No bucket tenant credential exists for bucket {}",
params.bucket_name
),
),
Err(error) => send_error(
id,
format!("Failed to get bucket tenant credential: {}", error),
),
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(
id,
format!("Invalid getBucketTenantCredential params: {}", error),
);
}
}
}
"exportBucket" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ExportBucketParams {
bucket_name: String,
}
match serde_json::from_value::<ExportBucketParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
match s.store().export_bucket(&params.bucket_name).await {
Ok(export) => match serde_json::to_value(export) {
Ok(value) => send_response(id, value),
Err(error) => send_error(
id,
format!("Failed to serialize bucket export: {}", error),
),
},
Err(error) => {
send_error(id, format!("Failed to export bucket: {}", error))
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(id, format!("Invalid exportBucket params: {}", error));
}
}
}
"importBucket" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ImportBucketParams {
bucket_name: String,
source: BucketExport,
}
match serde_json::from_value::<ImportBucketParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
match s
.store()
.import_bucket(&params.bucket_name, params.source)
.await
{
Ok(()) => send_response(id, serde_json::json!({})),
Err(error) => {
send_error(id, format!("Failed to import bucket: {}", error))
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(id, format!("Invalid importBucket params: {}", error));
}
}
}
"getStorageStats" => { "getStorageStats" => {
if let Some(ref s) = server { if let Some(ref s) = server {
match s.store().get_storage_stats().await { match s.store().get_storage_stats().await {
@@ -186,10 +430,7 @@ pub async fn management_loop() -> Result<()> {
match serde_json::to_value(s.list_credentials().await) { match serde_json::to_value(s.list_credentials().await) {
Ok(value) => send_response(id, value), Ok(value) => send_response(id, value),
Err(error) => { Err(error) => {
send_error( send_error(id, format!("Failed to serialize credentials: {}", error));
id,
format!("Failed to serialize credentials: {}", error),
);
} }
} }
} else { } else {
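The IPC handlers above surface as methods on the TypeScript `SmartStorage` class further down in this diff. A minimal consumer sketch, assuming a locally compiled ruststorage binary; the port, directory, and keys are illustrative, while the method and option names come from the TypeScript surface and tests later in this diff:

```typescript
import * as smartstorage from '@push.rocks/smartstorage';

// Drive the new tenant/export IPC handlers through the public API.
const storage = await smartstorage.SmartStorage.createAndStart({
  server: { port: 3360, silent: true },
  storage: { directory: './.nogit/tenant-demo' },
  auth: {
    enabled: true, // bucket tenants require auth.enabled=true
    credentials: [{ accessKeyId: 'ADMIN', secretAccessKey: 'ADMINSECRET123' }],
  },
});

// Provision a bucket-scoped credential, then round-trip the bucket contents
// through the smartstorage.bucket.v1 export format.
const tenant = await storage.createBucketTenant({ bucketName: 'demo-bucket' });
const backup = await storage.exportBucket({ bucketName: 'demo-bucket' });
await storage.importBucket({ bucketName: 'demo-bucket-copy', source: backup });
await storage.stop();
```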
+7 -5
@@ -81,14 +81,14 @@ where
    let raw = PrincipalRaw::deserialize(deserializer)?;
    match raw {
        PrincipalRaw::Star(s) if s == "*" => Ok(Principal::Wildcard),
-        PrincipalRaw::Star(_) => Err(serde::de::Error::custom(
-            "Principal string must be \"*\"",
-        )),
+        PrincipalRaw::Star(_) => Err(serde::de::Error::custom("Principal string must be \"*\"")),
        PrincipalRaw::Map(map) => {
            if let Some(aws) = map.get("AWS") {
                Ok(Principal::Aws(aws.clone().into_vec()))
            } else {
-                Err(serde::de::Error::custom("Principal map must contain \"AWS\" key"))
+                Err(serde::de::Error::custom(
+                    "Principal map must contain \"AWS\" key",
+                ))
            }
        }
    }
@@ -286,7 +286,9 @@ const MAX_POLICY_SIZE: usize = 20 * 1024; // 20 KB
pub fn validate_policy(json: &str) -> Result<BucketPolicy, StorageError> {
    if json.len() > MAX_POLICY_SIZE {
-        return Err(StorageError::malformed_policy("Policy exceeds maximum size of 20KB"));
+        return Err(StorageError::malformed_policy(
+            "Policy exceeds maximum size of 20KB",
+        ));
    }

    let policy: BucketPolicy =
+291 -64
@@ -10,8 +10,8 @@ use hyper_util::rt::TokioIo;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
-use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
@@ -21,9 +21,6 @@ use uuid::Uuid;
use crate::action::{self, RequestContext, StorageAction};
use crate::auth::{self, AuthenticatedIdentity};
-use crate::config::SmartStorageConfig;
-use crate::policy::{self, PolicyDecision, PolicyStore};
-use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
@@ -34,6 +31,9 @@ use crate::cluster::protocol::NodeInfo;
use crate::cluster::quic_transport::QuicTransport;
use crate::cluster::shard_store::ShardStore;
use crate::cluster::state::ClusterState;
+use crate::config::{Credential, SmartStorageConfig};
+use crate::error::StorageError;
+use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::storage::{FileStore, StorageBackend};
use crate::xml_response;
@@ -70,7 +70,6 @@ pub struct StorageServer {
impl StorageServer {
    pub async fn start(config: SmartStorageConfig) -> Result<Self> {
-        let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth));
        let mut cluster_shutdown_txs = Vec::new();
        let store: Arc<StorageBackend> = if let Some(ref cluster_config) = config.cluster {
            if cluster_config.enabled {
@@ -88,8 +87,12 @@ impl StorageServer {
        let policy_store = Arc::new(PolicyStore::new(store.policies_dir()));
        policy_store.load_from_disk().await?;
-        let addr: SocketAddr = format!("{}:{}", config.address(), config.server.port)
-            .parse()?;
+        let auth_runtime = Arc::new(
+            auth::RuntimeCredentialStore::new(&config.auth, Some(Self::credentials_path(&config)))
+                .await?,
+        );
+
+        let addr: SocketAddr = format!("{}:{}", config.address(), config.server.port).parse()?;
        let listener = TcpListener::bind(addr).await?;
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
@@ -181,15 +184,117 @@ impl StorageServer {
    pub async fn replace_credentials(
        &self,
-        credentials: Vec<crate::config::Credential>,
+        credentials: Vec<Credential>,
    ) -> Result<(), StorageError> {
        self.auth_runtime.replace_credentials(credentials).await
    }
pub async fn create_bucket_tenant(
&self,
bucket_name: &str,
credential: Credential,
) -> Result<Credential> {
self.ensure_tenant_auth_enabled()?;
let replacement = self
.auth_runtime
.replace_bucket_tenant_credential_with_snapshot(bucket_name, credential)
.await?;
if let Err(error) = self.store.create_bucket(bucket_name).await {
if let Err(rollback_error) = self
.auth_runtime
.replace_credentials(replacement.previous_credentials)
.await
{
return Err(anyhow::anyhow!(
"Failed to create tenant bucket: {}; credential rollback failed: {}",
error,
rollback_error.message
));
}
return Err(error);
}
Ok(replacement.credential)
}
pub async fn rotate_bucket_tenant_credentials(
&self,
bucket_name: &str,
credential: Credential,
) -> Result<Credential> {
self.ensure_tenant_auth_enabled()?;
if !self.store.bucket_exists(bucket_name).await {
return Err(StorageError::no_such_bucket().into());
}
Ok(self
.auth_runtime
.replace_bucket_tenant_credential(bucket_name, credential)
.await?)
}
pub async fn delete_bucket_tenant(
&self,
bucket_name: &str,
access_key_id: Option<&str>,
) -> Result<()> {
self.ensure_tenant_auth_enabled()?;
let removal = self
.auth_runtime
.remove_bucket_tenant_credentials(bucket_name, access_key_id)
.await?;
if access_key_id.is_none() && self.store.bucket_exists(bucket_name).await {
if let Err(error) = self.store.delete_bucket_recursive(bucket_name).await {
if let Err(rollback_error) = self
.auth_runtime
.replace_credentials(removal.previous_credentials)
.await
{
return Err(anyhow::anyhow!(
"Failed to delete tenant bucket: {}; credential rollback failed: {}",
error,
rollback_error.message
));
}
return Err(error);
}
}
Ok(())
}
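`delete_bucket_tenant` therefore has two modes: with an `access_key_id` it only revokes that one credential, while without one it revokes everything for the bucket and recursively deletes the bucket itself, rolling the credential change back if the delete fails. Continuing the consumer sketch from earlier (the two calls below are alternatives, not a sequence):

```typescript
// Mode 1: revoke a single tenant credential while keeping the bucket
// and all of its objects in place.
await storage.deleteBucketTenant({
  bucketName: 'demo-bucket',
  accessKeyId: tenant.accessKeyId,
});

// Mode 2 (no accessKeyId): revoke every tenant credential for the bucket
// and recursively delete the bucket itself. Per the tests below, this is
// refused for buckets that have no tenant credential.
await storage.deleteBucketTenant({ bucketName: 'demo-bucket' });
```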
pub async fn list_bucket_tenants(&self) -> Result<Vec<crate::auth::BucketTenantMetadata>> {
self.ensure_tenant_auth_enabled()?;
Ok(self.auth_runtime.list_bucket_tenants().await)
}
pub async fn get_bucket_tenant_credential(
&self,
bucket_name: &str,
) -> Result<Option<Credential>> {
self.ensure_tenant_auth_enabled()?;
Ok(self
.auth_runtime
.get_bucket_tenant_credential(bucket_name)
.await)
}
fn ensure_tenant_auth_enabled(&self) -> Result<()> {
if !self.auth_runtime.enabled() {
anyhow::bail!("Bucket tenants require auth.enabled=true");
}
Ok(())
}
fn credentials_path(config: &SmartStorageConfig) -> std::path::PathBuf {
std::path::PathBuf::from(&config.storage.directory)
.join(".smartstorage")
.join("credentials.json")
}
    async fn start_standalone(config: &SmartStorageConfig) -> Result<Arc<StorageBackend>> {
-        let store = Arc::new(StorageBackend::Standalone(
-            FileStore::new(config.storage.directory.clone().into()),
-        ));
+        let store = Arc::new(StorageBackend::Standalone(FileStore::new(
+            config.storage.directory.clone().into(),
+        )));
        if config.storage.clean_slate {
            store.reset().await?;
        } else {
@@ -208,7 +313,9 @@ impl StorageServer {
        let topology_path = persistence::topology_path(&cluster_metadata_dir);
        let persisted_identity = persistence::load_identity(&identity_path).await?;
-        if let (Some(configured_node_id), Some(identity)) = (&cluster_config.node_id, &persisted_identity) {
+        if let (Some(configured_node_id), Some(identity)) =
+            (&cluster_config.node_id, &persisted_identity)
+        {
            if configured_node_id != &identity.node_id {
                anyhow::bail!(
                    "Configured cluster node ID '{}' conflicts with persisted node ID '{}'",
@@ -221,7 +328,11 @@ impl StorageServer {
        let node_id = cluster_config
            .node_id
            .clone()
-            .or_else(|| persisted_identity.as_ref().map(|identity| identity.node_id.clone()))
+            .or_else(|| {
+                persisted_identity
+                    .as_ref()
+                    .map(|identity| identity.node_id.clone())
+            })
            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
        let cluster_id = persisted_identity
            .as_ref()
@@ -273,7 +384,9 @@ impl StorageServer {
        let has_persisted_topology = persisted_topology.is_some();
        if let Some(topology) = persisted_topology {
            if topology.cluster_id != cluster_id {
-                anyhow::bail!("Persisted topology cluster ID does not match persisted node identity");
+                anyhow::bail!(
+                    "Persisted topology cluster ID does not match persisted node identity"
+                );
            }
            cluster_state.apply_topology(&topology).await;
        } else if cluster_config.seed_nodes.is_empty() {
@@ -347,7 +460,11 @@ impl StorageServer {
        let shard_stores_for_accept = local_shard_stores.clone();
        tokio::spawn(async move {
            transport_clone
-                .accept_loop(shard_stores_for_accept, Some(cluster_state_for_accept), quic_shutdown_rx)
+                .accept_loop(
+                    shard_stores_for_accept,
+                    Some(cluster_state_for_accept),
+                    quic_shutdown_rx,
+                )
                .await;
        });
@@ -400,7 +517,10 @@ impl StorageServer {
        );
        }

-        Ok((store, vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx]))
+        Ok((
+            store,
+            vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx],
+        ))
    }
}
} }
@@ -414,17 +534,26 @@ impl SmartStorageConfig {
// Request handling
// ============================

-type BoxBody = http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
+type BoxBody =
+    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;

fn full_body(data: impl Into<Bytes>) -> BoxBody {
    http_body_util::Full::new(data.into())
-        .map_err(|never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> { match never {} })
+        .map_err(
+            |never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> {
+                match never {}
+            },
+        )
        .boxed()
}

fn empty_body() -> BoxBody {
    http_body_util::Empty::new()
-        .map_err(|never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> { match never {} })
+        .map_err(
+            |never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> {
+                match never {}
+            },
+        )
        .boxed()
}
@@ -445,10 +574,10 @@ impl Stream for FrameStream {
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
        match inner.poll_next(cx) {
-            Poll::Ready(Some(Ok(bytes))) => {
-                Poll::Ready(Some(Ok(hyper::body::Frame::data(bytes))))
-            }
-            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>))),
+            Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(hyper::body::Frame::data(bytes)))),
+            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
+                Box::new(e) as Box<dyn std::error::Error + Send + Sync>
+            ))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
@@ -482,7 +611,11 @@ fn storage_error_response(err: &StorageError, request_id: &str) -> Response<BoxBody> {
        .unwrap()
}

-fn json_response(status: StatusCode, value: serde_json::Value, request_id: &str) -> Response<BoxBody> {
+fn json_response(
+    status: StatusCode,
+    value: serde_json::Value,
+    request_id: &str,
+) -> Response<BoxBody> {
    Response::builder()
        .status(status)
        .header("content-type", "application/json")
@@ -491,7 +624,12 @@ fn json_response(status: StatusCode, value: serde_json::Value, request_id: &str)
        .unwrap()
}

-fn text_response(status: StatusCode, content_type: &str, body: String, request_id: &str) -> Response<BoxBody> {
+fn text_response(
+    status: StatusCode,
+    content_type: &str,
+    body: String,
+    request_id: &str,
+) -> Response<BoxBody> {
    Response::builder()
        .status(status)
        .header("content-type", content_type)
@@ -521,17 +659,20 @@ async fn handle_request(
    }

    if method == Method::GET && uri.path().starts_with("/-/") {
-        let resp = match handle_operational_request(uri.path(), store, &config, &metrics, &request_id).await {
-            Ok(resp) => resp,
-            Err(error) => {
-                tracing::error!(error = %error, "Operational endpoint failed");
-                json_response(
-                    StatusCode::INTERNAL_SERVER_ERROR,
-                    serde_json::json!({ "ok": false, "error": error.to_string() }),
-                    &request_id,
-                )
-            }
-        };
+        let resp =
+            match handle_operational_request(uri.path(), store, &config, &metrics, &request_id)
+                .await
+            {
+                Ok(resp) => resp,
+                Err(error) => {
+                    tracing::error!(error = %error, "Operational endpoint failed");
+                    json_response(
+                        StatusCode::INTERNAL_SERVER_ERROR,
+                        serde_json::json!({ "ok": false, "error": error.to_string() }),
+                        &request_id,
+                    )
+                }
+            };
        metrics.record_response(resp.status());
        return Ok(resp);
    }
@@ -672,7 +813,11 @@ async fn handle_operational_request(
    let cluster_health = store.get_cluster_health().await?;
    let stats = store.get_storage_stats().await?;
    let cluster_enabled = if cluster_health.enabled { 1 } else { 0 };
-    let quorum_healthy = if cluster_health.quorum_healthy.unwrap_or(true) { 1 } else { 0 };
+    let quorum_healthy = if cluster_health.quorum_healthy.unwrap_or(true) {
+        1
+    } else {
+        0
+    };
    let body = format!(
        "# HELP smartstorage_requests_total Total HTTP requests observed by smartstorage.\n\
        # TYPE smartstorage_requests_total counter\n\
# TYPE smartstorage_requests_total counter\n\ # TYPE smartstorage_requests_total counter\n\
@@ -720,6 +865,12 @@ async fn authorize_request(
    identity: Option<&AuthenticatedIdentity>,
    policy_store: &PolicyStore,
) -> Result<(), StorageError> {
+    if let Some(identity) = identity {
+        if let Some(bucket_name) = identity.bucket_name.as_deref() {
+            authorize_scoped_credential(ctx, bucket_name)?;
+        }
+    }
    // ListAllMyBuckets requires authentication (no bucket to apply policy to)
    if ctx.action == StorageAction::ListAllMyBuckets {
        if identity.is_none() {
@@ -750,6 +901,46 @@
    Ok(())
}
fn authorize_scoped_credential(
ctx: &RequestContext,
bucket_name: &str,
) -> Result<(), StorageError> {
let Some(request_bucket) = ctx.bucket.as_deref() else {
return Err(StorageError::access_denied());
};
if request_bucket != bucket_name {
return Err(StorageError::access_denied());
}
if let Some(source_bucket) = ctx.source_bucket.as_deref() {
if source_bucket != bucket_name {
return Err(StorageError::access_denied());
}
}
match ctx.action {
StorageAction::CreateBucket
| StorageAction::DeleteBucket
| StorageAction::GetBucketPolicy
| StorageAction::PutBucketPolicy
| StorageAction::DeleteBucketPolicy
| StorageAction::ListAllMyBuckets => Err(StorageError::access_denied()),
StorageAction::HeadBucket
| StorageAction::ListBucket
| StorageAction::GetObject
| StorageAction::HeadObject
| StorageAction::PutObject
| StorageAction::DeleteObject
| StorageAction::CopyObject
| StorageAction::ListBucketMultipartUploads
| StorageAction::AbortMultipartUpload
| StorageAction::InitiateMultipartUpload
| StorageAction::UploadPart
| StorageAction::CompleteMultipartUpload => Ok(()),
}
}
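In practice the action split above means a tenant credential behaves like a normal S3 user inside its own bucket but is denied all bucket-lifecycle and account-level operations. A sketch with the AWS SDK, reusing the `tenant` descriptor from the earlier sketch; the endpoint details are illustrative:

```typescript
import { ListBucketsCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3';

// Client built from a bucket-scoped tenant credential.
const tenantClient = new S3Client({
  endpoint: 'http://localhost:3360',
  region: 'us-east-1',
  credentials: {
    accessKeyId: tenant.accessKeyId,
    secretAccessKey: tenant.secretAccessKey,
  },
  forcePathStyle: true,
});

// Object-level calls against the assigned bucket are allowed.
await tenantClient.send(
  new PutObjectCommand({ Bucket: 'demo-bucket', Key: 'a.txt', Body: 'hi' }),
);

// Account-level actions such as ListAllMyBuckets are rejected by
// authorize_scoped_credential with AccessDenied.
await tenantClient.send(new ListBucketsCommand({})).catch((error) => {
  console.log('expected denial:', error.name);
});
```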
// ============================
// Routing
// ============================
@@ -788,9 +979,16 @@ async fn route_request(
    // Check for ?policy query parameter
    if query.contains_key("policy") {
        return match method {
-            Method::GET => handle_get_bucket_policy(policy_store, &bucket, request_id).await,
-            Method::PUT => handle_put_bucket_policy(req, &store, policy_store, &bucket, request_id).await,
-            Method::DELETE => handle_delete_bucket_policy(policy_store, &bucket, request_id).await,
+            Method::GET => {
+                handle_get_bucket_policy(policy_store, &bucket, request_id).await
+            }
+            Method::PUT => {
+                handle_put_bucket_policy(req, &store, policy_store, &bucket, request_id)
+                    .await
+            }
+            Method::DELETE => {
+                handle_delete_bucket_policy(policy_store, &bucket, request_id).await
+            }
            _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
        };
    }
@@ -804,7 +1002,9 @@ async fn route_request(
            }
        }
        Method::PUT => handle_create_bucket(store, &bucket, request_id).await,
-        Method::DELETE => handle_delete_bucket(store, &bucket, request_id, policy_store).await,
+        Method::DELETE => {
+            handle_delete_bucket(store, &bucket, request_id, policy_store).await
+        }
        Method::HEAD => handle_head_bucket(store, &bucket, request_id).await,
        _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
    }
@@ -824,12 +1024,8 @@ async fn route_request(
                handle_put_object(req, store, &bucket, &key, request_id).await
            }
        }
-        Method::GET => {
-            handle_get_object(req, store, &bucket, &key, request_id).await
-        }
-        Method::HEAD => {
-            handle_head_object(store, &bucket, &key, request_id).await
-        }
+        Method::GET => handle_get_object(req, store, &bucket, &key, request_id).await,
+        Method::HEAD => handle_head_object(store, &bucket, &key, request_id).await,
        Method::DELETE => {
            if query.contains_key("uploadId") {
                let upload_id = query.get("uploadId").unwrap();
@@ -843,7 +1039,8 @@ async fn route_request(
                handle_initiate_multipart(req, store, &bucket, &key, request_id).await
            } else if query.contains_key("uploadId") {
                let upload_id = query.get("uploadId").unwrap().clone();
-                handle_complete_multipart(req, store, &bucket, &key, &upload_id, request_id).await
+                handle_complete_multipart(req, store, &bucket, &key, &upload_id, request_id)
+                    .await
            } else {
                let err = StorageError::invalid_request("Invalid POST request");
                Ok(storage_error_response(&err, request_id))
@@ -972,7 +1169,13 @@ async fn handle_get_object(
    let mut builder = Response::builder()
        .header("ETag", format!("\"{}\"", result.md5))
-        .header("Last-Modified", result.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string())
+        .header(
+            "Last-Modified",
+            result
+                .last_modified
+                .format("%a, %d %b %Y %H:%M:%S GMT")
+                .to_string(),
+        )
        .header("Content-Type", &content_type)
        .header("Accept-Ranges", "bytes")
        .header("x-amz-request-id", request_id);
@@ -1023,7 +1226,13 @@ async fn handle_head_object(
    let mut builder = Response::builder()
        .status(StatusCode::OK)
        .header("ETag", format!("\"{}\"", result.md5))
-        .header("Last-Modified", result.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string())
+        .header(
+            "Last-Modified",
+            result
+                .last_modified
+                .format("%a, %d %b %Y %H:%M:%S GMT")
+                .to_string(),
+        )
        .header("Content-Type", &content_type)
        .header("Content-Length", result.size.to_string())
        .header("Accept-Ranges", "bytes")
@@ -1086,7 +1295,14 @@ async fn handle_copy_object(
    };

    let result = store
-        .copy_object(&src_bucket, &src_key, dest_bucket, dest_key, &metadata_directive, new_metadata)
+        .copy_object(
+            &src_bucket,
+            &src_key,
+            dest_bucket,
+            dest_key,
+            &metadata_directive,
+            new_metadata,
+        )
        .await?;

    let xml = xml_response::copy_object_result_xml(&result.md5, &result.last_modified.to_rfc3339());
@@ -1130,7 +1346,11 @@ async fn handle_put_bucket_policy(
    }

    // Read body
-    let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes();
+    let body_bytes = req
+        .collect()
+        .await
+        .map_err(|e| anyhow::anyhow!("Body error: {}", e))?
+        .to_bytes();
    let body_str = String::from_utf8_lossy(&body_bytes);

    // Validate and parse
@@ -1212,7 +1432,11 @@ async fn handle_complete_multipart(
    request_id: &str,
) -> Result<Response<BoxBody>> {
    // Read request body (XML)
-    let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes();
+    let body_bytes = req
+        .collect()
+        .await
+        .map_err(|e| anyhow::anyhow!("Body error: {}", e))?
+        .to_bytes();
    let body_str = String::from_utf8_lossy(&body_bytes);

    // Parse parts from XML using regex-like approach
@@ -1276,8 +1500,12 @@ fn extract_metadata(headers: &hyper::HeaderMap) -> HashMap<String, String> {
        let name_str = name.as_str().to_lowercase();
        if let Ok(val) = value.to_str() {
            match name_str.as_str() {
-                "content-type" | "cache-control" | "content-disposition"
-                | "content-encoding" | "content-language" | "expires" => {
+                "content-type"
+                | "cache-control"
+                | "content-disposition"
+                | "content-encoding"
+                | "content-language"
+                | "expires" => {
                    metadata.insert(name_str, val.to_string());
                }
                _ if name_str.starts_with("x-amz-meta-") => {
@@ -1290,7 +1518,10 @@ fn extract_metadata(headers: &hyper::HeaderMap) -> HashMap<String, String> {
    // Default content-type
    if !metadata.contains_key("content-type") {
-        metadata.insert("content-type".to_string(), "binary/octet-stream".to_string());
+        metadata.insert(
+            "content-type".to_string(),
+            "binary/octet-stream".to_string(),
+        );
    }

    metadata
@@ -1325,10 +1556,9 @@ fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
        if let Some(part_end) = after_part.find("</Part>") {
            let part_content = &after_part[..part_end];

-            let part_number = extract_xml_value(part_content, "PartNumber")
-                .and_then(|s| s.parse::<u32>().ok());
-            let etag = extract_xml_value(part_content, "ETag")
-                .map(|s| s.replace('"', ""));
+            let part_number =
+                extract_xml_value(part_content, "PartNumber").and_then(|s| s.parse::<u32>().ok());
+            let etag = extract_xml_value(part_content, "ETag").map(|s| s.replace('"', ""));

            if let (Some(pn), Some(et)) = (part_number, etag) {
                parts.push((pn, et));
@@ -1394,9 +1624,6 @@ fn add_cors_headers(headers: &mut hyper::HeaderMap, config: &SmartStorageConfig) {
        );
    }

    if config.cors.allow_credentials == Some(true) {
-        headers.insert(
-            "access-control-allow-credentials",
-            "true".parse().unwrap(),
-        );
+        headers.insert("access-control-allow-credentials", "true".parse().unwrap());
    }
}
+157
@@ -99,6 +99,25 @@ pub struct StorageStats {
    pub storage_locations: Vec<StorageLocationSummary>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketExport {
pub format: String,
pub bucket_name: String,
pub exported_at: i64,
pub objects: Vec<BucketExportObject>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketExportObject {
pub key: String,
pub size: u64,
pub md5: String,
pub metadata: HashMap<String, String>,
pub data_hex: String,
}
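Serialized with the camelCase rename above, a `smartstorage.bucket.v1` document looks roughly like the following. This is a hand-written example rather than output from this diff; the hex and MD5 values correspond to the five-byte payload "hello":

```typescript
// Illustrative smartstorage.bucket.v1 export document.
const exampleExport = {
  format: 'smartstorage.bucket.v1',
  bucketName: 'demo-bucket',
  exportedAt: 1777777777777, // Utc::now().timestamp_millis()
  objects: [
    {
      key: 'hello.txt',
      size: 5,
      md5: '5d41402abc4b2a76b9719d911017c592', // md5("hello")
      metadata: { 'content-type': 'text/plain' },
      dataHex: '68656c6c6f', // hex encoding of "hello"
    },
  ],
};
```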
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterPeerHealth {
@@ -593,6 +612,40 @@ impl FileStore {
        Ok(PutResult { md5: md5_hex })
    }
pub async fn put_object_bytes(
&self,
bucket: &str,
key: &str,
data: &[u8],
metadata: HashMap<String, String>,
) -> Result<PutResult> {
if !self.bucket_exists(bucket).await {
return Err(StorageError::no_such_bucket().into());
}
let previous_size = self.object_size_if_exists(bucket, key).await;
let object_path = self.object_path(bucket, key);
if let Some(parent) = object_path.parent() {
fs::create_dir_all(parent).await?;
}
fs::write(&object_path, data).await?;
let md5_hex = format!("{:x}", Md5::digest(data));
fs::write(format!("{}.md5", object_path.display()), &md5_hex).await?;
let metadata_json = serde_json::to_string_pretty(&metadata)?;
fs::write(
format!("{}.metadata.json", object_path.display()),
metadata_json,
)
.await?;
self.track_object_upsert(bucket, previous_size, data.len() as u64)
.await;
Ok(PutResult { md5: md5_hex })
}
    pub async fn get_object(
        &self,
        bucket: &str,
@@ -1311,6 +1364,25 @@ impl StorageBackend {
        }
    }
pub async fn delete_bucket_recursive(&self, bucket: &str) -> Result<()> {
if !self.bucket_exists(bucket).await {
return Err(StorageError::no_such_bucket().into());
}
loop {
let objects = self.list_objects(bucket, "", "", 1000, None).await?;
if objects.contents.is_empty() {
break;
}
for object in objects.contents {
self.delete_object(bucket, &object.key).await?;
}
}
self.delete_bucket(bucket).await
}
    pub async fn put_object(
        &self,
        bucket: &str,
@@ -1324,6 +1396,21 @@ impl StorageBackend {
        }
    }
pub async fn put_object_bytes(
&self,
bucket: &str,
key: &str,
data: &[u8],
metadata: HashMap<String, String>,
) -> Result<PutResult> {
match self {
StorageBackend::Standalone(fs) => {
fs.put_object_bytes(bucket, key, data, metadata).await
}
StorageBackend::Clustered(ds) => ds.put_object_bytes(bucket, key, data, metadata).await,
}
}
    pub async fn get_object(
        &self,
        bucket: &str,
@@ -1453,6 +1540,76 @@ impl StorageBackend {
            StorageBackend::Clustered(ds) => ds.list_multipart_uploads(bucket).await,
        }
    }
pub async fn export_bucket(&self, bucket: &str) -> Result<BucketExport> {
if !self.bucket_exists(bucket).await {
return Err(StorageError::no_such_bucket().into());
}
let objects = self.list_objects(bucket, "", "", usize::MAX, None).await?;
let mut exported_objects = Vec::with_capacity(objects.contents.len());
for object in objects.contents {
let result = self.get_object(bucket, &object.key, None).await?;
let mut file = result.body;
let mut data = Vec::with_capacity(result.size as usize);
file.read_to_end(&mut data).await?;
exported_objects.push(BucketExportObject {
key: object.key,
size: result.size,
md5: result.md5,
metadata: result.metadata,
data_hex: hex::encode(data),
});
}
Ok(BucketExport {
format: "smartstorage.bucket.v1".to_string(),
bucket_name: bucket.to_string(),
exported_at: Utc::now().timestamp_millis(),
objects: exported_objects,
})
}
pub async fn import_bucket(&self, bucket: &str, source: BucketExport) -> Result<()> {
if source.format != "smartstorage.bucket.v1" {
return Err(StorageError::invalid_request("Unsupported bucket export format.").into());
}
let mut import_objects = Vec::with_capacity(source.objects.len());
for object in source.objects {
let data = hex::decode(&object.data_hex)
.map_err(|error| StorageError::invalid_request(&error.to_string()))?;
if data.len() as u64 != object.size {
return Err(StorageError::invalid_request(&format!(
"Bucket export object '{}' size does not match payload.",
object.key
))
.into());
}
let md5_hex = format!("{:x}", Md5::digest(&data));
if !object.md5.eq_ignore_ascii_case(&md5_hex) {
return Err(StorageError::invalid_request(&format!(
"Bucket export object '{}' md5 does not match payload.",
object.key
))
.into());
}
import_objects.push((object.key, data, object.metadata));
}
if !self.bucket_exists(bucket).await {
self.create_bucket(bucket).await?;
}
for (key, data, metadata) in import_objects {
self.put_object_bytes(bucket, &key, &data, metadata).await?;
}
Ok(())
}
}

// ============================
+8 -2
@@ -47,7 +47,10 @@ pub fn list_objects_v1_xml(bucket: &str, result: &ListObjectsResult) -> String {
    );

    if !result.delimiter.is_empty() {
-        xml.push_str(&format!("<Delimiter>{}</Delimiter>", xml_escape(&result.delimiter)));
+        xml.push_str(&format!(
+            "<Delimiter>{}</Delimiter>",
+            xml_escape(&result.delimiter)
+        ));
    }

    for entry in &result.contents {
@@ -95,7 +98,10 @@ pub fn list_objects_v2_xml(bucket: &str, result: &ListObjectsResult) -> String {
    );

    if !result.delimiter.is_empty() {
-        xml.push_str(&format!("<Delimiter>{}</Delimiter>", xml_escape(&result.delimiter)));
+        xml.push_str(&format!(
+            "<Delimiter>{}</Delimiter>",
+            xml_escape(&result.delimiter)
+        ));
    }

    if let Some(ref token) = result.next_continuation_token {
+11
@@ -75,6 +75,17 @@ tap.test('should expose disabled cluster health in standalone mode', async () =>
  expect(clusterHealth.drives).toEqual(undefined);
});
tap.test('should expose health and metrics with auth disabled', async () => {
const health = await testSmartStorageInstance.getHealth();
expect(health.running).toEqual(true);
expect(health.auth.enabled).toEqual(false);
expect(health.auth.tenantCredentialCount).toEqual(0);
const metrics = await testSmartStorageInstance.getMetrics();
expect(metrics.tenantCredentialCount).toEqual(0);
expect(metrics.prometheusText).toMatch(/smartstorage_tenant_credentials_total 0/);
});
tap.test('should create a bucket', async () => {
  const response = await s3Client.send(new CreateBucketCommand({ Bucket: 'test-bucket' }));
  expect(response.$metadata.httpStatusCode).toEqual(200);
+404
@@ -0,0 +1,404 @@
/// <reference types="node" />
import { expect, tap } from '@git.zone/tstest/tapbundle';
import {
CopyObjectCommand,
GetBucketPolicyCommand,
GetObjectCommand,
HeadBucketCommand,
ListBucketsCommand,
ListObjectsV2Command,
PutBucketPolicyCommand,
PutObjectCommand,
DeleteObjectCommand,
S3Client,
} from '@aws-sdk/client-s3';
import { rm } from 'fs/promises';
import { fileURLToPath } from 'url';
import { Readable } from 'stream';
import * as smartstorage from '../ts/index.js';
const TEST_PORT = 3361;
const STORAGE_DIR = fileURLToPath(new URL('../.nogit/bucket-tenant-tests', import.meta.url));
const WORKAPP_A_BUCKET = 'workapp-a-bucket';
const WORKAPP_B_BUCKET = 'workapp-b-bucket';
const RESTORE_BUCKET = 'workapp-a-restore-bucket';
const CORRUPT_RESTORE_BUCKET = 'workapp-a-corrupt-restore-bucket';
const POLICY_BUCKET = 'workapp-policy-bucket';
const DUPLICATE_TENANT_BUCKET = 'workapp-duplicate-tenant-bucket';
const REVOKE_ONLY_BUCKET = 'workapp-revoke-only-bucket';
const ADMIN_CREDENTIAL: smartstorage.IStorageCredential = {
accessKeyId: 'TENANTADMIN',
secretAccessKey: 'TENANTADMINSECRET123',
};
let testSmartStorageInstance: smartstorage.SmartStorage;
let adminClient: S3Client;
let tenantA: smartstorage.IBucketTenantDescriptor;
let tenantB: smartstorage.IBucketTenantDescriptor;
let tenantAClient: S3Client;
let tenantBClient: S3Client;
let oldTenantAClient: S3Client;
function createS3Client(
credential: smartstorage.IStorageCredential,
region = 'us-east-1',
): S3Client {
return new S3Client({
endpoint: `http://localhost:${TEST_PORT}`,
region,
credentials: {
accessKeyId: credential.accessKeyId,
secretAccessKey: credential.secretAccessKey,
},
forcePathStyle: true,
});
}
function createS3ClientFromDescriptor(
descriptor: smartstorage.IBucketTenantDescriptor,
): S3Client {
return new S3Client({
endpoint: `http://${descriptor.endpoint}:${descriptor.port}`,
region: descriptor.region,
credentials: {
accessKeyId: descriptor.accessKeyId,
secretAccessKey: descriptor.secretAccessKey,
},
forcePathStyle: true,
});
}
async function streamToString(stream: Readable): Promise<string> {
const chunks: Buffer[] = [];
return new Promise((resolve, reject) => {
stream.on('data', (chunk: string | Buffer | Uint8Array) => chunks.push(Buffer.from(chunk)));
stream.on('error', reject);
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
}
async function startStorage() {
testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart({
server: {
port: TEST_PORT,
silent: true,
region: 'us-east-1',
},
storage: {
directory: STORAGE_DIR,
cleanSlate: false,
},
auth: {
enabled: true,
credentials: [ADMIN_CREDENTIAL],
},
});
adminClient = createS3Client(ADMIN_CREDENTIAL);
}
tap.test('bucket tenant client APIs require auth before IPC', async () => {
const storage = new smartstorage.SmartStorage({
auth: {
enabled: false,
credentials: [ADMIN_CREDENTIAL],
},
});
await expect(storage.listBucketTenants()).rejects.toThrow();
await expect(storage.getBucketTenantDescriptor({
bucketName: WORKAPP_A_BUCKET,
})).rejects.toThrow();
});
tap.test('setup: start storage and provision bucket tenants', async () => {
await rm(STORAGE_DIR, { recursive: true, force: true });
await startStorage();
tenantA = await testSmartStorageInstance.createBucketTenant({
bucketName: WORKAPP_A_BUCKET,
});
tenantB = await testSmartStorageInstance.createBucketTenant({
bucketName: WORKAPP_B_BUCKET,
});
tenantAClient = createS3ClientFromDescriptor(tenantA);
tenantBClient = createS3ClientFromDescriptor(tenantB);
});
tap.test('tenant descriptors expose app-ready S3 connection data', async () => {
expect(tenantA.endpoint).toEqual('localhost');
expect(tenantA.port).toEqual(TEST_PORT);
expect(tenantA.region).toEqual('us-east-1');
expect(tenantA.bucket).toEqual(WORKAPP_A_BUCKET);
expect(tenantA.bucketName).toEqual(WORKAPP_A_BUCKET);
expect(tenantA.accessKeyId).toBeTypeofString();
expect(tenantA.secretAccessKey).toBeTypeofString();
expect(tenantA.useSsl).toEqual(false);
expect(tenantA.env.S3_BUCKET).toEqual(WORKAPP_A_BUCKET);
expect(tenantA.env.AWS_ACCESS_KEY_ID).toEqual(tenantA.accessKeyId);
});
tap.test('listBucketTenants returns scoped credential metadata without secrets', async () => {
const tenants = await testSmartStorageInstance.listBucketTenants();
expect(tenants.length).toEqual(2);
expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_A_BUCKET)).toEqual(true);
expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_B_BUCKET)).toEqual(true);
expect((tenants[0] as any).secretAccessKey).toEqual(undefined);
});
tap.test('createBucketTenant validates credentials before creating buckets', async () => {
await expect(testSmartStorageInstance.createBucketTenant({
bucketName: DUPLICATE_TENANT_BUCKET,
accessKeyId: ADMIN_CREDENTIAL.accessKeyId,
secretAccessKey: 'DUPLICATESECRET123',
})).rejects.toThrow();
await expect(adminClient.send(new HeadBucketCommand({
Bucket: DUPLICATE_TENANT_BUCKET,
}))).rejects.toThrow();
});
tap.test('tenant credentials work with AWS SDK v3 for their assigned bucket', async () => {
const putA = await tenantAClient.send(new PutObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
Body: 'hello from tenant a',
ContentType: 'text/plain',
}));
expect(putA.$metadata.httpStatusCode).toEqual(200);
const putB = await tenantBClient.send(new PutObjectCommand({
Bucket: WORKAPP_B_BUCKET,
Key: 'other.txt',
Body: 'hello from tenant b',
ContentType: 'text/plain',
}));
expect(putB.$metadata.httpStatusCode).toEqual(200);
const getA = await tenantAClient.send(new GetObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
}));
expect(await streamToString(getA.Body as Readable)).toEqual('hello from tenant a');
const listA = await tenantAClient.send(new ListObjectsV2Command({
Bucket: WORKAPP_A_BUCKET,
}));
expect(listA.Contents?.some((object) => object.Key === 'hello.txt')).toEqual(true);
});
tap.test('tenant credentials cannot access unrelated buckets', async () => {
await expect(tenantAClient.send(new ListBucketsCommand({}))).rejects.toThrow();
await expect(tenantAClient.send(new HeadBucketCommand({
Bucket: WORKAPP_B_BUCKET,
}))).rejects.toThrow();
await expect(tenantAClient.send(new PutObjectCommand({
Bucket: WORKAPP_B_BUCKET,
Key: 'blocked-write.txt',
Body: 'blocked',
}))).rejects.toThrow();
await expect(tenantAClient.send(new GetObjectCommand({
Bucket: WORKAPP_B_BUCKET,
Key: 'other.txt',
}))).rejects.toThrow();
await expect(tenantAClient.send(new DeleteObjectCommand({
Bucket: WORKAPP_B_BUCKET,
Key: 'other.txt',
}))).rejects.toThrow();
await expect(tenantAClient.send(new CopyObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'copy-from-other-bucket.txt',
CopySource: `/${WORKAPP_B_BUCKET}/other.txt`,
}))).rejects.toThrow();
await expect(tenantBClient.send(new GetObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
}))).rejects.toThrow();
});
tap.test('health and metrics expose running storage state', async () => {
const health = await testSmartStorageInstance.getHealth();
expect(health.running).toEqual(true);
expect(health.ok).toEqual(true);
expect(health.storageDirectory).toEqual(STORAGE_DIR);
expect(health.auth.enabled).toEqual(true);
expect(health.auth.tenantCredentialCount).toEqual(2);
expect(health.bucketCount >= 2).toEqual(true);
expect(health.objectCount >= 2).toEqual(true);
expect(health.totalBytes > 0).toEqual(true);
const metrics = await testSmartStorageInstance.getMetrics();
expect(metrics.tenantCredentialCount).toEqual(2);
expect(metrics.prometheusText).toMatch(/smartstorage_tenant_credentials_total 2/);
});
tap.test('export/import targets one bucket without unrelated tenant data', async () => {
const bucketExport = await testSmartStorageInstance.exportBucket({
bucketName: WORKAPP_A_BUCKET,
});
expect(bucketExport.format).toEqual('smartstorage.bucket.v1');
expect(bucketExport.bucketName).toEqual(WORKAPP_A_BUCKET);
expect(bucketExport.objects.some((object) => object.key === 'hello.txt')).toEqual(true);
expect(bucketExport.objects.some((object) => object.key === 'other.txt')).toEqual(false);
await testSmartStorageInstance.importBucket({
bucketName: RESTORE_BUCKET,
source: bucketExport,
});
const restoredObject = await adminClient.send(new GetObjectCommand({
Bucket: RESTORE_BUCKET,
Key: 'hello.txt',
}));
expect(await streamToString(restoredObject.Body as Readable)).toEqual('hello from tenant a');
const restoredObjects = await adminClient.send(new ListObjectsV2Command({
Bucket: RESTORE_BUCKET,
}));
expect(restoredObjects.Contents?.some((object) => object.Key === 'other.txt')).toEqual(false);
const corruptExport: smartstorage.IBucketExport = {
...bucketExport,
objects: bucketExport.objects.map((object, index) => index === 0 ? {
...object,
size: object.size + 1,
} : object),
};
await expect(testSmartStorageInstance.importBucket({
bucketName: CORRUPT_RESTORE_BUCKET,
source: corruptExport,
})).rejects.toThrow();
await expect(adminClient.send(new HeadBucketCommand({
Bucket: CORRUPT_RESTORE_BUCKET,
}))).rejects.toThrow();
});
tap.test('bucket policies persist across restart', async () => {
await testSmartStorageInstance.createBucket(POLICY_BUCKET);
const policy = JSON.stringify({
Version: '2012-10-17',
Statement: [{
Sid: 'TenantPolicyPersistence',
Effect: 'Allow',
Principal: { AWS: ADMIN_CREDENTIAL.accessKeyId },
Action: ['s3:GetBucketPolicy', 's3:PutBucketPolicy', 's3:ListBucket'],
Resource: `arn:aws:s3:::${POLICY_BUCKET}`,
}],
});
const response = await adminClient.send(new PutBucketPolicyCommand({
Bucket: POLICY_BUCKET,
Policy: policy,
}));
expect(response.$metadata.httpStatusCode).toEqual(204);
});
tap.test('credential rotation replaces the active tenant credential', async () => {
oldTenantAClient = tenantAClient;
tenantA = await testSmartStorageInstance.rotateBucketTenantCredentials({
bucketName: WORKAPP_A_BUCKET,
});
tenantAClient = createS3ClientFromDescriptor(tenantA);
await expect(oldTenantAClient.send(new GetObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
}))).rejects.toThrow();
const getA = await tenantAClient.send(new GetObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
}));
expect(await streamToString(getA.Body as Readable)).toEqual('hello from tenant a');
const descriptor = await testSmartStorageInstance.getBucketTenantDescriptor({
bucketName: WORKAPP_A_BUCKET,
});
expect(descriptor.accessKeyId).toEqual(tenantA.accessKeyId);
expect(descriptor.secretAccessKey).toEqual(tenantA.secretAccessKey);
});
tap.test('runtime credentials survive restart', async () => {
await testSmartStorageInstance.stop();
await startStorage();
const persistedTenantA = await testSmartStorageInstance.getBucketTenantDescriptor({
bucketName: WORKAPP_A_BUCKET,
});
expect(persistedTenantA.accessKeyId).toEqual(tenantA.accessKeyId);
expect(persistedTenantA.secretAccessKey).toEqual(tenantA.secretAccessKey);
tenantAClient = createS3ClientFromDescriptor(persistedTenantA);
const getA = await tenantAClient.send(new GetObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
}));
expect(await streamToString(getA.Body as Readable)).toEqual('hello from tenant a');
const tenants = await testSmartStorageInstance.listBucketTenants();
expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_A_BUCKET)).toEqual(true);
expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_B_BUCKET)).toEqual(true);
const policyResponse = await adminClient.send(new GetBucketPolicyCommand({
Bucket: POLICY_BUCKET,
}));
expect(policyResponse.Policy?.includes('TenantPolicyPersistence')).toEqual(true);
});
tap.test('deleteBucketTenant refuses buckets without tenant credentials', async () => {
await expect(testSmartStorageInstance.deleteBucketTenant({
bucketName: POLICY_BUCKET,
})).rejects.toThrow();
const headAfterRefusedDelete = await adminClient.send(new HeadBucketCommand({
Bucket: POLICY_BUCKET,
}));
expect(headAfterRefusedDelete.$metadata.httpStatusCode).toEqual(200);
});
tap.test('deleteBucketTenant can revoke credentials and delete tenant buckets', async () => {
const revokeOnlyTenant = await testSmartStorageInstance.createBucketTenant({
bucketName: REVOKE_ONLY_BUCKET,
});
const revokeOnlyClient = createS3ClientFromDescriptor(revokeOnlyTenant);
await revokeOnlyClient.send(new PutObjectCommand({
Bucket: REVOKE_ONLY_BUCKET,
Key: 'revoke-only.txt',
Body: 'revocation target',
}));
await testSmartStorageInstance.deleteBucketTenant({
bucketName: REVOKE_ONLY_BUCKET,
accessKeyId: revokeOnlyTenant.accessKeyId,
});
await expect(revokeOnlyClient.send(new GetObjectCommand({
Bucket: REVOKE_ONLY_BUCKET,
Key: 'revoke-only.txt',
}))).rejects.toThrow();
const headAfterRevoke = await adminClient.send(new HeadBucketCommand({
Bucket: REVOKE_ONLY_BUCKET,
}));
expect(headAfterRevoke.$metadata.httpStatusCode).toEqual(200);
await testSmartStorageInstance.deleteBucketTenant({
bucketName: WORKAPP_B_BUCKET,
});
await expect(tenantBClient.send(new GetObjectCommand({
Bucket: WORKAPP_B_BUCKET,
Key: 'other.txt',
}))).rejects.toThrow();
await expect(adminClient.send(new HeadBucketCommand({
Bucket: WORKAPP_B_BUCKET,
}))).rejects.toThrow();
const tenants = await testSmartStorageInstance.listBucketTenants();
expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_B_BUCKET)).toEqual(false);
});
tap.test('teardown: stop storage server', async () => {
await testSmartStorageInstance.stop();
});
export default tap.start();
+1 -1
@@ -3,6 +3,6 @@
 */
export const commitinfo = {
  name: '@push.rocks/smartstorage',
-  version: '6.4.1',
+  version: '6.5.1',
  description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
}
+297
@@ -7,10 +7,14 @@ import * as paths from './paths.js';
export interface IStorageCredential {
  accessKeyId: string;
  secretAccessKey: string;
+  bucketName?: string;
+  region?: string;
}

export interface IStorageCredentialMetadata {
  accessKeyId: string;
+  bucketName?: string;
+  region?: string;
}

/**
@@ -155,6 +159,88 @@ export interface IStorageStats {
  storageLocations?: IStorageLocationSummary[];
}
export interface IBucketTenantInput {
bucketName: string;
accessKeyId?: string;
secretAccessKey?: string;
region?: string;
}
export interface IDeleteBucketTenantInput {
bucketName: string;
accessKeyId?: string;
}
export interface IBucketTenantMetadata {
bucketName: string;
accessKeyId: string;
region?: string;
}
export interface IBucketTenantDescriptor extends plugins.tsclass.storage.IS3Descriptor {
endpoint: string;
port: number;
region: string;
bucket: string;
bucketName: string;
accessKeyId: string;
secretAccessKey: string;
accessKey: string;
accessSecret: string;
useSsl: boolean;
ssl: boolean;
env: Record<string, string>;
}
export interface IBucketExportObject {
key: string;
size: number;
md5: string;
metadata: Record<string, string>;
dataHex: string;
}
export interface IBucketExport {
format: 'smartstorage.bucket.v1';
bucketName: string;
exportedAt: number;
objects: IBucketExportObject[];
}
export interface IExportBucketInput {
bucketName: string;
}
export interface IImportBucketInput {
bucketName: string;
source: IBucketExport;
}
export interface ISmartStorageHealth {
ok: boolean;
running: boolean;
storageDirectory: string;
auth: {
enabled: boolean;
credentialCount: number;
tenantCredentialCount: number;
};
bucketCount: number;
objectCount: number;
totalBytes: number;
cluster: IClusterHealth;
}
export interface ISmartStorageMetrics {
bucketCount: number;
objectCount: number;
totalBytes: number;
authCredentialCount: number;
tenantCredentialCount: number;
clusterEnabled: boolean;
prometheusText: string;
}
/**
 * Known peer status from the local node's current cluster view.
 */
@@ -306,6 +392,14 @@ function mergeConfig(userConfig: ISmartStorageConfig): Required<ISmartStorageConfig> {
  } as Required<ISmartStorageConfig>;
}
function createAccessKeyId(): string {
return `SS${plugins.crypto.randomBytes(10).toString('hex').toUpperCase()}`;
}
function createSecretAccessKey(): string {
return plugins.crypto.randomBytes(32).toString('hex');
}
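For reference, the shapes these helpers produce — a sketch, not part of the diff:

```typescript
// createAccessKeyId: 'SS' + 10 random bytes as uppercase hex = 22 chars total.
// createSecretAccessKey: 32 random bytes as lowercase hex = 64 chars.
const id = createAccessKeyId();
const secret = createSecretAccessKey();
console.log(/^SS[0-9A-F]{20}$/.test(id)); // true
console.log(/^[0-9a-f]{64}$/.test(secret)); // true
```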
/** /**
* IPC command type map for RustBridge * IPC command type map for RustBridge
*/ */
@@ -313,6 +407,35 @@ type TRustStorageCommands = {
start: { params: { config: Required<ISmartStorageConfig> }; result: {} }; start: { params: { config: Required<ISmartStorageConfig> }; result: {} };
stop: { params: {}; result: {} }; stop: { params: {}; result: {} };
createBucket: { params: { name: string }; result: {} }; createBucket: { params: { name: string }; result: {} };
createBucketTenant: {
params: {
bucketName: string;
accessKeyId: string;
secretAccessKey: string;
region?: string;
};
result: IStorageCredential;
};
deleteBucketTenant: {
params: { bucketName: string; accessKeyId?: string };
result: {};
};
rotateBucketTenantCredentials: {
params: {
bucketName: string;
accessKeyId: string;
secretAccessKey: string;
region?: string;
};
result: IStorageCredential;
};
listBucketTenants: { params: {}; result: IBucketTenantMetadata[] };
getBucketTenantCredential: {
params: { bucketName: string };
result: IStorageCredential;
};
exportBucket: { params: { bucketName: string }; result: IBucketExport };
importBucket: { params: { bucketName: string; source: IBucketExport }; result: {} };
getStorageStats: { params: {}; result: IStorageStats }; getStorageStats: { params: {}; result: IStorageStats };
listBucketSummaries: { params: {}; result: IBucketSummary[] }; listBucketSummaries: { params: {}; result: IBucketSummary[] };
listCredentials: { params: {}; result: IStorageCredentialMetadata[] }; listCredentials: { params: {}; result: IStorageCredentialMetadata[] };
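The value of the command map is that every `sendCommand` call site is type-checked end to end. A sketch of what that buys, assuming `bridge` is an in-scope `RustBridge<TRustStorageCommands>` instance (consistent with how the class below uses it):

```typescript
// Params must match the declared shape, and the result type is inferred:
const tenants = await bridge.sendCommand('listBucketTenants', {}); // IBucketTenantMetadata[]
const dump = await bridge.sendCommand('exportBucket', { bucketName: 'workapp-a' }); // IBucketExport

// Bad calls fail at compile time:
// bridge.sendCommand('exportBucket', {});      // error: missing bucketName
// bridge.sendCommand('exprotBucket', {});      // error: unknown command
```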
@@ -334,6 +457,7 @@ export class SmartStorage {
// INSTANCE // INSTANCE
public config: Required<ISmartStorageConfig>; public config: Required<ISmartStorageConfig>;
private bridge: InstanceType<typeof plugins.RustBridge<TRustStorageCommands>>; private bridge: InstanceType<typeof plugins.RustBridge<TRustStorageCommands>>;
private running = false;
constructor(configArg: ISmartStorageConfig = {}) { constructor(configArg: ISmartStorageConfig = {}) {
this.config = mergeConfig(configArg); this.config = mergeConfig(configArg);
@@ -353,6 +477,7 @@ export class SmartStorage {
throw new Error('Failed to spawn ruststorage binary. Make sure it is compiled (pnpm build).'); throw new Error('Failed to spawn ruststorage binary. Make sure it is compiled (pnpm build).');
} }
await this.bridge.sendCommand('start', { config: this.config }); await this.bridge.sendCommand('start', { config: this.config });
this.running = true;
if (!this.config.server.silent) { if (!this.config.server.silent) {
console.log('storage server is running'); console.log('storage server is running');
@@ -382,11 +507,112 @@ export class SmartStorage {
}; };
} }
private getEndpoint(): string {
return this.config.server.address === '0.0.0.0' ? 'localhost' : this.config.server.address!;
}
// Assembles a connection descriptor with duplicated key aliases (accessKey/accessKeyId,
// ssl/useSsl, bucket/bucketName) plus a ready-made env map for client tooling.
private buildBucketTenantDescriptor(
credential: IStorageCredential,
bucketNameArg: string,
): IBucketTenantDescriptor {
const bucketName = credential.bucketName || bucketNameArg;
const region = credential.region || this.config.server.region || 'us-east-1';
const endpoint = this.getEndpoint();
const port = this.config.server.port!;
const useSsl = false;
return {
endpoint,
port,
region,
bucket: bucketName,
bucketName,
accessKeyId: credential.accessKeyId,
secretAccessKey: credential.secretAccessKey,
accessKey: credential.accessKeyId,
accessSecret: credential.secretAccessKey,
useSsl,
ssl: useSsl,
env: {
S3_ENDPOINT: endpoint,
S3_PORT: String(port),
S3_REGION: region,
S3_BUCKET: bucketName,
S3_ACCESS_KEY_ID: credential.accessKeyId,
S3_SECRET_ACCESS_KEY: credential.secretAccessKey,
S3_USE_SSL: String(useSsl),
AWS_ACCESS_KEY_ID: credential.accessKeyId,
AWS_SECRET_ACCESS_KEY: credential.secretAccessKey,
AWS_REGION: region,
},
};
}
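A sketch of consuming the descriptor with the AWS SDK v3 client the test suite uses. The helper name is made up; only fields defined on `IBucketTenantDescriptor` are read:

```typescript
import { S3Client } from '@aws-sdk/client-s3';

function clientFromDescriptor(descriptor: IBucketTenantDescriptor): S3Client {
  return new S3Client({
    endpoint: `http${descriptor.useSsl ? 's' : ''}://${descriptor.endpoint}:${descriptor.port}`,
    region: descriptor.region,
    credentials: {
      accessKeyId: descriptor.accessKeyId,
      secretAccessKey: descriptor.secretAccessKey,
    },
    forcePathStyle: true, // path-style addressing, typical for local S3-compatible servers
  });
}
```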
private assertTenantAuthEnabled(): void {
if (!this.config.auth.enabled) {
throw new Error('Bucket tenant APIs require auth.enabled=true.');
}
}
public async createBucket(bucketNameArg: string) { public async createBucket(bucketNameArg: string) {
await this.bridge.sendCommand('createBucket', { name: bucketNameArg }); await this.bridge.sendCommand('createBucket', { name: bucketNameArg });
return { name: bucketNameArg }; return { name: bucketNameArg };
} }
public async createBucketTenant(
tenantArg: IBucketTenantInput,
): Promise<IBucketTenantDescriptor> {
this.assertTenantAuthEnabled();
const credential = await this.bridge.sendCommand('createBucketTenant', {
bucketName: tenantArg.bucketName,
accessKeyId: tenantArg.accessKeyId || createAccessKeyId(),
secretAccessKey: tenantArg.secretAccessKey || createSecretAccessKey(),
region: tenantArg.region || this.config.server.region,
});
return this.buildBucketTenantDescriptor(credential, tenantArg.bucketName);
}
public async deleteBucketTenant(tenantArg: IDeleteBucketTenantInput): Promise<void> {
this.assertTenantAuthEnabled();
await this.bridge.sendCommand('deleteBucketTenant', tenantArg);
}
public async rotateBucketTenantCredentials(
tenantArg: IBucketTenantInput,
): Promise<IBucketTenantDescriptor> {
this.assertTenantAuthEnabled();
const credential = await this.bridge.sendCommand('rotateBucketTenantCredentials', {
bucketName: tenantArg.bucketName,
accessKeyId: tenantArg.accessKeyId || createAccessKeyId(),
secretAccessKey: tenantArg.secretAccessKey || createSecretAccessKey(),
region: tenantArg.region || this.config.server.region,
});
return this.buildBucketTenantDescriptor(credential, tenantArg.bucketName);
}
public async listBucketTenants(): Promise<IBucketTenantMetadata[]> {
this.assertTenantAuthEnabled();
return this.bridge.sendCommand('listBucketTenants', {});
}
public async getBucketTenantDescriptor(optionsArg: {
bucketName: string;
}): Promise<IBucketTenantDescriptor> {
this.assertTenantAuthEnabled();
const credential = await this.bridge.sendCommand('getBucketTenantCredential', {
bucketName: optionsArg.bucketName,
});
return this.buildBucketTenantDescriptor(credential, optionsArg.bucketName);
}
public async exportBucket(optionsArg: IExportBucketInput): Promise<IBucketExport> {
return this.bridge.sendCommand('exportBucket', { bucketName: optionsArg.bucketName });
}
public async importBucket(optionsArg: IImportBucketInput): Promise<void> {
await this.bridge.sendCommand('importBucket', optionsArg);
}
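Putting `exportBucket` and `importBucket` together, a sketch of cloning a bucket (names illustrative):

```typescript
const snapshot = await storage.exportBucket({ bucketName: 'workapp-a' });
// Assumption based on the v6.5.1 changelog: import creates the target bucket
// itself once every object's size and MD5 validate, so no explicit
// createBucket call is shown here.
await storage.importBucket({ bucketName: 'workapp-a-copy', source: snapshot });
```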
public async getStorageStats(): Promise<IStorageStats> { public async getStorageStats(): Promise<IStorageStats> {
return this.bridge.sendCommand('getStorageStats', {}); return this.bridge.sendCommand('getStorageStats', {});
} }
@@ -408,8 +634,79 @@ export class SmartStorage {
return this.bridge.sendCommand('getClusterHealth', {}); return this.bridge.sendCommand('getClusterHealth', {});
} }
// Safe to call before start(): returns a stopped-state report instead of throwing.
public async getHealth(): Promise<ISmartStorageHealth> {
if (!this.running) {
return {
ok: false,
running: false,
storageDirectory: this.config.storage.directory || paths.bucketsDir,
auth: {
enabled: this.config.auth.enabled,
credentialCount: this.config.auth.credentials.length,
tenantCredentialCount: 0,
},
bucketCount: 0,
objectCount: 0,
totalBytes: 0,
cluster: { enabled: false },
};
}
const [stats, credentials, tenants, cluster] = await Promise.all([
this.getStorageStats(),
this.listCredentials(),
this.config.auth.enabled ? this.listBucketTenants() : Promise.resolve([]),
this.getClusterHealth(),
]);
return {
ok: true,
running: true,
storageDirectory: stats.storageDirectory,
auth: {
enabled: this.config.auth.enabled,
credentialCount: credentials.length,
tenantCredentialCount: tenants.length,
},
bucketCount: stats.bucketCount,
objectCount: stats.totalObjectCount,
totalBytes: stats.totalStorageBytes,
cluster,
};
}
public async getMetrics(): Promise<ISmartStorageMetrics> {
const health = await this.getHealth();
const clusterEnabled = health.cluster.enabled;
return {
bucketCount: health.bucketCount,
objectCount: health.objectCount,
totalBytes: health.totalBytes,
authCredentialCount: health.auth.credentialCount,
tenantCredentialCount: health.auth.tenantCredentialCount,
clusterEnabled,
prometheusText: [
'# HELP smartstorage_buckets_total Runtime bucket count.',
'# TYPE smartstorage_buckets_total gauge',
`smartstorage_buckets_total ${health.bucketCount}`,
'# HELP smartstorage_objects_total Runtime object count.',
'# TYPE smartstorage_objects_total gauge',
`smartstorage_objects_total ${health.objectCount}`,
'# HELP smartstorage_storage_bytes_total Runtime storage bytes.',
'# TYPE smartstorage_storage_bytes_total gauge',
`smartstorage_storage_bytes_total ${health.totalBytes}`,
'# HELP smartstorage_tenant_credentials_total Scoped bucket tenant credential count.',
'# TYPE smartstorage_tenant_credentials_total gauge',
`smartstorage_tenant_credentials_total ${health.auth.tenantCredentialCount}`,
'# HELP smartstorage_cluster_enabled Cluster mode enabled.',
'# TYPE smartstorage_cluster_enabled gauge',
`smartstorage_cluster_enabled ${clusterEnabled ? 1 : 0}`,
].join('\n'),
};
}
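Since `getMetrics` only returns the Prometheus exposition as a string, serving it is left to the caller. A minimal sketch with Node's `http` module — the port and path are arbitrary, and smartstorage does not start this endpoint itself:

```typescript
import * as http from 'http';

// `storage` is assumed to be a started SmartStorage instance.
http.createServer(async (req, res) => {
  if (req.url === '/metrics') {
    const metrics = await storage.getMetrics();
    res.writeHead(200, { 'Content-Type': 'text/plain; version=0.0.4' });
    res.end(metrics.prometheusText + '\n');
    return;
  }
  res.writeHead(404).end();
}).listen(9464);
```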
public async stop() { public async stop() {
await this.bridge.sendCommand('stop', {}); await this.bridge.sendCommand('stop', {});
this.bridge.kill(); this.bridge.kill();
this.running = false;
} }
} }
+2 -1
View File
@@ -1,7 +1,8 @@
// node native // node native
import * as crypto from 'crypto';
import * as path from 'path'; import * as path from 'path';
export { path }; export { crypto, path };
// @push.rocks scope // @push.rocks scope
import * as smartpath from '@push.rocks/smartpath'; import * as smartpath from '@push.rocks/smartpath';