Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8f1d025330 | |||
| b075de1ecd |
@@ -1,5 +1,20 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-05-02 - 6.5.1 - fix(bucket-tenants)
|
||||
make tenant lifecycle and bucket import validation safer
|
||||
|
||||
- validate exported object size and MD5 before creating and importing target buckets
|
||||
- persist bucket-tenant credential changes with rollback when bucket creation or deletion fails
|
||||
- require auth-enabled access and matching tenant credentials for tenant list, descriptor, and recursive delete flows
|
||||
|
||||
## Next - fix(bucket-tenants)
|
||||
make tenant lifecycle and bucket import validation safer
|
||||
|
||||
- Validate bucket export object size and MD5 before creating/importing into target buckets.
|
||||
- Persist tenant credentials through snapshot-aware helpers and roll back credential changes if bucket create/delete fails.
|
||||
- Require matching tenant credentials before `deleteBucketTenant({ bucketName })` can recursively delete a bucket.
|
||||
- Enforce `auth.enabled` consistently on tenant list/descriptor APIs and document metadata-only credential listings.
|
||||
|
||||
## 2026-05-02 - 6.5.0 - feat(bucket-tenants)
|
||||
add persisted bucket-scoped tenant credentials with bucket export and import APIs
|
||||
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartstorage",
|
||||
"version": "6.5.0",
|
||||
"version": "6.5.1",
|
||||
"private": false,
|
||||
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
+6
-6
@@ -1,6 +1,6 @@
|
||||
# Project Hints for smartstorage
|
||||
|
||||
## Current State (v6.4.0)
|
||||
## Current State (v6.5.0)
|
||||
|
||||
- **Rust-powered S3-compatible storage server** via `@push.rocks/smartrust` IPC bridge
|
||||
- High-performance: streaming I/O, zero-copy, backpressure, range seek
|
||||
@@ -16,7 +16,7 @@
|
||||
- Runtime credential listing and atomic replacement via the Rust bridge
|
||||
- Runtime credentials persist under `{storage}/.smartstorage/credentials.json`
|
||||
- Bucket tenant APIs provision scoped per-bucket credentials and enforce the scope before bucket-policy/default-auth authorization
|
||||
- Per-bucket export/import uses `smartstorage.bucket.v1` JSON with object payloads encoded per object
|
||||
- Per-bucket export/import uses `smartstorage.bucket.v1` JSON with object payloads encoded per object and size/MD5 validation on import
|
||||
- Cluster identity and topology snapshots persist under `{storage}/.smartstorage/cluster/`
|
||||
- S3-side operational endpoints are available at `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics`
|
||||
- Runtime credential listing returns access-key metadata only; secrets are write-only
|
||||
@@ -48,7 +48,7 @@
|
||||
| `stop` | `{}` | Graceful shutdown |
|
||||
| `createBucket` | `{ name: string }` | Create bucket directory |
|
||||
| `createBucketTenant` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Create bucket and scoped persisted credential |
|
||||
| `deleteBucketTenant` | `{ bucketName, accessKeyId? }` | Revoke scoped credential or delete tenant bucket recursively |
|
||||
| `deleteBucketTenant` | `{ bucketName, accessKeyId? }` | Revoke scoped credential or delete a verified tenant bucket recursively |
|
||||
| `rotateBucketTenantCredentials` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Replace scoped credential for one bucket |
|
||||
| `listBucketTenants` | `{}` | Return scoped credential metadata |
|
||||
| `getBucketTenantCredential` | `{ bucketName }` | Return one scoped credential including secret for descriptor generation |
|
||||
@@ -56,7 +56,7 @@
|
||||
| `importBucket` | `{ bucketName, source }` | Import a `smartstorage.bucket.v1` bucket export |
|
||||
| `getStorageStats` | `{}` | Return cached bucket/global runtime stats + storage location capacity snapshots |
|
||||
| `listBucketSummaries` | `{}` | Return cached per-bucket runtime summaries |
|
||||
| `listCredentials` | `{}` | Return the active runtime auth credential set |
|
||||
| `listCredentials` | `{}` | Return metadata for the active runtime auth credential set |
|
||||
| `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set |
|
||||
| `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode |
|
||||
|
||||
@@ -92,8 +92,8 @@
|
||||
|
||||
## Testing
|
||||
|
||||
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337)
|
||||
- `test/test.bucket-tenants.node.ts` - bucket tenant provisioning, per-bucket isolation, restart persistence, export/import, policy persistence, rotation, revoke/delete, AWS SDK v3 compatibility (12 tests, port 3361)
|
||||
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health/metrics coverage (20 tests, auth disabled, port 3337)
|
||||
- `test/test.bucket-tenants.node.ts` - bucket tenant provisioning, per-bucket isolation, restart persistence, export/import integrity, policy persistence, rotation, revoke/delete safeguards, AWS SDK v3 compatibility (15 tests, port 3361)
|
||||
- `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349)
|
||||
- `test/test.health-http.node.ts` - unauthenticated operational endpoint coverage (3 tests, port 3353)
|
||||
- `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348)
|
||||
|
||||
@@ -271,7 +271,7 @@ const tenants = await storage.listBucketTenants();
|
||||
- `createBucketTenant()` creates the bucket if needed and stores a scoped credential for that bucket.
|
||||
- `rotateBucketTenantCredentials()` replaces the active scoped credential for the bucket and persists the new credential.
|
||||
- `deleteBucketTenant({ bucketName, accessKeyId })` revokes one scoped credential and keeps the bucket.
|
||||
- `deleteBucketTenant({ bucketName })` revokes scoped credentials for the bucket and deletes the bucket contents recursively.
|
||||
- `deleteBucketTenant({ bucketName })` revokes scoped credentials for an existing tenant bucket and deletes that bucket's contents recursively.
|
||||
- Tenant credentials can list, read, write, and delete objects in their assigned bucket, but cannot list all buckets, access other buckets, copy from other buckets, delete buckets, or mutate bucket policies.
|
||||
- Bucket tenant APIs require `auth.enabled: true`.
|
||||
|
||||
@@ -283,7 +283,7 @@ await storage.importBucket({ bucketName: 'workapp-123-restore', source: appBacku
|
||||
```
|
||||
|
||||
- `exportBucket()` returns a self-contained `smartstorage.bucket.v1` JSON export with only the selected bucket's objects and object metadata.
|
||||
- `importBucket()` creates the target bucket if needed and restores the exported objects into that bucket.
|
||||
- `importBucket()` validates object payload size and MD5 before creating the target bucket if needed, then restores the exported objects into that bucket.
|
||||
- Exports do not include credentials, policies, or unrelated tenant data.
|
||||
|
||||
## Health and Metrics APIs
|
||||
@@ -645,7 +645,7 @@ Create a bucket tenant with a generated or supplied scoped credential. Options:
|
||||
|
||||
#### `deleteBucketTenant(options): Promise<void>`
|
||||
|
||||
Revoke a tenant credential or delete the full tenant bucket. Options: `{ bucketName, accessKeyId? }`.
|
||||
Revoke a tenant credential or delete a bucket that still has tenant credentials. Options: `{ bucketName, accessKeyId? }`.
|
||||
|
||||
#### `rotateBucketTenantCredentials(options): Promise<IBucketTenantDescriptor>`
|
||||
|
||||
@@ -665,7 +665,7 @@ Export one bucket's objects and metadata into a `smartstorage.bucket.v1` JSON ob
|
||||
|
||||
#### `importBucket(options): Promise<void>`
|
||||
|
||||
Import a `smartstorage.bucket.v1` JSON object into the target bucket. Options: `{ bucketName, source }`.
|
||||
Import a `smartstorage.bucket.v1` JSON object into the target bucket after validating object size and MD5. Options: `{ bucketName, source }`.
|
||||
|
||||
#### `getStorageDescriptor(options?): Promise<IS3Descriptor>`
|
||||
|
||||
@@ -687,9 +687,9 @@ Read cached logical bucket and object totals from the Rust runtime without issui
|
||||
|
||||
Get per-bucket logical object counts and total payload sizes.
|
||||
|
||||
#### `listCredentials(): Promise<IStorageCredential[]>`
|
||||
#### `listCredentials(): Promise<IStorageCredentialMetadata[]>`
|
||||
|
||||
Return the currently active runtime credential set.
|
||||
Return metadata for the currently active runtime credential set without `secretAccessKey` values.
|
||||
|
||||
#### `replaceCredentials(credentials: IStorageCredential[]): Promise<void>`
|
||||
|
||||
|
||||
+106
-45
@@ -195,6 +195,17 @@ pub struct BucketTenantMetadata {
|
||||
pub region: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CredentialRemoval {
|
||||
pub previous_credentials: Vec<Credential>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CredentialReplacement {
|
||||
pub credential: Credential,
|
||||
pub previous_credentials: Vec<Credential>,
|
||||
}
|
||||
|
||||
impl RuntimeCredentialStore {
|
||||
pub async fn new(
|
||||
config: &AuthConfig,
|
||||
@@ -244,66 +255,54 @@ impl RuntimeCredentialStore {
|
||||
credentials: Vec<Credential>,
|
||||
) -> Result<(), StorageError> {
|
||||
validate_credentials(&credentials)?;
|
||||
let mut credentials_guard = self.credentials.write().await;
|
||||
self.persist_credentials(&credentials).await?;
|
||||
*self.credentials.write().await = credentials;
|
||||
*credentials_guard = credentials;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn replace_bucket_tenant_credential(
|
||||
&self,
|
||||
bucket_name: &str,
|
||||
mut credential: Credential,
|
||||
credential: Credential,
|
||||
) -> Result<Credential, StorageError> {
|
||||
validate_bucket_scope(bucket_name)?;
|
||||
credential.bucket_name = Some(bucket_name.to_string());
|
||||
let replacement = self
|
||||
.replace_bucket_tenant_credential_with_snapshot(bucket_name, credential)
|
||||
.await?;
|
||||
Ok(replacement.credential)
|
||||
}
|
||||
|
||||
let mut credentials = self.credentials.read().await.clone();
|
||||
if credentials.iter().any(|existing| {
|
||||
existing.access_key_id == credential.access_key_id
|
||||
&& existing.bucket_name.as_deref() != Some(bucket_name)
|
||||
}) {
|
||||
return Err(StorageError::invalid_request(
|
||||
"Credential accessKeyId is already assigned to another principal.",
|
||||
));
|
||||
}
|
||||
|
||||
credentials.retain(|existing| existing.bucket_name.as_deref() != Some(bucket_name));
|
||||
credentials.push(credential.clone());
|
||||
validate_credentials(&credentials)?;
|
||||
self.persist_credentials(&credentials).await?;
|
||||
*self.credentials.write().await = credentials;
|
||||
Ok(credential)
|
||||
pub async fn replace_bucket_tenant_credential_with_snapshot(
|
||||
&self,
|
||||
bucket_name: &str,
|
||||
credential: Credential,
|
||||
) -> Result<CredentialReplacement, StorageError> {
|
||||
let mut credentials_guard = self.credentials.write().await;
|
||||
let previous_credentials = credentials_guard.clone();
|
||||
let (credential, updated_credentials) =
|
||||
prepare_bucket_tenant_replacement(bucket_name, credential, &credentials_guard)?;
|
||||
self.persist_credentials(&updated_credentials).await?;
|
||||
*credentials_guard = updated_credentials;
|
||||
Ok(CredentialReplacement {
|
||||
credential,
|
||||
previous_credentials,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn remove_bucket_tenant_credentials(
|
||||
&self,
|
||||
bucket_name: &str,
|
||||
access_key_id: Option<&str>,
|
||||
) -> Result<usize, StorageError> {
|
||||
validate_bucket_scope(bucket_name)?;
|
||||
let mut credentials = self.credentials.read().await.clone();
|
||||
let before = credentials.len();
|
||||
credentials.retain(|credential| {
|
||||
if credential.bucket_name.as_deref() != Some(bucket_name) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if let Some(access_key_id) = access_key_id {
|
||||
credential.access_key_id != access_key_id
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
let removed = before.saturating_sub(credentials.len());
|
||||
if credentials.is_empty() {
|
||||
return Err(StorageError::invalid_request(
|
||||
"Cannot remove the last active credential.",
|
||||
));
|
||||
}
|
||||
self.persist_credentials(&credentials).await?;
|
||||
*self.credentials.write().await = credentials;
|
||||
Ok(removed)
|
||||
) -> Result<CredentialRemoval, StorageError> {
|
||||
let mut credentials_guard = self.credentials.write().await;
|
||||
let previous_credentials = credentials_guard.clone();
|
||||
let (_removed, updated_credentials) =
|
||||
prepare_bucket_tenant_removal(bucket_name, access_key_id, &credentials_guard)?;
|
||||
self.persist_credentials(&updated_credentials).await?;
|
||||
*credentials_guard = updated_credentials;
|
||||
Ok(CredentialRemoval {
|
||||
previous_credentials,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn list_bucket_tenants(&self) -> Vec<BucketTenantMetadata> {
|
||||
@@ -375,6 +374,68 @@ fn validate_bucket_scope(bucket_name: &str) -> Result<(), StorageError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn prepare_bucket_tenant_replacement(
|
||||
bucket_name: &str,
|
||||
mut credential: Credential,
|
||||
credentials: &[Credential],
|
||||
) -> Result<(Credential, Vec<Credential>), StorageError> {
|
||||
validate_bucket_scope(bucket_name)?;
|
||||
credential.bucket_name = Some(bucket_name.to_string());
|
||||
|
||||
if credentials.iter().any(|existing| {
|
||||
existing.access_key_id == credential.access_key_id
|
||||
&& existing.bucket_name.as_deref() != Some(bucket_name)
|
||||
}) {
|
||||
return Err(StorageError::invalid_request(
|
||||
"Credential accessKeyId is already assigned to another principal.",
|
||||
));
|
||||
}
|
||||
|
||||
let mut updated_credentials = credentials.to_vec();
|
||||
updated_credentials.retain(|existing| existing.bucket_name.as_deref() != Some(bucket_name));
|
||||
updated_credentials.push(credential.clone());
|
||||
validate_credentials(&updated_credentials)?;
|
||||
|
||||
Ok((credential, updated_credentials))
|
||||
}
|
||||
|
||||
fn prepare_bucket_tenant_removal(
|
||||
bucket_name: &str,
|
||||
access_key_id: Option<&str>,
|
||||
credentials: &[Credential],
|
||||
) -> Result<(usize, Vec<Credential>), StorageError> {
|
||||
validate_bucket_scope(bucket_name)?;
|
||||
|
||||
let mut updated_credentials = credentials.to_vec();
|
||||
let before = updated_credentials.len();
|
||||
updated_credentials.retain(|credential| {
|
||||
if credential.bucket_name.as_deref() != Some(bucket_name) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if let Some(access_key_id) = access_key_id {
|
||||
credential.access_key_id != access_key_id
|
||||
} else {
|
||||
false
|
||||
}
|
||||
});
|
||||
|
||||
let removed = before.saturating_sub(updated_credentials.len());
|
||||
if removed == 0 {
|
||||
return Err(StorageError::invalid_request(
|
||||
"No matching bucket tenant credential exists.",
|
||||
));
|
||||
}
|
||||
|
||||
if updated_credentials.is_empty() {
|
||||
return Err(StorageError::invalid_request(
|
||||
"Cannot remove the last active credential.",
|
||||
));
|
||||
}
|
||||
|
||||
Ok((removed, updated_credentials))
|
||||
}
|
||||
|
||||
fn validate_credentials(credentials: &[Credential]) -> Result<(), StorageError> {
|
||||
if credentials.is_empty() {
|
||||
return Err(StorageError::invalid_request(
|
||||
|
||||
@@ -80,11 +80,7 @@ impl ErasureCoder {
|
||||
let total = self.config.total_shards();
|
||||
|
||||
if shards.len() != total {
|
||||
anyhow::bail!(
|
||||
"Expected {} shards, got {}",
|
||||
total,
|
||||
shards.len()
|
||||
);
|
||||
anyhow::bail!("Expected {} shards, got {}", total, shards.len());
|
||||
}
|
||||
|
||||
let available = shards.iter().filter(|s| s.is_some()).count();
|
||||
@@ -159,7 +155,8 @@ mod tests {
|
||||
#[test]
|
||||
fn test_decode_with_missing_shards() {
|
||||
let coder = ErasureCoder::new(&test_config()).unwrap();
|
||||
let original = b"Testing reconstruction with missing shards - this should work with 4 of 6.";
|
||||
let original =
|
||||
b"Testing reconstruction with missing shards - this should work with 4 of 6.";
|
||||
|
||||
let shards = coder.encode_chunk(original).unwrap();
|
||||
|
||||
|
||||
@@ -166,12 +166,7 @@ impl HealingService {
|
||||
Ok(stats)
|
||||
}
|
||||
|
||||
async fn heal_bucket(
|
||||
&self,
|
||||
bucket: &str,
|
||||
offline_nodes: &[String],
|
||||
stats: &mut HealStats,
|
||||
) {
|
||||
async fn heal_bucket(&self, bucket: &str, offline_nodes: &[String], stats: &mut HealStats) {
|
||||
let bucket_dir = self.manifest_dir.join(bucket);
|
||||
let manifests = match self.collect_manifests(&bucket_dir).await {
|
||||
Ok(m) => m,
|
||||
@@ -264,10 +259,10 @@ impl HealingService {
|
||||
}
|
||||
|
||||
// Reconstruct all shards
|
||||
let reconstructed = match self.erasure_coder.decode_chunk(
|
||||
&mut shards,
|
||||
chunk.data_size,
|
||||
) {
|
||||
let reconstructed = match self
|
||||
.erasure_coder
|
||||
.decode_chunk(&mut shards, chunk.data_size)
|
||||
{
|
||||
Ok(_) => true,
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
@@ -361,7 +356,8 @@ impl HealingService {
|
||||
/// Collect all manifests under a bucket directory.
|
||||
async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
|
||||
let mut manifests = Vec::new();
|
||||
self.collect_manifests_recursive(dir, &mut manifests).await?;
|
||||
self.collect_manifests_recursive(dir, &mut manifests)
|
||||
.await?;
|
||||
Ok(manifests)
|
||||
}
|
||||
|
||||
|
||||
@@ -7,8 +7,7 @@ use tokio::sync::Mutex;
|
||||
|
||||
use super::drive_manager::{DriveManager, DriveStatus};
|
||||
use super::protocol::{
|
||||
ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage,
|
||||
NodeInfo,
|
||||
ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage, NodeInfo,
|
||||
};
|
||||
use super::quic_transport::QuicTransport;
|
||||
use super::state::ClusterState;
|
||||
@@ -49,7 +48,11 @@ impl MembershipManager {
|
||||
|
||||
/// Join the cluster by contacting seed nodes.
|
||||
/// Sends a JoinRequest to each seed node until one accepts.
|
||||
pub async fn join_cluster(&self, seed_nodes: &[String], allow_bootstrap_on_failure: bool) -> Result<()> {
|
||||
pub async fn join_cluster(
|
||||
&self,
|
||||
seed_nodes: &[String],
|
||||
allow_bootstrap_on_failure: bool,
|
||||
) -> Result<()> {
|
||||
if seed_nodes.is_empty() {
|
||||
tracing::info!("No seed nodes configured, starting as initial cluster node");
|
||||
self.state.add_node(self.local_node_info.clone()).await;
|
||||
@@ -84,14 +87,13 @@ impl MembershipManager {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
anyhow::bail!("Could not reach any configured seed nodes; refusing unsafe cluster bootstrap")
|
||||
anyhow::bail!(
|
||||
"Could not reach any configured seed nodes; refusing unsafe cluster bootstrap"
|
||||
)
|
||||
}
|
||||
|
||||
async fn try_join(&self, addr: SocketAddr) -> Result<()> {
|
||||
let conn = self
|
||||
.transport
|
||||
.get_connection("seed", addr)
|
||||
.await?;
|
||||
let conn = self.transport.get_connection("seed", addr).await?;
|
||||
|
||||
let request = ClusterRequest::JoinRequest(JoinRequestMessage {
|
||||
node_info: self.local_node_info.clone(),
|
||||
@@ -120,10 +122,7 @@ impl MembershipManager {
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"Join rejected: {}",
|
||||
join_resp.error.unwrap_or_default()
|
||||
)
|
||||
anyhow::bail!("Join rejected: {}", join_resp.error.unwrap_or_default())
|
||||
}
|
||||
}
|
||||
ClusterResponse::Error(e) => {
|
||||
|
||||
@@ -113,11 +113,17 @@ mod tests {
|
||||
|
||||
// Set 0 should interleave across nodes
|
||||
let set0_nodes: Vec<&str> = sets[0].drives.iter().map(|d| d.node_id.as_str()).collect();
|
||||
assert_eq!(set0_nodes, vec!["node1", "node2", "node3", "node1", "node2", "node3"]);
|
||||
assert_eq!(
|
||||
set0_nodes,
|
||||
vec!["node1", "node2", "node3", "node1", "node2", "node3"]
|
||||
);
|
||||
|
||||
// Set 1 should also interleave
|
||||
let set1_nodes: Vec<&str> = sets[1].drives.iter().map(|d| d.node_id.as_str()).collect();
|
||||
assert_eq!(set1_nodes, vec!["node1", "node2", "node3", "node1", "node2", "node3"]);
|
||||
assert_eq!(
|
||||
set1_nodes,
|
||||
vec!["node1", "node2", "node3", "node1", "node2", "node3"]
|
||||
);
|
||||
|
||||
// Drive indices should be different between sets
|
||||
let set0_drives: Vec<u32> = sets[0].drives.iter().map(|d| d.drive_index).collect();
|
||||
@@ -129,10 +135,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_form_erasure_sets_remainder() {
|
||||
// 2 nodes, 3 drives each, 4 shards => 1 set (2 drives left over)
|
||||
let nodes = vec![
|
||||
("a".to_string(), 3),
|
||||
("b".to_string(), 3),
|
||||
];
|
||||
let nodes = vec![("a".to_string(), 3), ("b".to_string(), 3)];
|
||||
let sets = form_erasure_sets(&nodes, 4);
|
||||
assert_eq!(sets.len(), 1);
|
||||
assert_eq!(sets[0].drives.len(), 4);
|
||||
|
||||
@@ -13,7 +13,6 @@ pub enum ClusterRequest {
|
||||
// ============================
|
||||
// Shard operations
|
||||
// ============================
|
||||
|
||||
/// Write a shard to a specific drive on the target node.
|
||||
/// Shard data follows after this header on the same stream.
|
||||
ShardWrite(ShardWriteRequest),
|
||||
@@ -30,7 +29,6 @@ pub enum ClusterRequest {
|
||||
// ============================
|
||||
// Manifest operations
|
||||
// ============================
|
||||
|
||||
/// Store an object manifest on the target node.
|
||||
ManifestWrite(ManifestWriteRequest),
|
||||
|
||||
@@ -46,7 +44,6 @@ pub enum ClusterRequest {
|
||||
// ============================
|
||||
// Cluster management
|
||||
// ============================
|
||||
|
||||
/// Periodic heartbeat.
|
||||
Heartbeat(HeartbeatMessage),
|
||||
|
||||
@@ -59,7 +56,6 @@ pub enum ClusterRequest {
|
||||
// ============================
|
||||
// Healing
|
||||
// ============================
|
||||
|
||||
/// Request a shard to be reconstructed and placed on a target drive.
|
||||
HealRequest(HealRequestMessage),
|
||||
}
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
use super::protocol::{
|
||||
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
|
||||
};
|
||||
use super::shard_store::{ShardId, ShardStore};
|
||||
use super::state::{ClusterState, NodeStatus};
|
||||
use anyhow::Result;
|
||||
use dashmap::DashMap;
|
||||
use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use super::protocol::{
|
||||
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
|
||||
};
|
||||
use super::shard_store::{ShardId, ShardStore};
|
||||
use super::state::{ClusterState, NodeStatus};
|
||||
|
||||
/// QUIC transport layer for inter-node communication.
|
||||
///
|
||||
@@ -54,13 +54,9 @@ impl QuicTransport {
|
||||
}
|
||||
|
||||
// Establish new connection
|
||||
let conn = self
|
||||
.endpoint
|
||||
.connect(addr, "smartstorage")?
|
||||
.await?;
|
||||
let conn = self.endpoint.connect(addr, "smartstorage")?.await?;
|
||||
|
||||
self.connections
|
||||
.insert(node_id.to_string(), conn.clone());
|
||||
self.connections.insert(node_id.to_string(), conn.clone());
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
@@ -246,7 +242,11 @@ impl QuicTransport {
|
||||
};
|
||||
|
||||
let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
||||
Ok(store) => store.write_shard(&shard_id, &shard_data, write_req.checksum).await,
|
||||
Ok(store) => {
|
||||
store
|
||||
.write_shard(&shard_id, &shard_data, write_req.checksum)
|
||||
.await
|
||||
}
|
||||
Err(error) => Err(error),
|
||||
};
|
||||
|
||||
@@ -272,7 +272,8 @@ impl QuicTransport {
|
||||
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
||||
Ok(store) => store,
|
||||
Err(error) => {
|
||||
Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
|
||||
Self::send_error_response(&mut send, "InvalidDrive", error.to_string())
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
@@ -286,8 +287,10 @@ impl QuicTransport {
|
||||
checksum,
|
||||
};
|
||||
// Send header
|
||||
let header_bytes = bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
|
||||
send.write_all(&(header_bytes.len() as u32).to_le_bytes()).await?;
|
||||
let header_bytes =
|
||||
bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
|
||||
send.write_all(&(header_bytes.len() as u32).to_le_bytes())
|
||||
.await?;
|
||||
send.write_all(&header_bytes).await?;
|
||||
// Send shard data
|
||||
send.write_all(&data).await?;
|
||||
@@ -300,8 +303,10 @@ impl QuicTransport {
|
||||
shard_data_length: 0,
|
||||
checksum: 0,
|
||||
};
|
||||
let header_bytes = bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
|
||||
send.write_all(&(header_bytes.len() as u32).to_le_bytes()).await?;
|
||||
let header_bytes =
|
||||
bincode::serialize(&ClusterResponse::ShardReadResponse(header))?;
|
||||
send.write_all(&(header_bytes.len() as u32).to_le_bytes())
|
||||
.await?;
|
||||
send.write_all(&header_bytes).await?;
|
||||
send.finish()?;
|
||||
}
|
||||
@@ -340,7 +345,8 @@ impl QuicTransport {
|
||||
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
|
||||
Ok(store) => store,
|
||||
Err(error) => {
|
||||
Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
|
||||
Self::send_error_response(&mut send, "InvalidDrive", error.to_string())
|
||||
.await?;
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
@@ -403,7 +409,13 @@ impl QuicTransport {
|
||||
send.write_all(&response).await?;
|
||||
send.finish()?;
|
||||
|
||||
self.broadcast_topology(&state, Some(response_topology), None, Some(&joining_node_id)).await;
|
||||
self.broadcast_topology(
|
||||
&state,
|
||||
Some(response_topology),
|
||||
None,
|
||||
Some(&joining_node_id),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
ClusterRequest::Heartbeat(heartbeat) => {
|
||||
@@ -434,7 +446,8 @@ impl QuicTransport {
|
||||
send.finish()?;
|
||||
|
||||
if local_topology_version > peer_topology_version {
|
||||
self.broadcast_topology(&state, None, Some(&peer_node_id), None).await;
|
||||
self.broadcast_topology(&state, None, Some(&peer_node_id), None)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -585,8 +598,7 @@ impl QuicTransport {
|
||||
|
||||
/// Close the QUIC endpoint gracefully.
|
||||
pub fn close(&self) {
|
||||
self.endpoint
|
||||
.close(quinn::VarInt::from_u32(0), b"shutdown");
|
||||
self.endpoint.close(quinn::VarInt::from_u32(0), b"shutdown");
|
||||
}
|
||||
|
||||
/// Get the local node ID.
|
||||
|
||||
@@ -40,12 +40,7 @@ impl ShardStore {
|
||||
}
|
||||
|
||||
/// Write a shard to disk atomically (write to temp file, then rename).
|
||||
pub async fn write_shard(
|
||||
&self,
|
||||
shard_id: &ShardId,
|
||||
data: &[u8],
|
||||
checksum: u32,
|
||||
) -> Result<()> {
|
||||
pub async fn write_shard(&self, shard_id: &ShardId, data: &[u8], checksum: u32) -> Result<()> {
|
||||
let shard_path = self.shard_data_path(shard_id);
|
||||
let meta_path = self.shard_meta_path(shard_id);
|
||||
|
||||
@@ -117,11 +112,7 @@ impl ShardStore {
|
||||
}
|
||||
|
||||
/// List all shard IDs for a given bucket and key (across all chunks).
|
||||
pub async fn list_shards_for_object(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
) -> Result<Vec<ShardId>> {
|
||||
pub async fn list_shards_for_object(&self, bucket: &str, key: &str) -> Result<Vec<ShardId>> {
|
||||
let key_dir = self.key_dir(bucket, key);
|
||||
if !key_dir.exists() {
|
||||
return Ok(Vec::new());
|
||||
|
||||
+13
-11
@@ -3,16 +3,16 @@ use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use super::placement::{DriveLocation, ErasureSet};
|
||||
use super::persistence;
|
||||
use super::protocol::{ClusterTopology, ErasureSetInfo, DriveLocationInfo, NodeInfo};
|
||||
use super::placement::{DriveLocation, ErasureSet};
|
||||
use super::protocol::{ClusterTopology, DriveLocationInfo, ErasureSetInfo, NodeInfo};
|
||||
|
||||
/// Node status for tracking liveness.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum NodeStatus {
|
||||
Online,
|
||||
Suspect, // missed 2+ heartbeats
|
||||
Offline, // missed 5+ heartbeats
|
||||
Suspect, // missed 2+ heartbeats
|
||||
Offline, // missed 5+ heartbeats
|
||||
}
|
||||
|
||||
/// Tracked state for a peer node.
|
||||
@@ -162,11 +162,8 @@ impl ClusterState {
|
||||
if inner.erasure_sets.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let set_idx = super::placement::erasure_set_for_object(
|
||||
bucket,
|
||||
key,
|
||||
inner.erasure_sets.len() as u32,
|
||||
);
|
||||
let set_idx =
|
||||
super::placement::erasure_set_for_object(bucket, key, inner.erasure_sets.len() as u32);
|
||||
inner.erasure_sets.get(set_idx as usize).cloned()
|
||||
}
|
||||
|
||||
@@ -284,7 +281,10 @@ impl ClusterState {
|
||||
|
||||
let now = chrono::Utc::now();
|
||||
for node_info in &topology.nodes {
|
||||
let existing_status = inner.nodes.get(&node_info.node_id).map(|node| node.status.clone());
|
||||
let existing_status = inner
|
||||
.nodes
|
||||
.get(&node_info.node_id)
|
||||
.map(|node| node.status.clone());
|
||||
let existing_missed_heartbeats = inner
|
||||
.nodes
|
||||
.get(&node_info.node_id)
|
||||
@@ -304,7 +304,9 @@ impl ClusterState {
|
||||
);
|
||||
}
|
||||
|
||||
inner.nodes.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));
|
||||
inner
|
||||
.nodes
|
||||
.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));
|
||||
|
||||
// Update erasure sets
|
||||
inner.erasure_sets = topology
|
||||
|
||||
+25
-5
@@ -18,15 +18,27 @@ impl StorageError {
|
||||
}
|
||||
|
||||
pub fn no_such_key() -> Self {
|
||||
Self::new("NoSuchKey", "The specified key does not exist.", StatusCode::NOT_FOUND)
|
||||
Self::new(
|
||||
"NoSuchKey",
|
||||
"The specified key does not exist.",
|
||||
StatusCode::NOT_FOUND,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn no_such_bucket() -> Self {
|
||||
Self::new("NoSuchBucket", "The specified bucket does not exist", StatusCode::NOT_FOUND)
|
||||
Self::new(
|
||||
"NoSuchBucket",
|
||||
"The specified bucket does not exist",
|
||||
StatusCode::NOT_FOUND,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn bucket_not_empty() -> Self {
|
||||
Self::new("BucketNotEmpty", "The bucket you tried to delete is not empty", StatusCode::CONFLICT)
|
||||
Self::new(
|
||||
"BucketNotEmpty",
|
||||
"The bucket you tried to delete is not empty",
|
||||
StatusCode::CONFLICT,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn access_denied() -> Self {
|
||||
@@ -34,11 +46,19 @@ impl StorageError {
|
||||
}
|
||||
|
||||
pub fn no_such_upload() -> Self {
|
||||
Self::new("NoSuchUpload", "The specified upload does not exist", StatusCode::NOT_FOUND)
|
||||
Self::new(
|
||||
"NoSuchUpload",
|
||||
"The specified upload does not exist",
|
||||
StatusCode::NOT_FOUND,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn invalid_part_number() -> Self {
|
||||
Self::new("InvalidPartNumber", "Part number must be between 1 and 10000", StatusCode::BAD_REQUEST)
|
||||
Self::new(
|
||||
"InvalidPartNumber",
|
||||
"Part number must be between 1 and 10000",
|
||||
StatusCode::BAD_REQUEST,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn internal_error(msg: &str) -> Self {
|
||||
|
||||
+5
-2
@@ -2,9 +2,9 @@ mod action;
|
||||
mod auth;
|
||||
mod cluster;
|
||||
mod config;
|
||||
mod error;
|
||||
mod management;
|
||||
mod policy;
|
||||
mod error;
|
||||
mod server;
|
||||
mod storage;
|
||||
mod xml_response;
|
||||
@@ -12,7 +12,10 @@ mod xml_response;
|
||||
use clap::Parser;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(name = "ruststorage", about = "High-performance S3-compatible storage server")]
|
||||
#[command(
|
||||
name = "ruststorage",
|
||||
about = "High-performance S3-compatible storage server"
|
||||
)]
|
||||
struct Cli {
|
||||
/// Run in management mode (IPC via stdin/stdout)
|
||||
#[arg(long)]
|
||||
|
||||
+15
-5
@@ -266,10 +266,16 @@ pub async fn management_loop() -> Result<()> {
|
||||
}
|
||||
"listBucketTenants" => {
|
||||
if let Some(ref s) = server {
|
||||
match serde_json::to_value(s.list_bucket_tenants().await) {
|
||||
Ok(value) => send_response(id, value),
|
||||
match s.list_bucket_tenants().await {
|
||||
Ok(tenants) => match serde_json::to_value(tenants) {
|
||||
Ok(value) => send_response(id, value),
|
||||
Err(error) => send_error(
|
||||
id,
|
||||
format!("Failed to serialize bucket tenants: {}", error),
|
||||
),
|
||||
},
|
||||
Err(error) => {
|
||||
send_error(id, format!("Failed to serialize bucket tenants: {}", error))
|
||||
send_error(id, format!("Failed to list bucket tenants: {}", error))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -287,20 +293,24 @@ pub async fn management_loop() -> Result<()> {
|
||||
Ok(params) => {
|
||||
if let Some(ref s) = server {
|
||||
match s.get_bucket_tenant_credential(¶ms.bucket_name).await {
|
||||
Some(credential) => match serde_json::to_value(credential) {
|
||||
Ok(Some(credential)) => match serde_json::to_value(credential) {
|
||||
Ok(value) => send_response(id, value),
|
||||
Err(error) => send_error(
|
||||
id,
|
||||
format!("Failed to serialize bucket tenant: {}", error),
|
||||
),
|
||||
},
|
||||
None => send_error(
|
||||
Ok(None) => send_error(
|
||||
id,
|
||||
format!(
|
||||
"No bucket tenant credential exists for bucket {}",
|
||||
params.bucket_name
|
||||
),
|
||||
),
|
||||
Err(error) => send_error(
|
||||
id,
|
||||
format!("Failed to get bucket tenant credential: {}", error),
|
||||
),
|
||||
}
|
||||
} else {
|
||||
send_error(id, "Server not started".to_string());
|
||||
|
||||
+7
-5
@@ -81,14 +81,14 @@ where
|
||||
let raw = PrincipalRaw::deserialize(deserializer)?;
|
||||
match raw {
|
||||
PrincipalRaw::Star(s) if s == "*" => Ok(Principal::Wildcard),
|
||||
PrincipalRaw::Star(_) => Err(serde::de::Error::custom(
|
||||
"Principal string must be \"*\"",
|
||||
)),
|
||||
PrincipalRaw::Star(_) => Err(serde::de::Error::custom("Principal string must be \"*\"")),
|
||||
PrincipalRaw::Map(map) => {
|
||||
if let Some(aws) = map.get("AWS") {
|
||||
Ok(Principal::Aws(aws.clone().into_vec()))
|
||||
} else {
|
||||
Err(serde::de::Error::custom("Principal map must contain \"AWS\" key"))
|
||||
Err(serde::de::Error::custom(
|
||||
"Principal map must contain \"AWS\" key",
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -286,7 +286,9 @@ const MAX_POLICY_SIZE: usize = 20 * 1024; // 20 KB
|
||||
|
||||
pub fn validate_policy(json: &str) -> Result<BucketPolicy, StorageError> {
|
||||
if json.len() > MAX_POLICY_SIZE {
|
||||
return Err(StorageError::malformed_policy("Policy exceeds maximum size of 20KB"));
|
||||
return Err(StorageError::malformed_policy(
|
||||
"Policy exceeds maximum size of 20KB",
|
||||
));
|
||||
}
|
||||
|
||||
let policy: BucketPolicy =
|
||||
|
||||
+47
-11
@@ -195,11 +195,27 @@ impl StorageServer {
|
||||
credential: Credential,
|
||||
) -> Result<Credential> {
|
||||
self.ensure_tenant_auth_enabled()?;
|
||||
self.store.create_bucket(bucket_name).await?;
|
||||
Ok(self
|
||||
let replacement = self
|
||||
.auth_runtime
|
||||
.replace_bucket_tenant_credential(bucket_name, credential)
|
||||
.await?)
|
||||
.replace_bucket_tenant_credential_with_snapshot(bucket_name, credential)
|
||||
.await?;
|
||||
|
||||
if let Err(error) = self.store.create_bucket(bucket_name).await {
|
||||
if let Err(rollback_error) = self
|
||||
.auth_runtime
|
||||
.replace_credentials(replacement.previous_credentials)
|
||||
.await
|
||||
{
|
||||
return Err(anyhow::anyhow!(
|
||||
"Failed to create tenant bucket: {}; credential rollback failed: {}",
|
||||
error,
|
||||
rollback_error.message
|
||||
));
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
Ok(replacement.credential)
|
||||
}
|
||||
|
||||
pub async fn rotate_bucket_tenant_credentials(
|
||||
@@ -223,23 +239,43 @@ impl StorageServer {
|
||||
access_key_id: Option<&str>,
|
||||
) -> Result<()> {
|
||||
self.ensure_tenant_auth_enabled()?;
|
||||
self.auth_runtime
|
||||
let removal = self
|
||||
.auth_runtime
|
||||
.remove_bucket_tenant_credentials(bucket_name, access_key_id)
|
||||
.await?;
|
||||
if access_key_id.is_none() && self.store.bucket_exists(bucket_name).await {
|
||||
self.store.delete_bucket_recursive(bucket_name).await?;
|
||||
if let Err(error) = self.store.delete_bucket_recursive(bucket_name).await {
|
||||
if let Err(rollback_error) = self
|
||||
.auth_runtime
|
||||
.replace_credentials(removal.previous_credentials)
|
||||
.await
|
||||
{
|
||||
return Err(anyhow::anyhow!(
|
||||
"Failed to delete tenant bucket: {}; credential rollback failed: {}",
|
||||
error,
|
||||
rollback_error.message
|
||||
));
|
||||
}
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn list_bucket_tenants(&self) -> Vec<crate::auth::BucketTenantMetadata> {
|
||||
self.auth_runtime.list_bucket_tenants().await
|
||||
pub async fn list_bucket_tenants(&self) -> Result<Vec<crate::auth::BucketTenantMetadata>> {
|
||||
self.ensure_tenant_auth_enabled()?;
|
||||
Ok(self.auth_runtime.list_bucket_tenants().await)
|
||||
}
|
||||
|
||||
pub async fn get_bucket_tenant_credential(&self, bucket_name: &str) -> Option<Credential> {
|
||||
self.auth_runtime
|
||||
pub async fn get_bucket_tenant_credential(
|
||||
&self,
|
||||
bucket_name: &str,
|
||||
) -> Result<Option<Credential>> {
|
||||
self.ensure_tenant_auth_enabled()?;
|
||||
Ok(self
|
||||
.auth_runtime
|
||||
.get_bucket_tenant_credential(bucket_name)
|
||||
.await
|
||||
.await)
|
||||
}
|
||||
|
||||
fn ensure_tenant_auth_enabled(&self) -> Result<()> {
|
||||
|
||||
+26
-5
@@ -1576,15 +1576,36 @@ impl StorageBackend {
|
||||
return Err(StorageError::invalid_request("Unsupported bucket export format.").into());
|
||||
}
|
||||
|
||||
let mut import_objects = Vec::with_capacity(source.objects.len());
|
||||
for object in source.objects {
|
||||
let data = hex::decode(&object.data_hex)
|
||||
.map_err(|error| StorageError::invalid_request(&error.to_string()))?;
|
||||
if data.len() as u64 != object.size {
|
||||
return Err(StorageError::invalid_request(&format!(
|
||||
"Bucket export object '{}' size does not match payload.",
|
||||
object.key
|
||||
))
|
||||
.into());
|
||||
}
|
||||
|
||||
let md5_hex = format!("{:x}", Md5::digest(&data));
|
||||
if !object.md5.eq_ignore_ascii_case(&md5_hex) {
|
||||
return Err(StorageError::invalid_request(&format!(
|
||||
"Bucket export object '{}' md5 does not match payload.",
|
||||
object.key
|
||||
))
|
||||
.into());
|
||||
}
|
||||
|
||||
import_objects.push((object.key, data, object.metadata));
|
||||
}
|
||||
|
||||
if !self.bucket_exists(bucket).await {
|
||||
self.create_bucket(bucket).await?;
|
||||
}
|
||||
|
||||
for object in source.objects {
|
||||
let data = hex::decode(&object.data_hex)
|
||||
.map_err(|error| StorageError::invalid_request(&error.to_string()))?;
|
||||
self.put_object_bytes(bucket, &object.key, &data, object.metadata)
|
||||
.await?;
|
||||
for (key, data, metadata) in import_objects {
|
||||
self.put_object_bytes(bucket, &key, &data, metadata).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -47,7 +47,10 @@ pub fn list_objects_v1_xml(bucket: &str, result: &ListObjectsResult) -> String {
|
||||
);
|
||||
|
||||
if !result.delimiter.is_empty() {
|
||||
xml.push_str(&format!("<Delimiter>{}</Delimiter>", xml_escape(&result.delimiter)));
|
||||
xml.push_str(&format!(
|
||||
"<Delimiter>{}</Delimiter>",
|
||||
xml_escape(&result.delimiter)
|
||||
));
|
||||
}
|
||||
|
||||
for entry in &result.contents {
|
||||
@@ -95,7 +98,10 @@ pub fn list_objects_v2_xml(bucket: &str, result: &ListObjectsResult) -> String {
|
||||
);
|
||||
|
||||
if !result.delimiter.is_empty() {
|
||||
xml.push_str(&format!("<Delimiter>{}</Delimiter>", xml_escape(&result.delimiter)));
|
||||
xml.push_str(&format!(
|
||||
"<Delimiter>{}</Delimiter>",
|
||||
xml_escape(&result.delimiter)
|
||||
));
|
||||
}
|
||||
|
||||
if let Some(ref token) = result.next_continuation_token {
|
||||
|
||||
@@ -75,6 +75,17 @@ tap.test('should expose disabled cluster health in standalone mode', async () =>
|
||||
expect(clusterHealth.drives).toEqual(undefined);
|
||||
});
|
||||
|
||||
tap.test('should expose health and metrics with auth disabled', async () => {
|
||||
const health = await testSmartStorageInstance.getHealth();
|
||||
expect(health.running).toEqual(true);
|
||||
expect(health.auth.enabled).toEqual(false);
|
||||
expect(health.auth.tenantCredentialCount).toEqual(0);
|
||||
|
||||
const metrics = await testSmartStorageInstance.getMetrics();
|
||||
expect(metrics.tenantCredentialCount).toEqual(0);
|
||||
expect(metrics.prometheusText).toMatch(/smartstorage_tenant_credentials_total 0/);
|
||||
});
|
||||
|
||||
tap.test('should create a bucket', async () => {
|
||||
const response = await s3Client.send(new CreateBucketCommand({ Bucket: 'test-bucket' }));
|
||||
expect(response.$metadata.httpStatusCode).toEqual(200);
|
||||
|
||||
@@ -23,7 +23,10 @@ const STORAGE_DIR = fileURLToPath(new URL('../.nogit/bucket-tenant-tests', impor
|
||||
const WORKAPP_A_BUCKET = 'workapp-a-bucket';
|
||||
const WORKAPP_B_BUCKET = 'workapp-b-bucket';
|
||||
const RESTORE_BUCKET = 'workapp-a-restore-bucket';
|
||||
const CORRUPT_RESTORE_BUCKET = 'workapp-a-corrupt-restore-bucket';
|
||||
const POLICY_BUCKET = 'workapp-policy-bucket';
|
||||
const DUPLICATE_TENANT_BUCKET = 'workapp-duplicate-tenant-bucket';
|
||||
const REVOKE_ONLY_BUCKET = 'workapp-revoke-only-bucket';
|
||||
const ADMIN_CREDENTIAL: smartstorage.IStorageCredential = {
|
||||
accessKeyId: 'TENANTADMIN',
|
||||
secretAccessKey: 'TENANTADMINSECRET123',
|
||||
@@ -94,6 +97,20 @@ async function startStorage() {
|
||||
adminClient = createS3Client(ADMIN_CREDENTIAL);
|
||||
}
|
||||
|
||||
tap.test('bucket tenant client APIs require auth before IPC', async () => {
|
||||
const storage = new smartstorage.SmartStorage({
|
||||
auth: {
|
||||
enabled: false,
|
||||
credentials: [ADMIN_CREDENTIAL],
|
||||
},
|
||||
});
|
||||
|
||||
await expect(storage.listBucketTenants()).rejects.toThrow();
|
||||
await expect(storage.getBucketTenantDescriptor({
|
||||
bucketName: WORKAPP_A_BUCKET,
|
||||
})).rejects.toThrow();
|
||||
});
|
||||
|
||||
tap.test('setup: start storage and provision bucket tenants', async () => {
|
||||
await rm(STORAGE_DIR, { recursive: true, force: true });
|
||||
await startStorage();
|
||||
@@ -129,6 +146,18 @@ tap.test('listBucketTenants returns scoped credential metadata without secrets',
|
||||
expect((tenants[0] as any).secretAccessKey).toEqual(undefined);
|
||||
});
|
||||
|
||||
tap.test('createBucketTenant validates credentials before creating buckets', async () => {
|
||||
await expect(testSmartStorageInstance.createBucketTenant({
|
||||
bucketName: DUPLICATE_TENANT_BUCKET,
|
||||
accessKeyId: ADMIN_CREDENTIAL.accessKeyId,
|
||||
secretAccessKey: 'DUPLICATESECRET123',
|
||||
})).rejects.toThrow();
|
||||
|
||||
await expect(adminClient.send(new HeadBucketCommand({
|
||||
Bucket: DUPLICATE_TENANT_BUCKET,
|
||||
}))).rejects.toThrow();
|
||||
});
|
||||
|
||||
tap.test('tenant credentials work with AWS SDK v3 for their assigned bucket', async () => {
|
||||
const putA = await tenantAClient.send(new PutObjectCommand({
|
||||
Bucket: WORKAPP_A_BUCKET,
|
||||
@@ -227,6 +256,21 @@ tap.test('export/import targets one bucket without unrelated tenant data', async
|
||||
Bucket: RESTORE_BUCKET,
|
||||
}));
|
||||
expect(restoredObjects.Contents?.some((object) => object.Key === 'other.txt')).toEqual(false);
|
||||
|
||||
const corruptExport: smartstorage.IBucketExport = {
|
||||
...bucketExport,
|
||||
objects: bucketExport.objects.map((object, index) => index === 0 ? {
|
||||
...object,
|
||||
size: object.size + 1,
|
||||
} : object),
|
||||
};
|
||||
await expect(testSmartStorageInstance.importBucket({
|
||||
bucketName: CORRUPT_RESTORE_BUCKET,
|
||||
source: corruptExport,
|
||||
})).rejects.toThrow();
|
||||
await expect(adminClient.send(new HeadBucketCommand({
|
||||
Bucket: CORRUPT_RESTORE_BUCKET,
|
||||
}))).rejects.toThrow();
|
||||
});
|
||||
|
||||
tap.test('bucket policies persist across restart', async () => {
|
||||
@@ -301,25 +345,50 @@ tap.test('runtime credentials survive restart', async () => {
|
||||
expect(policyResponse.Policy?.includes('TenantPolicyPersistence')).toEqual(true);
|
||||
});
|
||||
|
||||
tap.test('deleteBucketTenant refuses buckets without tenant credentials', async () => {
|
||||
await expect(testSmartStorageInstance.deleteBucketTenant({
|
||||
bucketName: POLICY_BUCKET,
|
||||
})).rejects.toThrow();
|
||||
|
||||
const headAfterRefusedDelete = await adminClient.send(new HeadBucketCommand({
|
||||
Bucket: POLICY_BUCKET,
|
||||
}));
|
||||
expect(headAfterRefusedDelete.$metadata.httpStatusCode).toEqual(200);
|
||||
});
|
||||
|
||||
tap.test('deleteBucketTenant can revoke credentials and delete tenant buckets', async () => {
|
||||
const revokeOnlyTenant = await testSmartStorageInstance.createBucketTenant({
|
||||
bucketName: REVOKE_ONLY_BUCKET,
|
||||
});
|
||||
const revokeOnlyClient = createS3ClientFromDescriptor(revokeOnlyTenant);
|
||||
await revokeOnlyClient.send(new PutObjectCommand({
|
||||
Bucket: REVOKE_ONLY_BUCKET,
|
||||
Key: 'revoke-only.txt',
|
||||
Body: 'revocation target',
|
||||
}));
|
||||
|
||||
await testSmartStorageInstance.deleteBucketTenant({
|
||||
bucketName: WORKAPP_B_BUCKET,
|
||||
accessKeyId: tenantB.accessKeyId,
|
||||
bucketName: REVOKE_ONLY_BUCKET,
|
||||
accessKeyId: revokeOnlyTenant.accessKeyId,
|
||||
});
|
||||
|
||||
await expect(tenantBClient.send(new GetObjectCommand({
|
||||
Bucket: WORKAPP_B_BUCKET,
|
||||
Key: 'other.txt',
|
||||
await expect(revokeOnlyClient.send(new GetObjectCommand({
|
||||
Bucket: REVOKE_ONLY_BUCKET,
|
||||
Key: 'revoke-only.txt',
|
||||
}))).rejects.toThrow();
|
||||
|
||||
const headAfterRevoke = await adminClient.send(new HeadBucketCommand({
|
||||
Bucket: WORKAPP_B_BUCKET,
|
||||
Bucket: REVOKE_ONLY_BUCKET,
|
||||
}));
|
||||
expect(headAfterRevoke.$metadata.httpStatusCode).toEqual(200);
|
||||
|
||||
await testSmartStorageInstance.deleteBucketTenant({
|
||||
bucketName: WORKAPP_B_BUCKET,
|
||||
});
|
||||
await expect(tenantBClient.send(new GetObjectCommand({
|
||||
Bucket: WORKAPP_B_BUCKET,
|
||||
Key: 'other.txt',
|
||||
}))).rejects.toThrow();
|
||||
await expect(adminClient.send(new HeadBucketCommand({
|
||||
Bucket: WORKAPP_B_BUCKET,
|
||||
}))).rejects.toThrow();
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstorage',
|
||||
version: '6.5.0',
|
||||
version: '6.5.1',
|
||||
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
||||
}
|
||||
|
||||
+3
-1
@@ -591,12 +591,14 @@ export class SmartStorage {
|
||||
}
|
||||
|
||||
public async listBucketTenants(): Promise<IBucketTenantMetadata[]> {
|
||||
this.assertTenantAuthEnabled();
|
||||
return this.bridge.sendCommand('listBucketTenants', {});
|
||||
}
|
||||
|
||||
public async getBucketTenantDescriptor(optionsArg: {
|
||||
bucketName: string;
|
||||
}): Promise<IBucketTenantDescriptor> {
|
||||
this.assertTenantAuthEnabled();
|
||||
const credential = await this.bridge.sendCommand('getBucketTenantCredential', {
|
||||
bucketName: optionsArg.bucketName,
|
||||
});
|
||||
@@ -653,7 +655,7 @@ export class SmartStorage {
|
||||
const [stats, credentials, tenants, cluster] = await Promise.all([
|
||||
this.getStorageStats(),
|
||||
this.listCredentials(),
|
||||
this.listBucketTenants(),
|
||||
this.config.auth.enabled ? this.listBucketTenants() : Promise.resolve([]),
|
||||
this.getClusterHealth(),
|
||||
]);
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user