Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8f1d025330 | |||
| b075de1ecd |
@@ -1,5 +1,20 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-05-02 - 6.5.1 - fix(bucket-tenants)
|
||||||
|
make tenant lifecycle and bucket import validation safer
|
||||||
|
|
||||||
|
- validate exported object size and MD5 before creating and importing target buckets
|
||||||
|
- persist bucket-tenant credential changes with rollback when bucket creation or deletion fails
|
||||||
|
- require auth-enabled access and matching tenant credentials for tenant list, descriptor, and recursive delete flows
|
||||||
|
|
||||||
|
## Next - fix(bucket-tenants)
|
||||||
|
make tenant lifecycle and bucket import validation safer
|
||||||
|
|
||||||
|
- Validate bucket export object size and MD5 before creating/importing into target buckets.
|
||||||
|
- Persist tenant credentials through snapshot-aware helpers and roll back credential changes if bucket create/delete fails.
|
||||||
|
- Require matching tenant credentials before `deleteBucketTenant({ bucketName })` can recursively delete a bucket.
|
||||||
|
- Enforce `auth.enabled` consistently on tenant list/descriptor APIs and document metadata-only credential listings.
|
||||||
|
|
||||||
## 2026-05-02 - 6.5.0 - feat(bucket-tenants)
|
## 2026-05-02 - 6.5.0 - feat(bucket-tenants)
|
||||||
add persisted bucket-scoped tenant credentials with bucket export and import APIs
|
add persisted bucket-scoped tenant credentials with bucket export and import APIs
|
||||||
|
|
||||||
|
|||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartstorage",
|
"name": "@push.rocks/smartstorage",
|
||||||
"version": "6.5.0",
|
"version": "6.5.1",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
+6
-6
@@ -1,6 +1,6 @@
|
|||||||
# Project Hints for smartstorage
|
# Project Hints for smartstorage
|
||||||
|
|
||||||
## Current State (v6.4.0)
|
## Current State (v6.5.0)
|
||||||
|
|
||||||
- **Rust-powered S3-compatible storage server** via `@push.rocks/smartrust` IPC bridge
|
- **Rust-powered S3-compatible storage server** via `@push.rocks/smartrust` IPC bridge
|
||||||
- High-performance: streaming I/O, zero-copy, backpressure, range seek
|
- High-performance: streaming I/O, zero-copy, backpressure, range seek
|
||||||
@@ -16,7 +16,7 @@
|
|||||||
- Runtime credential listing and atomic replacement via the Rust bridge
|
- Runtime credential listing and atomic replacement via the Rust bridge
|
||||||
- Runtime credentials persist under `{storage}/.smartstorage/credentials.json`
|
- Runtime credentials persist under `{storage}/.smartstorage/credentials.json`
|
||||||
- Bucket tenant APIs provision scoped per-bucket credentials and enforce the scope before bucket-policy/default-auth authorization
|
- Bucket tenant APIs provision scoped per-bucket credentials and enforce the scope before bucket-policy/default-auth authorization
|
||||||
- Per-bucket export/import uses `smartstorage.bucket.v1` JSON with object payloads encoded per object
|
- Per-bucket export/import uses `smartstorage.bucket.v1` JSON with object payloads encoded per object and size/MD5 validation on import
|
||||||
- Cluster identity and topology snapshots persist under `{storage}/.smartstorage/cluster/`
|
- Cluster identity and topology snapshots persist under `{storage}/.smartstorage/cluster/`
|
||||||
- S3-side operational endpoints are available at `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics`
|
- S3-side operational endpoints are available at `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics`
|
||||||
- Runtime credential listing returns access-key metadata only; secrets are write-only
|
- Runtime credential listing returns access-key metadata only; secrets are write-only
|
||||||
@@ -48,7 +48,7 @@
|
|||||||
| `stop` | `{}` | Graceful shutdown |
|
| `stop` | `{}` | Graceful shutdown |
|
||||||
| `createBucket` | `{ name: string }` | Create bucket directory |
|
| `createBucket` | `{ name: string }` | Create bucket directory |
|
||||||
| `createBucketTenant` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Create bucket and scoped persisted credential |
|
| `createBucketTenant` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Create bucket and scoped persisted credential |
|
||||||
| `deleteBucketTenant` | `{ bucketName, accessKeyId? }` | Revoke scoped credential or delete tenant bucket recursively |
|
| `deleteBucketTenant` | `{ bucketName, accessKeyId? }` | Revoke scoped credential or delete a verified tenant bucket recursively |
|
||||||
| `rotateBucketTenantCredentials` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Replace scoped credential for one bucket |
|
| `rotateBucketTenantCredentials` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Replace scoped credential for one bucket |
|
||||||
| `listBucketTenants` | `{}` | Return scoped credential metadata |
|
| `listBucketTenants` | `{}` | Return scoped credential metadata |
|
||||||
| `getBucketTenantCredential` | `{ bucketName }` | Return one scoped credential including secret for descriptor generation |
|
| `getBucketTenantCredential` | `{ bucketName }` | Return one scoped credential including secret for descriptor generation |
|
||||||
@@ -56,7 +56,7 @@
|
|||||||
| `importBucket` | `{ bucketName, source }` | Import a `smartstorage.bucket.v1` bucket export |
|
| `importBucket` | `{ bucketName, source }` | Import a `smartstorage.bucket.v1` bucket export |
|
||||||
| `getStorageStats` | `{}` | Return cached bucket/global runtime stats + storage location capacity snapshots |
|
| `getStorageStats` | `{}` | Return cached bucket/global runtime stats + storage location capacity snapshots |
|
||||||
| `listBucketSummaries` | `{}` | Return cached per-bucket runtime summaries |
|
| `listBucketSummaries` | `{}` | Return cached per-bucket runtime summaries |
|
||||||
| `listCredentials` | `{}` | Return the active runtime auth credential set |
|
| `listCredentials` | `{}` | Return metadata for the active runtime auth credential set |
|
||||||
| `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set |
|
| `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set |
|
||||||
| `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode |
|
| `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode |
|
||||||
|
|
||||||
@@ -92,8 +92,8 @@
|
|||||||
|
|
||||||
## Testing
|
## Testing
|
||||||
|
|
||||||
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337)
|
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health/metrics coverage (20 tests, auth disabled, port 3337)
|
||||||
- `test/test.bucket-tenants.node.ts` - bucket tenant provisioning, per-bucket isolation, restart persistence, export/import, policy persistence, rotation, revoke/delete, AWS SDK v3 compatibility (12 tests, port 3361)
|
- `test/test.bucket-tenants.node.ts` - bucket tenant provisioning, per-bucket isolation, restart persistence, export/import integrity, policy persistence, rotation, revoke/delete safeguards, AWS SDK v3 compatibility (15 tests, port 3361)
|
||||||
- `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349)
|
- `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349)
|
||||||
- `test/test.health-http.node.ts` - unauthenticated operational endpoint coverage (3 tests, port 3353)
|
- `test/test.health-http.node.ts` - unauthenticated operational endpoint coverage (3 tests, port 3353)
|
||||||
- `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348)
|
- `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348)
|
||||||
|
|||||||
@@ -271,7 +271,7 @@ const tenants = await storage.listBucketTenants();
|
|||||||
- `createBucketTenant()` creates the bucket if needed and stores a scoped credential for that bucket.
|
- `createBucketTenant()` creates the bucket if needed and stores a scoped credential for that bucket.
|
||||||
- `rotateBucketTenantCredentials()` replaces the active scoped credential for the bucket and persists the new credential.
|
- `rotateBucketTenantCredentials()` replaces the active scoped credential for the bucket and persists the new credential.
|
||||||
- `deleteBucketTenant({ bucketName, accessKeyId })` revokes one scoped credential and keeps the bucket.
|
- `deleteBucketTenant({ bucketName, accessKeyId })` revokes one scoped credential and keeps the bucket.
|
||||||
- `deleteBucketTenant({ bucketName })` revokes scoped credentials for the bucket and deletes the bucket contents recursively.
|
- `deleteBucketTenant({ bucketName })` revokes scoped credentials for an existing tenant bucket and deletes that bucket's contents recursively.
|
||||||
- Tenant credentials can list, read, write, and delete objects in their assigned bucket, but cannot list all buckets, access other buckets, copy from other buckets, delete buckets, or mutate bucket policies.
|
- Tenant credentials can list, read, write, and delete objects in their assigned bucket, but cannot list all buckets, access other buckets, copy from other buckets, delete buckets, or mutate bucket policies.
|
||||||
- Bucket tenant APIs require `auth.enabled: true`.
|
- Bucket tenant APIs require `auth.enabled: true`.
|
||||||
|
|
||||||
@@ -283,7 +283,7 @@ await storage.importBucket({ bucketName: 'workapp-123-restore', source: appBacku
|
|||||||
```
|
```
|
||||||
|
|
||||||
- `exportBucket()` returns a self-contained `smartstorage.bucket.v1` JSON export with only the selected bucket's objects and object metadata.
|
- `exportBucket()` returns a self-contained `smartstorage.bucket.v1` JSON export with only the selected bucket's objects and object metadata.
|
||||||
- `importBucket()` creates the target bucket if needed and restores the exported objects into that bucket.
|
- `importBucket()` validates object payload size and MD5 before creating the target bucket if needed, then restores the exported objects into that bucket.
|
||||||
- Exports do not include credentials, policies, or unrelated tenant data.
|
- Exports do not include credentials, policies, or unrelated tenant data.
|
||||||
|
|
||||||
## Health and Metrics APIs
|
## Health and Metrics APIs
|
||||||
@@ -645,7 +645,7 @@ Create a bucket tenant with a generated or supplied scoped credential. Options:
|
|||||||
|
|
||||||
#### `deleteBucketTenant(options): Promise<void>`
|
#### `deleteBucketTenant(options): Promise<void>`
|
||||||
|
|
||||||
Revoke a tenant credential or delete the full tenant bucket. Options: `{ bucketName, accessKeyId? }`.
|
Revoke a tenant credential or delete a bucket that still has tenant credentials. Options: `{ bucketName, accessKeyId? }`.
|
||||||
|
|
||||||
#### `rotateBucketTenantCredentials(options): Promise<IBucketTenantDescriptor>`
|
#### `rotateBucketTenantCredentials(options): Promise<IBucketTenantDescriptor>`
|
||||||
|
|
||||||
@@ -665,7 +665,7 @@ Export one bucket's objects and metadata into a `smartstorage.bucket.v1` JSON ob
|
|||||||
|
|
||||||
#### `importBucket(options): Promise<void>`
|
#### `importBucket(options): Promise<void>`
|
||||||
|
|
||||||
Import a `smartstorage.bucket.v1` JSON object into the target bucket. Options: `{ bucketName, source }`.
|
Import a `smartstorage.bucket.v1` JSON object into the target bucket after validating object size and MD5. Options: `{ bucketName, source }`.
|
||||||
|
|
||||||
#### `getStorageDescriptor(options?): Promise<IS3Descriptor>`
|
#### `getStorageDescriptor(options?): Promise<IS3Descriptor>`
|
||||||
|
|
||||||
@@ -687,9 +687,9 @@ Read cached logical bucket and object totals from the Rust runtime without issui
|
|||||||
|
|
||||||
Get per-bucket logical object counts and total payload sizes.
|
Get per-bucket logical object counts and total payload sizes.
|
||||||
|
|
||||||
#### `listCredentials(): Promise<IStorageCredential[]>`
|
#### `listCredentials(): Promise<IStorageCredentialMetadata[]>`
|
||||||
|
|
||||||
Return the currently active runtime credential set.
|
Return metadata for the currently active runtime credential set without `secretAccessKey` values.
|
||||||
|
|
||||||
#### `replaceCredentials(credentials: IStorageCredential[]): Promise<void>`
|
#### `replaceCredentials(credentials: IStorageCredential[]): Promise<void>`
|
||||||
|
|
||||||
|
|||||||
+106
-45
@@ -195,6 +195,17 @@ pub struct BucketTenantMetadata {
|
|||||||
pub region: Option<String>,
|
pub region: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct CredentialRemoval {
|
||||||
|
pub previous_credentials: Vec<Credential>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct CredentialReplacement {
|
||||||
|
pub credential: Credential,
|
||||||
|
pub previous_credentials: Vec<Credential>,
|
||||||
|
}
|
||||||
|
|
||||||
impl RuntimeCredentialStore {
|
impl RuntimeCredentialStore {
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
config: &AuthConfig,
|
config: &AuthConfig,
|
||||||
@@ -244,66 +255,54 @@ impl RuntimeCredentialStore {
|
|||||||
credentials: Vec<Credential>,
|
credentials: Vec<Credential>,
|
||||||
) -> Result<(), StorageError> {
|
) -> Result<(), StorageError> {
|
||||||
validate_credentials(&credentials)?;
|
validate_credentials(&credentials)?;
|
||||||
|
let mut credentials_guard = self.credentials.write().await;
|
||||||
self.persist_credentials(&credentials).await?;
|
self.persist_credentials(&credentials).await?;
|
||||||
*self.credentials.write().await = credentials;
|
*credentials_guard = credentials;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn replace_bucket_tenant_credential(
|
pub async fn replace_bucket_tenant_credential(
|
||||||
&self,
|
&self,
|
||||||
bucket_name: &str,
|
bucket_name: &str,
|
||||||
mut credential: Credential,
|
credential: Credential,
|
||||||
) -> Result<Credential, StorageError> {
|
) -> Result<Credential, StorageError> {
|
||||||
validate_bucket_scope(bucket_name)?;
|
let replacement = self
|
||||||
credential.bucket_name = Some(bucket_name.to_string());
|
.replace_bucket_tenant_credential_with_snapshot(bucket_name, credential)
|
||||||
|
.await?;
|
||||||
|
Ok(replacement.credential)
|
||||||
|
}
|
||||||
|
|
||||||
let mut credentials = self.credentials.read().await.clone();
|
pub async fn replace_bucket_tenant_credential_with_snapshot(
|
||||||
if credentials.iter().any(|existing| {
|
&self,
|
||||||
existing.access_key_id == credential.access_key_id
|
bucket_name: &str,
|
||||||
&& existing.bucket_name.as_deref() != Some(bucket_name)
|
credential: Credential,
|
||||||
}) {
|
) -> Result<CredentialReplacement, StorageError> {
|
||||||
return Err(StorageError::invalid_request(
|
let mut credentials_guard = self.credentials.write().await;
|
||||||
"Credential accessKeyId is already assigned to another principal.",
|
let previous_credentials = credentials_guard.clone();
|
||||||
));
|
let (credential, updated_credentials) =
|
||||||
}
|
prepare_bucket_tenant_replacement(bucket_name, credential, &credentials_guard)?;
|
||||||
|
self.persist_credentials(&updated_credentials).await?;
|
||||||
credentials.retain(|existing| existing.bucket_name.as_deref() != Some(bucket_name));
|
*credentials_guard = updated_credentials;
|
||||||
credentials.push(credential.clone());
|
Ok(CredentialReplacement {
|
||||||
validate_credentials(&credentials)?;
|
credential,
|
||||||
self.persist_credentials(&credentials).await?;
|
previous_credentials,
|
||||||
*self.credentials.write().await = credentials;
|
})
|
||||||
Ok(credential)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn remove_bucket_tenant_credentials(
|
pub async fn remove_bucket_tenant_credentials(
|
||||||
&self,
|
&self,
|
||||||
bucket_name: &str,
|
bucket_name: &str,
|
||||||
access_key_id: Option<&str>,
|
access_key_id: Option<&str>,
|
||||||
) -> Result<usize, StorageError> {
|
) -> Result<CredentialRemoval, StorageError> {
|
||||||
validate_bucket_scope(bucket_name)?;
|
let mut credentials_guard = self.credentials.write().await;
|
||||||
let mut credentials = self.credentials.read().await.clone();
|
let previous_credentials = credentials_guard.clone();
|
||||||
let before = credentials.len();
|
let (_removed, updated_credentials) =
|
||||||
credentials.retain(|credential| {
|
prepare_bucket_tenant_removal(bucket_name, access_key_id, &credentials_guard)?;
|
||||||
if credential.bucket_name.as_deref() != Some(bucket_name) {
|
self.persist_credentials(&updated_credentials).await?;
|
||||||
return true;
|
*credentials_guard = updated_credentials;
|
||||||
}
|
Ok(CredentialRemoval {
|
||||||
|
previous_credentials,
|
||||||
if let Some(access_key_id) = access_key_id {
|
})
|
||||||
credential.access_key_id != access_key_id
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let removed = before.saturating_sub(credentials.len());
|
|
||||||
if credentials.is_empty() {
|
|
||||||
return Err(StorageError::invalid_request(
|
|
||||||
"Cannot remove the last active credential.",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
self.persist_credentials(&credentials).await?;
|
|
||||||
*self.credentials.write().await = credentials;
|
|
||||||
Ok(removed)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_bucket_tenants(&self) -> Vec<BucketTenantMetadata> {
|
pub async fn list_bucket_tenants(&self) -> Vec<BucketTenantMetadata> {
|
||||||
@@ -375,6 +374,68 @@ fn validate_bucket_scope(bucket_name: &str) -> Result<(), StorageError> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn prepare_bucket_tenant_replacement(
|
||||||
|
bucket_name: &str,
|
||||||
|
mut credential: Credential,
|
||||||
|
credentials: &[Credential],
|
||||||
|
) -> Result<(Credential, Vec<Credential>), StorageError> {
|
||||||
|
validate_bucket_scope(bucket_name)?;
|
||||||
|
credential.bucket_name = Some(bucket_name.to_string());
|
||||||
|
|
||||||
|
if credentials.iter().any(|existing| {
|
||||||
|
existing.access_key_id == credential.access_key_id
|
||||||
|
&& existing.bucket_name.as_deref() != Some(bucket_name)
|
||||||
|
}) {
|
||||||
|
return Err(StorageError::invalid_request(
|
||||||
|
"Credential accessKeyId is already assigned to another principal.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut updated_credentials = credentials.to_vec();
|
||||||
|
updated_credentials.retain(|existing| existing.bucket_name.as_deref() != Some(bucket_name));
|
||||||
|
updated_credentials.push(credential.clone());
|
||||||
|
validate_credentials(&updated_credentials)?;
|
||||||
|
|
||||||
|
Ok((credential, updated_credentials))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn prepare_bucket_tenant_removal(
|
||||||
|
bucket_name: &str,
|
||||||
|
access_key_id: Option<&str>,
|
||||||
|
credentials: &[Credential],
|
||||||
|
) -> Result<(usize, Vec<Credential>), StorageError> {
|
||||||
|
validate_bucket_scope(bucket_name)?;
|
||||||
|
|
||||||
|
let mut updated_credentials = credentials.to_vec();
|
||||||
|
let before = updated_credentials.len();
|
||||||
|
updated_credentials.retain(|credential| {
|
||||||
|
if credential.bucket_name.as_deref() != Some(bucket_name) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(access_key_id) = access_key_id {
|
||||||
|
credential.access_key_id != access_key_id
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let removed = before.saturating_sub(updated_credentials.len());
|
||||||
|
if removed == 0 {
|
||||||
|
return Err(StorageError::invalid_request(
|
||||||
|
"No matching bucket tenant credential exists.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if updated_credentials.is_empty() {
|
||||||
|
return Err(StorageError::invalid_request(
|
||||||
|
"Cannot remove the last active credential.",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((removed, updated_credentials))
|
||||||
|
}
|
||||||
|
|
||||||
fn validate_credentials(credentials: &[Credential]) -> Result<(), StorageError> {
|
fn validate_credentials(credentials: &[Credential]) -> Result<(), StorageError> {
|
||||||
if credentials.is_empty() {
|
if credentials.is_empty() {
|
||||||
return Err(StorageError::invalid_request(
|
return Err(StorageError::invalid_request(
|
||||||
|
|||||||
@@ -80,11 +80,7 @@ impl ErasureCoder {
|
|||||||
let total = self.config.total_shards();
|
let total = self.config.total_shards();
|
||||||
|
|
||||||
if shards.len() != total {
|
if shards.len() != total {
|
||||||
anyhow::bail!(
|
anyhow::bail!("Expected {} shards, got {}", total, shards.len());
|
||||||
"Expected {} shards, got {}",
|
|
||||||
total,
|
|
||||||
shards.len()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let available = shards.iter().filter(|s| s.is_some()).count();
|
let available = shards.iter().filter(|s| s.is_some()).count();
|
||||||
@@ -159,7 +155,8 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_decode_with_missing_shards() {
|
fn test_decode_with_missing_shards() {
|
||||||
let coder = ErasureCoder::new(&test_config()).unwrap();
|
let coder = ErasureCoder::new(&test_config()).unwrap();
|
||||||
let original = b"Testing reconstruction with missing shards - this should work with 4 of 6.";
|
let original =
|
||||||
|
b"Testing reconstruction with missing shards - this should work with 4 of 6.";
|
||||||
|
|
||||||
let shards = coder.encode_chunk(original).unwrap();
|
let shards = coder.encode_chunk(original).unwrap();
|
||||||
|
|
||||||
|
|||||||
@@ -166,12 +166,7 @@ impl HealingService {
|
|||||||
Ok(stats)
|
Ok(stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn heal_bucket(
|
async fn heal_bucket(&self, bucket: &str, offline_nodes: &[String], stats: &mut HealStats) {
|
||||||
&self,
|
|
||||||
bucket: &str,
|
|
||||||
offline_nodes: &[String],
|
|
||||||
stats: &mut HealStats,
|
|
||||||
) {
|
|
||||||
let bucket_dir = self.manifest_dir.join(bucket);
|
let bucket_dir = self.manifest_dir.join(bucket);
|
||||||
let manifests = match self.collect_manifests(&bucket_dir).await {
|
let manifests = match self.collect_manifests(&bucket_dir).await {
|
||||||
Ok(m) => m,
|
Ok(m) => m,
|
||||||
@@ -264,10 +259,10 @@ impl HealingService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Reconstruct all shards
|
// Reconstruct all shards
|
||||||
let reconstructed = match self.erasure_coder.decode_chunk(
|
let reconstructed = match self
|
||||||
&mut shards,
|
.erasure_coder
|
||||||
chunk.data_size,
|
.decode_chunk(&mut shards, chunk.data_size)
|
||||||
) {
|
{
|
||||||
Ok(_) => true,
|
Ok(_) => true,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!(
|
tracing::error!(
|
||||||
@@ -361,7 +356,8 @@ impl HealingService {
|
|||||||
/// Collect all manifests under a bucket directory.
|
/// Collect all manifests under a bucket directory.
|
||||||
async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
|
async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
|
||||||
let mut manifests = Vec::new();
|
let mut manifests = Vec::new();
|
||||||
self.collect_manifests_recursive(dir, &mut manifests).await?;
|
self.collect_manifests_recursive(dir, &mut manifests)
|
||||||
|
.await?;
|
||||||
Ok(manifests)
|
Ok(manifests)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,8 +7,7 @@ use tokio::sync::Mutex;
|
|||||||
|
|
||||||
use super::drive_manager::{DriveManager, DriveStatus};
|
use super::drive_manager::{DriveManager, DriveStatus};
|
||||||
use super::protocol::{
|
use super::protocol::{
|
||||||
ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage,
|
ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage, NodeInfo,
|
||||||
NodeInfo,
|
|
||||||
};
|
};
|
||||||
use super::quic_transport::QuicTransport;
|
use super::quic_transport::QuicTransport;
|
||||||
use super::state::ClusterState;
|
use super::state::ClusterState;
|
||||||
@@ -49,7 +48,11 @@ impl MembershipManager {
|
|||||||
|
|
||||||
/// Join the cluster by contacting seed nodes.
|
/// Join the cluster by contacting seed nodes.
|
||||||
/// Sends a JoinRequest to each seed node until one accepts.
|
/// Sends a JoinRequest to each seed node until one accepts.
|
||||||
pub async fn join_cluster(&self, seed_nodes: &[String], allow_bootstrap_on_failure: bool) -> Result<()> {
|
pub async fn join_cluster(
|
||||||
|
&self,
|
||||||
|
seed_nodes: &[String],
|
||||||
|
allow_bootstrap_on_failure: bool,
|
||||||
|
) -> Result<()> {
|
||||||
if seed_nodes.is_empty() {
|
if seed_nodes.is_empty() {
|
||||||
tracing::info!("No seed nodes configured, starting as initial cluster node");
|
tracing::info!("No seed nodes configured, starting as initial cluster node");
|
||||||
self.state.add_node(self.local_node_info.clone()).await;
|
self.state.add_node(self.local_node_info.clone()).await;
|
||||||
@@ -84,14 +87,13 @@ impl MembershipManager {
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
anyhow::bail!("Could not reach any configured seed nodes; refusing unsafe cluster bootstrap")
|
anyhow::bail!(
|
||||||
|
"Could not reach any configured seed nodes; refusing unsafe cluster bootstrap"
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_join(&self, addr: SocketAddr) -> Result<()> {
|
async fn try_join(&self, addr: SocketAddr) -> Result<()> {
|
||||||
let conn = self
|
let conn = self.transport.get_connection("seed", addr).await?;
|
||||||
.transport
|
|
||||||
.get_connection("seed", addr)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
let request = ClusterRequest::JoinRequest(JoinRequestMessage {
|
let request = ClusterRequest::JoinRequest(JoinRequestMessage {
|
||||||
node_info: self.local_node_info.clone(),
|
node_info: self.local_node_info.clone(),
|
||||||
@@ -120,10 +122,7 @@ impl MembershipManager {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
anyhow::bail!(
|
anyhow::bail!("Join rejected: {}", join_resp.error.unwrap_or_default())
|
||||||
"Join rejected: {}",
|
|
||||||
join_resp.error.unwrap_or_default()
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ClusterResponse::Error(e) => {
|
ClusterResponse::Error(e) => {
|
||||||
|
|||||||
@@ -113,11 +113,17 @@ mod tests {
|
|||||||
|
|
||||||
// Set 0 should interleave across nodes
|
// Set 0 should interleave across nodes
|
||||||
let set0_nodes: Vec<&str> = sets[0].drives.iter().map(|d| d.node_id.as_str()).collect();
|
let set0_nodes: Vec<&str> = sets[0].drives.iter().map(|d| d.node_id.as_str()).collect();
|
||||||
assert_eq!(set0_nodes, vec!["node1", "node2", "node3", "node1", "node2", "node3"]);
|
assert_eq!(
|
||||||
|
set0_nodes,
|
||||||
|
vec!["node1", "node2", "node3", "node1", "node2", "node3"]
|
||||||
|
);
|
||||||
|
|
||||||
// Set 1 should also interleave
|
// Set 1 should also interleave
|
||||||
let set1_nodes: Vec<&str> = sets[1].drives.iter().map(|d| d.node_id.as_str()).collect();
|
let set1_nodes: Vec<&str> = sets[1].drives.iter().map(|d| d.node_id.as_str()).collect();
|
||||||
assert_eq!(set1_nodes, vec!["node1", "node2", "node3", "node1", "node2", "node3"]);
|
assert_eq!(
|
||||||
|
set1_nodes,
|
||||||
|
vec!["node1", "node2", "node3", "node1", "node2", "node3"]
|
||||||
|
);
|
||||||
|
|
||||||
// Drive indices should be different between sets
|
// Drive indices should be different between sets
|
||||||
let set0_drives: Vec<u32> = sets[0].drives.iter().map(|d| d.drive_index).collect();
|
let set0_drives: Vec<u32> = sets[0].drives.iter().map(|d| d.drive_index).collect();
|
||||||
@@ -129,10 +135,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_form_erasure_sets_remainder() {
|
fn test_form_erasure_sets_remainder() {
|
||||||
// 2 nodes, 3 drives each, 4 shards => 1 set (2 drives left over)
|
// 2 nodes, 3 drives each, 4 shards => 1 set (2 drives left over)
|
||||||
let nodes = vec![
|
let nodes = vec![("a".to_string(), 3), ("b".to_string(), 3)];
|
||||||
("a".to_string(), 3),
|
|
||||||
("b".to_string(), 3),
|
|
||||||
];
|
|
||||||
let sets = form_erasure_sets(&nodes, 4);
|
let sets = form_erasure_sets(&nodes, 4);
|
||||||
assert_eq!(sets.len(), 1);
|
assert_eq!(sets.len(), 1);
|
||||||
assert_eq!(sets[0].drives.len(), 4);
|
assert_eq!(sets[0].drives.len(), 4);
|
||||||
|
|||||||
@@ -13,7 +13,6 @@ pub enum ClusterRequest {
|
|||||||
// ============================
|
// ============================
|
||||||
// Shard operations
|
// Shard operations
|
||||||
// ============================
|
// ============================
|
||||||
|
|
||||||
/// Write a shard to a specific drive on the target node.
|
/// Write a shard to a specific drive on the target node.
|
||||||
/// Shard data follows after this header on the same stream.
|
/// Shard data follows after this header on the same stream.
|
||||||
ShardWrite(ShardWriteRequest),
|
ShardWrite(ShardWriteRequest),
|
||||||
@@ -30,7 +29,6 @@ pub enum ClusterRequest {
|
|||||||
// ============================
|
// ============================
|
||||||
// Manifest operations
|
// Manifest operations
|
||||||
// ============================
|
// ============================
|
||||||
|
|
||||||
/// Store an object manifest on the target node.
|
/// Store an object manifest on the target node.
|
||||||
ManifestWrite(ManifestWriteRequest),
|
ManifestWrite(ManifestWriteRequest),
|
||||||
|
|
||||||
@@ -46,7 +44,6 @@ pub enum ClusterRequest {
|
|||||||
// ============================
|
// ============================
|
||||||
// Cluster management
|
// Cluster management
|
||||||
// ============================
|
// ============================
|
||||||
|
|
||||||
/// Periodic heartbeat.
|
/// Periodic heartbeat.
|
||||||
Heartbeat(HeartbeatMessage),
|
Heartbeat(HeartbeatMessage),
|
||||||
|
|
||||||
@@ -59,7 +56,6 @@ pub enum ClusterRequest {
|
|||||||
// ============================
|
// ============================
|
||||||
// Healing
|
// Healing
|
||||||
// ============================
|
// ============================
|
||||||
|
|
||||||
/// Request a shard to be reconstructed and placed on a target drive.
|
/// Request a shard to be reconstructed and placed on a target drive.
|
||||||
HealRequest(HealRequestMessage),
|
HealRequest(HealRequestMessage),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,14 +1,14 @@
|
|||||||
|
use super::protocol::{
|
||||||
|
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
|
||||||
|
};
|
||||||
|
use super::shard_store::{ShardId, ShardStore};
|
||||||
|
use super::state::{ClusterState, NodeStatus};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
|
use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
|
||||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
|
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use super::protocol::{
|
|
||||||
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
|
|
||||||
};
|
|
||||||
use super::shard_store::{ShardId, ShardStore};
|
|
||||||
use super::state::{ClusterState, NodeStatus};
|
|
||||||
|
|
||||||
/// QUIC transport layer for inter-node communication.
|
/// QUIC transport layer for inter-node communication.
|
||||||
///
|
///
|
||||||
@@ -54,13 +54,9 @@ impl QuicTransport {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Establish new connection
|
// Establish new connection
|
||||||
let conn = self
|
let conn = self.endpoint.connect(addr, "smartstorage")?.await?;
|
||||||
.endpoint
|
|
||||||
.connect(addr, "smartstorage")?
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
self.connections
|
self.connections.insert(node_id.to_string(), conn.clone());
|
||||||
.insert(node_id.to_string(), conn.clone());
|
|
||||||
|
|
||||||
Ok(conn)
|
Ok(conn)
|
||||||
}
|
}
|
||||||
@@ -246,7 +242,11 @@ impl QuicTransport {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
||||||
Ok(store) => store.write_shard(&shard_id, &shard_data, write_req.checksum).await,
|
Ok(store) => {
|
||||||
|
store
|
||||||
|
.write_shard(&shard_id, &shard_data, write_req.checksum)
|
||||||
|
.await
|
||||||
|
}
|
||||||
Err(error) => Err(error),
|
Err(error) => Err(error),
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -272,7 +272,8 @@ impl QuicTransport {
|
|||||||
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
||||||
Ok(store) => store,
|
Ok(store) => store,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
|
Self::send_error_response(&mut send, "InvalidDrive", error.to_string())
|
||||||
|
.await?;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -286,8 +287,10 @@ impl QuicTransport {
|
|||||||
checksum,
|
checksum,
|
||||||
};
|
};
|
||||||
// Send header
|
// Send header
|
||||||
let header_bytes = bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
|
let header_bytes =
|
||||||
send.write_all(&(header_bytes.len() as u32).to_le_bytes()).await?;
|
bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
|
||||||
|
send.write_all(&(header_bytes.len() as u32).to_le_bytes())
|
||||||
|
.await?;
|
||||||
send.write_all(&header_bytes).await?;
|
send.write_all(&header_bytes).await?;
|
||||||
// Send shard data
|
// Send shard data
|
||||||
send.write_all(&data).await?;
|
send.write_all(&data).await?;
|
||||||
@@ -300,8 +303,10 @@ impl QuicTransport {
|
|||||||
shard_data_length: 0,
|
shard_data_length: 0,
|
||||||
checksum: 0,
|
checksum: 0,
|
||||||
};
|
};
|
||||||
let header_bytes = bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
|
let header_bytes =
|
||||||
send.write_all(&(header_bytes.len() as u32).to_le_bytes()).await?;
|
bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
|
||||||
|
send.write_all(&(header_bytes.len() as u32).to_le_bytes())
|
||||||
|
.await?;
|
||||||
send.write_all(&header_bytes).await?;
|
send.write_all(&header_bytes).await?;
|
||||||
send.finish()?;
|
send.finish()?;
|
||||||
}
|
}
|
||||||
@@ -340,7 +345,8 @@ impl QuicTransport {
|
|||||||
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
||||||
Ok(store) => store,
|
Ok(store) => store,
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
|
Self::send_error_response(&mut send, "InvalidDrive", error.to_string())
|
||||||
|
.await?;
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -403,7 +409,13 @@ impl QuicTransport {
|
|||||||
send.write_all(&response).await?;
|
send.write_all(&response).await?;
|
||||||
send.finish()?;
|
send.finish()?;
|
||||||
|
|
||||||
self.broadcast_topology(&state, Some(response_topology), None, Some(&joining_node_id)).await;
|
self.broadcast_topology(
|
||||||
|
&state,
|
||||||
|
Some(response_topology),
|
||||||
|
None,
|
||||||
|
Some(&joining_node_id),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
ClusterRequest::Heartbeat(heartbeat) => {
|
ClusterRequest::Heartbeat(heartbeat) => {
|
||||||
@@ -434,7 +446,8 @@ impl QuicTransport {
|
|||||||
send.finish()?;
|
send.finish()?;
|
||||||
|
|
||||||
if local_topology_version > peer_topology_version {
|
if local_topology_version > peer_topology_version {
|
||||||
self.broadcast_topology(&state, None, Some(&peer_node_id), None).await;
|
self.broadcast_topology(&state, None, Some(&peer_node_id), None)
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -585,8 +598,7 @@ impl QuicTransport {
|
|||||||
|
|
||||||
/// Close the QUIC endpoint gracefully.
|
/// Close the QUIC endpoint gracefully.
|
||||||
pub fn close(&self) {
|
pub fn close(&self) {
|
||||||
self.endpoint
|
self.endpoint.close(quinn::VarInt::from_u32(0), b"shutdown");
|
||||||
.close(quinn::VarInt::from_u32(0), b"shutdown");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the local node ID.
|
/// Get the local node ID.
|
||||||
|
|||||||
@@ -40,12 +40,7 @@ impl ShardStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Write a shard to disk atomically (write to temp file, then rename).
|
/// Write a shard to disk atomically (write to temp file, then rename).
|
||||||
pub async fn write_shard(
|
pub async fn write_shard(&self, shard_id: &ShardId, data: &[u8], checksum: u32) -> Result<()> {
|
||||||
&self,
|
|
||||||
shard_id: &ShardId,
|
|
||||||
data: &[u8],
|
|
||||||
checksum: u32,
|
|
||||||
) -> Result<()> {
|
|
||||||
let shard_path = self.shard_data_path(shard_id);
|
let shard_path = self.shard_data_path(shard_id);
|
||||||
let meta_path = self.shard_meta_path(shard_id);
|
let meta_path = self.shard_meta_path(shard_id);
|
||||||
|
|
||||||
@@ -117,11 +112,7 @@ impl ShardStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// List all shard IDs for a given bucket and key (across all chunks).
|
/// List all shard IDs for a given bucket and key (across all chunks).
|
||||||
pub async fn list_shards_for_object(
|
pub async fn list_shards_for_object(&self, bucket: &str, key: &str) -> Result<Vec<ShardId>> {
|
||||||
&self,
|
|
||||||
bucket: &str,
|
|
||||||
key: &str,
|
|
||||||
) -> Result<Vec<ShardId>> {
|
|
||||||
let key_dir = self.key_dir(bucket, key);
|
let key_dir = self.key_dir(bucket, key);
|
||||||
if !key_dir.exists() {
|
if !key_dir.exists() {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
|
|||||||
+13
-11
@@ -3,16 +3,16 @@ use std::path::PathBuf;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
use super::placement::{DriveLocation, ErasureSet};
|
|
||||||
use super::persistence;
|
use super::persistence;
|
||||||
use super::protocol::{ClusterTopology, ErasureSetInfo, DriveLocationInfo, NodeInfo};
|
use super::placement::{DriveLocation, ErasureSet};
|
||||||
|
use super::protocol::{ClusterTopology, DriveLocationInfo, ErasureSetInfo, NodeInfo};
|
||||||
|
|
||||||
/// Node status for tracking liveness.
|
/// Node status for tracking liveness.
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum NodeStatus {
|
pub enum NodeStatus {
|
||||||
Online,
|
Online,
|
||||||
Suspect, // missed 2+ heartbeats
|
Suspect, // missed 2+ heartbeats
|
||||||
Offline, // missed 5+ heartbeats
|
Offline, // missed 5+ heartbeats
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tracked state for a peer node.
|
/// Tracked state for a peer node.
|
||||||
@@ -162,11 +162,8 @@ impl ClusterState {
|
|||||||
if inner.erasure_sets.is_empty() {
|
if inner.erasure_sets.is_empty() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let set_idx = super::placement::erasure_set_for_object(
|
let set_idx =
|
||||||
bucket,
|
super::placement::erasure_set_for_object(bucket, key, inner.erasure_sets.len() as u32);
|
||||||
key,
|
|
||||||
inner.erasure_sets.len() as u32,
|
|
||||||
);
|
|
||||||
inner.erasure_sets.get(set_idx as usize).cloned()
|
inner.erasure_sets.get(set_idx as usize).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -284,7 +281,10 @@ impl ClusterState {
|
|||||||
|
|
||||||
let now = chrono::Utc::now();
|
let now = chrono::Utc::now();
|
||||||
for node_info in &topology.nodes {
|
for node_info in &topology.nodes {
|
||||||
let existing_status = inner.nodes.get(&node_info.node_id).map(|node| node.status.clone());
|
let existing_status = inner
|
||||||
|
.nodes
|
||||||
|
.get(&node_info.node_id)
|
||||||
|
.map(|node| node.status.clone());
|
||||||
let existing_missed_heartbeats = inner
|
let existing_missed_heartbeats = inner
|
||||||
.nodes
|
.nodes
|
||||||
.get(&node_info.node_id)
|
.get(&node_info.node_id)
|
||||||
@@ -304,7 +304,9 @@ impl ClusterState {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
inner.nodes.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));
|
inner
|
||||||
|
.nodes
|
||||||
|
.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));
|
||||||
|
|
||||||
// Update erasure sets
|
// Update erasure sets
|
||||||
inner.erasure_sets = topology
|
inner.erasure_sets = topology
|
||||||
|
|||||||
+25
-5
@@ -18,15 +18,27 @@ impl StorageError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn no_such_key() -> Self {
|
pub fn no_such_key() -> Self {
|
||||||
Self::new("NoSuchKey", "The specified key does not exist.", StatusCode::NOT_FOUND)
|
Self::new(
|
||||||
|
"NoSuchKey",
|
||||||
|
"The specified key does not exist.",
|
||||||
|
StatusCode::NOT_FOUND,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn no_such_bucket() -> Self {
|
pub fn no_such_bucket() -> Self {
|
||||||
Self::new("NoSuchBucket", "The specified bucket does not exist", StatusCode::NOT_FOUND)
|
Self::new(
|
||||||
|
"NoSuchBucket",
|
||||||
|
"The specified bucket does not exist",
|
||||||
|
StatusCode::NOT_FOUND,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn bucket_not_empty() -> Self {
|
pub fn bucket_not_empty() -> Self {
|
||||||
Self::new("BucketNotEmpty", "The bucket you tried to delete is not empty", StatusCode::CONFLICT)
|
Self::new(
|
||||||
|
"BucketNotEmpty",
|
||||||
|
"The bucket you tried to delete is not empty",
|
||||||
|
StatusCode::CONFLICT,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn access_denied() -> Self {
|
pub fn access_denied() -> Self {
|
||||||
@@ -34,11 +46,19 @@ impl StorageError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn no_such_upload() -> Self {
|
pub fn no_such_upload() -> Self {
|
||||||
Self::new("NoSuchUpload", "The specified upload does not exist", StatusCode::NOT_FOUND)
|
Self::new(
|
||||||
|
"NoSuchUpload",
|
||||||
|
"The specified upload does not exist",
|
||||||
|
StatusCode::NOT_FOUND,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn invalid_part_number() -> Self {
|
pub fn invalid_part_number() -> Self {
|
||||||
Self::new("InvalidPartNumber", "Part number must be between 1 and 10000", StatusCode::BAD_REQUEST)
|
Self::new(
|
||||||
|
"InvalidPartNumber",
|
||||||
|
"Part number must be between 1 and 10000",
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn internal_error(msg: &str) -> Self {
|
pub fn internal_error(msg: &str) -> Self {
|
||||||
|
|||||||
+5
-2
@@ -2,9 +2,9 @@ mod action;
|
|||||||
mod auth;
|
mod auth;
|
||||||
mod cluster;
|
mod cluster;
|
||||||
mod config;
|
mod config;
|
||||||
|
mod error;
|
||||||
mod management;
|
mod management;
|
||||||
mod policy;
|
mod policy;
|
||||||
mod error;
|
|
||||||
mod server;
|
mod server;
|
||||||
mod storage;
|
mod storage;
|
||||||
mod xml_response;
|
mod xml_response;
|
||||||
@@ -12,7 +12,10 @@ mod xml_response;
|
|||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
|
||||||
#[derive(Parser)]
|
#[derive(Parser)]
|
||||||
#[command(name = "ruststorage", about = "High-performance S3-compatible storage server")]
|
#[command(
|
||||||
|
name = "ruststorage",
|
||||||
|
about = "High-performance S3-compatible storage server"
|
||||||
|
)]
|
||||||
struct Cli {
|
struct Cli {
|
||||||
/// Run in management mode (IPC via stdin/stdout)
|
/// Run in management mode (IPC via stdin/stdout)
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
|
|||||||
+15
-5
@@ -266,10 +266,16 @@ pub async fn management_loop() -> Result<()> {
|
|||||||
}
|
}
|
||||||
"listBucketTenants" => {
|
"listBucketTenants" => {
|
||||||
if let Some(ref s) = server {
|
if let Some(ref s) = server {
|
||||||
match serde_json::to_value(s.list_bucket_tenants().await) {
|
match s.list_bucket_tenants().await {
|
||||||
Ok(value) => send_response(id, value),
|
Ok(tenants) => match serde_json::to_value(tenants) {
|
||||||
|
Ok(value) => send_response(id, value),
|
||||||
|
Err(error) => send_error(
|
||||||
|
id,
|
||||||
|
format!("Failed to serialize bucket tenants: {}", error),
|
||||||
|
),
|
||||||
|
},
|
||||||
Err(error) => {
|
Err(error) => {
|
||||||
send_error(id, format!("Failed to serialize bucket tenants: {}", error))
|
send_error(id, format!("Failed to list bucket tenants: {}", error))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -287,20 +293,24 @@ pub async fn management_loop() -> Result<()> {
|
|||||||
Ok(params) => {
|
Ok(params) => {
|
||||||
if let Some(ref s) = server {
|
if let Some(ref s) = server {
|
||||||
match s.get_bucket_tenant_credential(¶ms.bucket_name).await {
|
match s.get_bucket_tenant_credential(¶ms.bucket_name).await {
|
||||||
Some(credential) => match serde_json::to_value(credential) {
|
Ok(Some(credential)) => match serde_json::to_value(credential) {
|
||||||
Ok(value) => send_response(id, value),
|
Ok(value) => send_response(id, value),
|
||||||
Err(error) => send_error(
|
Err(error) => send_error(
|
||||||
id,
|
id,
|
||||||
format!("Failed to serialize bucket tenant: {}", error),
|
format!("Failed to serialize bucket tenant: {}", error),
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
None => send_error(
|
Ok(None) => send_error(
|
||||||
id,
|
id,
|
||||||
format!(
|
format!(
|
||||||
"No bucket tenant credential exists for bucket {}",
|
"No bucket tenant credential exists for bucket {}",
|
||||||
params.bucket_name
|
params.bucket_name
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
|
Err(error) => send_error(
|
||||||
|
id,
|
||||||
|
format!("Failed to get bucket tenant credential: {}", error),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
send_error(id, "Server not started".to_string());
|
send_error(id, "Server not started".to_string());
|
||||||
|
|||||||
+7
-5
@@ -81,14 +81,14 @@ where
|
|||||||
let raw = PrincipalRaw::deserialize(deserializer)?;
|
let raw = PrincipalRaw::deserialize(deserializer)?;
|
||||||
match raw {
|
match raw {
|
||||||
PrincipalRaw::Star(s) if s == "*" => Ok(Principal::Wildcard),
|
PrincipalRaw::Star(s) if s == "*" => Ok(Principal::Wildcard),
|
||||||
PrincipalRaw::Star(_) => Err(serde::de::Error::custom(
|
PrincipalRaw::Star(_) => Err(serde::de::Error::custom("Principal string must be \"*\"")),
|
||||||
"Principal string must be \"*\"",
|
|
||||||
)),
|
|
||||||
PrincipalRaw::Map(map) => {
|
PrincipalRaw::Map(map) => {
|
||||||
if let Some(aws) = map.get("AWS") {
|
if let Some(aws) = map.get("AWS") {
|
||||||
Ok(Principal::Aws(aws.clone().into_vec()))
|
Ok(Principal::Aws(aws.clone().into_vec()))
|
||||||
} else {
|
} else {
|
||||||
Err(serde::de::Error::custom("Principal map must contain \"AWS\" key"))
|
Err(serde::de::Error::custom(
|
||||||
|
"Principal map must contain \"AWS\" key",
|
||||||
|
))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -286,7 +286,9 @@ const MAX_POLICY_SIZE: usize = 20 * 1024; // 20 KB
|
|||||||
|
|
||||||
pub fn validate_policy(json: &str) -> Result<BucketPolicy, StorageError> {
|
pub fn validate_policy(json: &str) -> Result<BucketPolicy, StorageError> {
|
||||||
if json.len() > MAX_POLICY_SIZE {
|
if json.len() > MAX_POLICY_SIZE {
|
||||||
return Err(StorageError::malformed_policy("Policy exceeds maximum size of 20KB"));
|
return Err(StorageError::malformed_policy(
|
||||||
|
"Policy exceeds maximum size of 20KB",
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let policy: BucketPolicy =
|
let policy: BucketPolicy =
|
||||||
|
|||||||
+47
-11
@@ -195,11 +195,27 @@ impl StorageServer {
|
|||||||
credential: Credential,
|
credential: Credential,
|
||||||
) -> Result<Credential> {
|
) -> Result<Credential> {
|
||||||
self.ensure_tenant_auth_enabled()?;
|
self.ensure_tenant_auth_enabled()?;
|
||||||
self.store.create_bucket(bucket_name).await?;
|
let replacement = self
|
||||||
Ok(self
|
|
||||||
.auth_runtime
|
.auth_runtime
|
||||||
.replace_bucket_tenant_credential(bucket_name, credential)
|
.replace_bucket_tenant_credential_with_snapshot(bucket_name, credential)
|
||||||
.await?)
|
.await?;
|
||||||
|
|
||||||
|
if let Err(error) = self.store.create_bucket(bucket_name).await {
|
||||||
|
if let Err(rollback_error) = self
|
||||||
|
.auth_runtime
|
||||||
|
.replace_credentials(replacement.previous_credentials)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"Failed to create tenant bucket: {}; credential rollback failed: {}",
|
||||||
|
error,
|
||||||
|
rollback_error.message
|
||||||
|
));
|
||||||
|
}
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(replacement.credential)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn rotate_bucket_tenant_credentials(
|
pub async fn rotate_bucket_tenant_credentials(
|
||||||
@@ -223,23 +239,43 @@ impl StorageServer {
|
|||||||
access_key_id: Option<&str>,
|
access_key_id: Option<&str>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
self.ensure_tenant_auth_enabled()?;
|
self.ensure_tenant_auth_enabled()?;
|
||||||
self.auth_runtime
|
let removal = self
|
||||||
|
.auth_runtime
|
||||||
.remove_bucket_tenant_credentials(bucket_name, access_key_id)
|
.remove_bucket_tenant_credentials(bucket_name, access_key_id)
|
||||||
.await?;
|
.await?;
|
||||||
if access_key_id.is_none() && self.store.bucket_exists(bucket_name).await {
|
if access_key_id.is_none() && self.store.bucket_exists(bucket_name).await {
|
||||||
self.store.delete_bucket_recursive(bucket_name).await?;
|
if let Err(error) = self.store.delete_bucket_recursive(bucket_name).await {
|
||||||
|
if let Err(rollback_error) = self
|
||||||
|
.auth_runtime
|
||||||
|
.replace_credentials(removal.previous_credentials)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
return Err(anyhow::anyhow!(
|
||||||
|
"Failed to delete tenant bucket: {}; credential rollback failed: {}",
|
||||||
|
error,
|
||||||
|
rollback_error.message
|
||||||
|
));
|
||||||
|
}
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_bucket_tenants(&self) -> Vec<crate::auth::BucketTenantMetadata> {
|
pub async fn list_bucket_tenants(&self) -> Result<Vec<crate::auth::BucketTenantMetadata>> {
|
||||||
self.auth_runtime.list_bucket_tenants().await
|
self.ensure_tenant_auth_enabled()?;
|
||||||
|
Ok(self.auth_runtime.list_bucket_tenants().await)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_bucket_tenant_credential(&self, bucket_name: &str) -> Option<Credential> {
|
pub async fn get_bucket_tenant_credential(
|
||||||
self.auth_runtime
|
&self,
|
||||||
|
bucket_name: &str,
|
||||||
|
) -> Result<Option<Credential>> {
|
||||||
|
self.ensure_tenant_auth_enabled()?;
|
||||||
|
Ok(self
|
||||||
|
.auth_runtime
|
||||||
.get_bucket_tenant_credential(bucket_name)
|
.get_bucket_tenant_credential(bucket_name)
|
||||||
.await
|
.await)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ensure_tenant_auth_enabled(&self) -> Result<()> {
|
fn ensure_tenant_auth_enabled(&self) -> Result<()> {
|
||||||
|
|||||||
+26
-5
@@ -1576,15 +1576,36 @@ impl StorageBackend {
|
|||||||
return Err(StorageError::invalid_request("Unsupported bucket export format.").into());
|
return Err(StorageError::invalid_request("Unsupported bucket export format.").into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut import_objects = Vec::with_capacity(source.objects.len());
|
||||||
|
for object in source.objects {
|
||||||
|
let data = hex::decode(&object.data_hex)
|
||||||
|
.map_err(|error| StorageError::invalid_request(&error.to_string()))?;
|
||||||
|
if data.len() as u64 != object.size {
|
||||||
|
return Err(StorageError::invalid_request(&format!(
|
||||||
|
"Bucket export object '{}' size does not match payload.",
|
||||||
|
object.key
|
||||||
|
))
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let md5_hex = format!("{:x}", Md5::digest(&data));
|
||||||
|
if !object.md5.eq_ignore_ascii_case(&md5_hex) {
|
||||||
|
return Err(StorageError::invalid_request(&format!(
|
||||||
|
"Bucket export object '{}' md5 does not match payload.",
|
||||||
|
object.key
|
||||||
|
))
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
|
||||||
|
import_objects.push((object.key, data, object.metadata));
|
||||||
|
}
|
||||||
|
|
||||||
if !self.bucket_exists(bucket).await {
|
if !self.bucket_exists(bucket).await {
|
||||||
self.create_bucket(bucket).await?;
|
self.create_bucket(bucket).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
for object in source.objects {
|
for (key, data, metadata) in import_objects {
|
||||||
let data = hex::decode(&object.data_hex)
|
self.put_object_bytes(bucket, &key, &data, metadata).await?;
|
||||||
.map_err(|error| StorageError::invalid_request(&error.to_string()))?;
|
|
||||||
self.put_object_bytes(bucket, &object.key, &data, object.metadata)
|
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -47,7 +47,10 @@ pub fn list_objects_v1_xml(bucket: &str, result: &ListObjectsResult) -> String {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if !result.delimiter.is_empty() {
|
if !result.delimiter.is_empty() {
|
||||||
xml.push_str(&format!("<Delimiter>{}</Delimiter>", xml_escape(&result.delimiter)));
|
xml.push_str(&format!(
|
||||||
|
"<Delimiter>{}</Delimiter>",
|
||||||
|
xml_escape(&result.delimiter)
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
for entry in &result.contents {
|
for entry in &result.contents {
|
||||||
@@ -95,7 +98,10 @@ pub fn list_objects_v2_xml(bucket: &str, result: &ListObjectsResult) -> String {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if !result.delimiter.is_empty() {
|
if !result.delimiter.is_empty() {
|
||||||
xml.push_str(&format!("<Delimiter>{}</Delimiter>", xml_escape(&result.delimiter)));
|
xml.push_str(&format!(
|
||||||
|
"<Delimiter>{}</Delimiter>",
|
||||||
|
xml_escape(&result.delimiter)
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ref token) = result.next_continuation_token {
|
if let Some(ref token) = result.next_continuation_token {
|
||||||
|
|||||||
@@ -75,6 +75,17 @@ tap.test('should expose disabled cluster health in standalone mode', async () =>
|
|||||||
expect(clusterHealth.drives).toEqual(undefined);
|
expect(clusterHealth.drives).toEqual(undefined);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tap.test('should expose health and metrics with auth disabled', async () => {
|
||||||
|
const health = await testSmartStorageInstance.getHealth();
|
||||||
|
expect(health.running).toEqual(true);
|
||||||
|
expect(health.auth.enabled).toEqual(false);
|
||||||
|
expect(health.auth.tenantCredentialCount).toEqual(0);
|
||||||
|
|
||||||
|
const metrics = await testSmartStorageInstance.getMetrics();
|
||||||
|
expect(metrics.tenantCredentialCount).toEqual(0);
|
||||||
|
expect(metrics.prometheusText).toMatch(/smartstorage_tenant_credentials_total 0/);
|
||||||
|
});
|
||||||
|
|
||||||
tap.test('should create a bucket', async () => {
|
tap.test('should create a bucket', async () => {
|
||||||
const response = await s3Client.send(new CreateBucketCommand({ Bucket: 'test-bucket' }));
|
const response = await s3Client.send(new CreateBucketCommand({ Bucket: 'test-bucket' }));
|
||||||
expect(response.$metadata.httpStatusCode).toEqual(200);
|
expect(response.$metadata.httpStatusCode).toEqual(200);
|
||||||
|
|||||||
@@ -23,7 +23,10 @@ const STORAGE_DIR = fileURLToPath(new URL('../.nogit/bucket-tenant-tests', impor
|
|||||||
const WORKAPP_A_BUCKET = 'workapp-a-bucket';
|
const WORKAPP_A_BUCKET = 'workapp-a-bucket';
|
||||||
const WORKAPP_B_BUCKET = 'workapp-b-bucket';
|
const WORKAPP_B_BUCKET = 'workapp-b-bucket';
|
||||||
const RESTORE_BUCKET = 'workapp-a-restore-bucket';
|
const RESTORE_BUCKET = 'workapp-a-restore-bucket';
|
||||||
|
const CORRUPT_RESTORE_BUCKET = 'workapp-a-corrupt-restore-bucket';
|
||||||
const POLICY_BUCKET = 'workapp-policy-bucket';
|
const POLICY_BUCKET = 'workapp-policy-bucket';
|
||||||
|
const DUPLICATE_TENANT_BUCKET = 'workapp-duplicate-tenant-bucket';
|
||||||
|
const REVOKE_ONLY_BUCKET = 'workapp-revoke-only-bucket';
|
||||||
const ADMIN_CREDENTIAL: smartstorage.IStorageCredential = {
|
const ADMIN_CREDENTIAL: smartstorage.IStorageCredential = {
|
||||||
accessKeyId: 'TENANTADMIN',
|
accessKeyId: 'TENANTADMIN',
|
||||||
secretAccessKey: 'TENANTADMINSECRET123',
|
secretAccessKey: 'TENANTADMINSECRET123',
|
||||||
@@ -94,6 +97,20 @@ async function startStorage() {
|
|||||||
adminClient = createS3Client(ADMIN_CREDENTIAL);
|
adminClient = createS3Client(ADMIN_CREDENTIAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tap.test('bucket tenant client APIs require auth before IPC', async () => {
|
||||||
|
const storage = new smartstorage.SmartStorage({
|
||||||
|
auth: {
|
||||||
|
enabled: false,
|
||||||
|
credentials: [ADMIN_CREDENTIAL],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await expect(storage.listBucketTenants()).rejects.toThrow();
|
||||||
|
await expect(storage.getBucketTenantDescriptor({
|
||||||
|
bucketName: WORKAPP_A_BUCKET,
|
||||||
|
})).rejects.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
tap.test('setup: start storage and provision bucket tenants', async () => {
|
tap.test('setup: start storage and provision bucket tenants', async () => {
|
||||||
await rm(STORAGE_DIR, { recursive: true, force: true });
|
await rm(STORAGE_DIR, { recursive: true, force: true });
|
||||||
await startStorage();
|
await startStorage();
|
||||||
@@ -129,6 +146,18 @@ tap.test('listBucketTenants returns scoped credential metadata without secrets',
|
|||||||
expect((tenants[0] as any).secretAccessKey).toEqual(undefined);
|
expect((tenants[0] as any).secretAccessKey).toEqual(undefined);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tap.test('createBucketTenant validates credentials before creating buckets', async () => {
|
||||||
|
await expect(testSmartStorageInstance.createBucketTenant({
|
||||||
|
bucketName: DUPLICATE_TENANT_BUCKET,
|
||||||
|
accessKeyId: ADMIN_CREDENTIAL.accessKeyId,
|
||||||
|
secretAccessKey: 'DUPLICATESECRET123',
|
||||||
|
})).rejects.toThrow();
|
||||||
|
|
||||||
|
await expect(adminClient.send(new HeadBucketCommand({
|
||||||
|
Bucket: DUPLICATE_TENANT_BUCKET,
|
||||||
|
}))).rejects.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
tap.test('tenant credentials work with AWS SDK v3 for their assigned bucket', async () => {
|
tap.test('tenant credentials work with AWS SDK v3 for their assigned bucket', async () => {
|
||||||
const putA = await tenantAClient.send(new PutObjectCommand({
|
const putA = await tenantAClient.send(new PutObjectCommand({
|
||||||
Bucket: WORKAPP_A_BUCKET,
|
Bucket: WORKAPP_A_BUCKET,
|
||||||
@@ -227,6 +256,21 @@ tap.test('export/import targets one bucket without unrelated tenant data', async
|
|||||||
Bucket: RESTORE_BUCKET,
|
Bucket: RESTORE_BUCKET,
|
||||||
}));
|
}));
|
||||||
expect(restoredObjects.Contents?.some((object) => object.Key === 'other.txt')).toEqual(false);
|
expect(restoredObjects.Contents?.some((object) => object.Key === 'other.txt')).toEqual(false);
|
||||||
|
|
||||||
|
const corruptExport: smartstorage.IBucketExport = {
|
||||||
|
...bucketExport,
|
||||||
|
objects: bucketExport.objects.map((object, index) => index === 0 ? {
|
||||||
|
...object,
|
||||||
|
size: object.size + 1,
|
||||||
|
} : object),
|
||||||
|
};
|
||||||
|
await expect(testSmartStorageInstance.importBucket({
|
||||||
|
bucketName: CORRUPT_RESTORE_BUCKET,
|
||||||
|
source: corruptExport,
|
||||||
|
})).rejects.toThrow();
|
||||||
|
await expect(adminClient.send(new HeadBucketCommand({
|
||||||
|
Bucket: CORRUPT_RESTORE_BUCKET,
|
||||||
|
}))).rejects.toThrow();
|
||||||
});
|
});
|
||||||
|
|
||||||
tap.test('bucket policies persist across restart', async () => {
|
tap.test('bucket policies persist across restart', async () => {
|
||||||
@@ -301,25 +345,50 @@ tap.test('runtime credentials survive restart', async () => {
|
|||||||
expect(policyResponse.Policy?.includes('TenantPolicyPersistence')).toEqual(true);
|
expect(policyResponse.Policy?.includes('TenantPolicyPersistence')).toEqual(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tap.test('deleteBucketTenant refuses buckets without tenant credentials', async () => {
|
||||||
|
await expect(testSmartStorageInstance.deleteBucketTenant({
|
||||||
|
bucketName: POLICY_BUCKET,
|
||||||
|
})).rejects.toThrow();
|
||||||
|
|
||||||
|
const headAfterRefusedDelete = await adminClient.send(new HeadBucketCommand({
|
||||||
|
Bucket: POLICY_BUCKET,
|
||||||
|
}));
|
||||||
|
expect(headAfterRefusedDelete.$metadata.httpStatusCode).toEqual(200);
|
||||||
|
});
|
||||||
|
|
||||||
tap.test('deleteBucketTenant can revoke credentials and delete tenant buckets', async () => {
|
tap.test('deleteBucketTenant can revoke credentials and delete tenant buckets', async () => {
|
||||||
|
const revokeOnlyTenant = await testSmartStorageInstance.createBucketTenant({
|
||||||
|
bucketName: REVOKE_ONLY_BUCKET,
|
||||||
|
});
|
||||||
|
const revokeOnlyClient = createS3ClientFromDescriptor(revokeOnlyTenant);
|
||||||
|
await revokeOnlyClient.send(new PutObjectCommand({
|
||||||
|
Bucket: REVOKE_ONLY_BUCKET,
|
||||||
|
Key: 'revoke-only.txt',
|
||||||
|
Body: 'revocation target',
|
||||||
|
}));
|
||||||
|
|
||||||
await testSmartStorageInstance.deleteBucketTenant({
|
await testSmartStorageInstance.deleteBucketTenant({
|
||||||
bucketName: WORKAPP_B_BUCKET,
|
bucketName: REVOKE_ONLY_BUCKET,
|
||||||
accessKeyId: tenantB.accessKeyId,
|
accessKeyId: revokeOnlyTenant.accessKeyId,
|
||||||
});
|
});
|
||||||
|
|
||||||
await expect(tenantBClient.send(new GetObjectCommand({
|
await expect(revokeOnlyClient.send(new GetObjectCommand({
|
||||||
Bucket: WORKAPP_B_BUCKET,
|
Bucket: REVOKE_ONLY_BUCKET,
|
||||||
Key: 'other.txt',
|
Key: 'revoke-only.txt',
|
||||||
}))).rejects.toThrow();
|
}))).rejects.toThrow();
|
||||||
|
|
||||||
const headAfterRevoke = await adminClient.send(new HeadBucketCommand({
|
const headAfterRevoke = await adminClient.send(new HeadBucketCommand({
|
||||||
Bucket: WORKAPP_B_BUCKET,
|
Bucket: REVOKE_ONLY_BUCKET,
|
||||||
}));
|
}));
|
||||||
expect(headAfterRevoke.$metadata.httpStatusCode).toEqual(200);
|
expect(headAfterRevoke.$metadata.httpStatusCode).toEqual(200);
|
||||||
|
|
||||||
await testSmartStorageInstance.deleteBucketTenant({
|
await testSmartStorageInstance.deleteBucketTenant({
|
||||||
bucketName: WORKAPP_B_BUCKET,
|
bucketName: WORKAPP_B_BUCKET,
|
||||||
});
|
});
|
||||||
|
await expect(tenantBClient.send(new GetObjectCommand({
|
||||||
|
Bucket: WORKAPP_B_BUCKET,
|
||||||
|
Key: 'other.txt',
|
||||||
|
}))).rejects.toThrow();
|
||||||
await expect(adminClient.send(new HeadBucketCommand({
|
await expect(adminClient.send(new HeadBucketCommand({
|
||||||
Bucket: WORKAPP_B_BUCKET,
|
Bucket: WORKAPP_B_BUCKET,
|
||||||
}))).rejects.toThrow();
|
}))).rejects.toThrow();
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartstorage',
|
name: '@push.rocks/smartstorage',
|
||||||
version: '6.5.0',
|
version: '6.5.1',
|
||||||
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
||||||
}
|
}
|
||||||
|
|||||||
+3
-1
@@ -591,12 +591,14 @@ export class SmartStorage {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async listBucketTenants(): Promise<IBucketTenantMetadata[]> {
|
public async listBucketTenants(): Promise<IBucketTenantMetadata[]> {
|
||||||
|
this.assertTenantAuthEnabled();
|
||||||
return this.bridge.sendCommand('listBucketTenants', {});
|
return this.bridge.sendCommand('listBucketTenants', {});
|
||||||
}
|
}
|
||||||
|
|
||||||
public async getBucketTenantDescriptor(optionsArg: {
|
public async getBucketTenantDescriptor(optionsArg: {
|
||||||
bucketName: string;
|
bucketName: string;
|
||||||
}): Promise<IBucketTenantDescriptor> {
|
}): Promise<IBucketTenantDescriptor> {
|
||||||
|
this.assertTenantAuthEnabled();
|
||||||
const credential = await this.bridge.sendCommand('getBucketTenantCredential', {
|
const credential = await this.bridge.sendCommand('getBucketTenantCredential', {
|
||||||
bucketName: optionsArg.bucketName,
|
bucketName: optionsArg.bucketName,
|
||||||
});
|
});
|
||||||
@@ -653,7 +655,7 @@ export class SmartStorage {
|
|||||||
const [stats, credentials, tenants, cluster] = await Promise.all([
|
const [stats, credentials, tenants, cluster] = await Promise.all([
|
||||||
this.getStorageStats(),
|
this.getStorageStats(),
|
||||||
this.listCredentials(),
|
this.listCredentials(),
|
||||||
this.listBucketTenants(),
|
this.config.auth.enabled ? this.listBucketTenants() : Promise.resolve([]),
|
||||||
this.getClusterHealth(),
|
this.getClusterHealth(),
|
||||||
]);
|
]);
|
||||||
return {
|
return {
|
||||||
|
|||||||
Reference in New Issue
Block a user