From 0e9862efcaa238f1957110388f9e834018aafcc0 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 19 Apr 2026 11:57:28 +0000 Subject: [PATCH] feat: enhance storage stats and cluster health reporting - Introduced new data structures for bucket and storage statistics, including BucketSummary, StorageStats, and ClusterHealth. - Implemented runtime statistics tracking for buckets, including object count and total size. - Added methods to retrieve storage stats and bucket summaries in the FileStore. - Enhanced the SmartStorage interface to expose storage stats and cluster health. - Implemented tests for runtime stats, cluster health, and credential management. - Added support for runtime-managed credentials with atomic replacement. - Improved filesystem usage reporting for storage locations. --- changelog.md | 24 ++ readme.hints.md | 14 +- readme.md | 109 +++++++ rust/Cargo.lock | 1 + rust/Cargo.toml | 1 + rust/src/auth.rs | 79 ++++- rust/src/cluster/coordinator.rs | 384 ++++++++++++++++++++-- rust/src/cluster/drive_manager.rs | 56 +++- rust/src/cluster/healing.rs | 64 +++- rust/src/management.rs | 105 ++++++ rust/src/server.rs | 38 ++- rust/src/storage.rs | 523 ++++++++++++++++++++++++++++-- test/test.aws-sdk.node.ts | 118 ++++++- test/test.cluster-health.node.ts | 84 +++++ test/test.credentials.node.ts | 150 +++++++++ ts/index.ts | 138 +++++++- 16 files changed, 1803 insertions(+), 85 deletions(-) create mode 100644 test/test.cluster-health.node.ts create mode 100644 test/test.credentials.node.ts diff --git a/changelog.md b/changelog.md index a632888..f6a3968 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,29 @@ # Changelog +## Next - feat(credentials) +add runtime credential management APIs + +- Expose `listCredentials()` and `replaceCredentials()` through the Rust bridge and the `SmartStorage` TypeScript API. +- Move request authentication onto a native runtime credential store so credential replacement is atomic and effective for new requests immediately without a restart. +- Validate replacement input cleanly by rejecting empty replacement sets, empty credential fields, and duplicate `accessKeyId` values. +- Add runtime credential rotation tests covering initial auth, revocation of old credentials, multiple active credentials, and invalid replacements. + +## Next - feat(cluster-health) +add runtime cluster and drive health introspection + +- Expose `getClusterHealth()` through the Rust bridge and the `SmartStorage` TypeScript API. +- Report native cluster mode state including local node id, peer status, local drive probe health, quorum health, erasure settings, and tracked healing runtime state. +- Return a clear `{ enabled: false }` response when clustering is not active instead of synthesizing config-based data. +- Add standalone and single-node cluster tests plus README documentation for the best-effort semantics of peer and repair health values. + +## Next - feat(stats) +add runtime bucket summaries and storage stats + +- Expose `getStorageStats()` and `listBucketSummaries()` through the Rust bridge and the `SmartStorage` TypeScript API. +- Maintain native runtime stats for bucket counts, object counts, and logical stored bytes, initialized from on-disk state at startup and updated on bucket/object mutations. +- Include cheap filesystem-capacity snapshots for the storage directory or configured cluster drive paths. +- Add AWS SDK integration coverage for object add, delete, and bucket delete stats flows and document the cache consistency semantics in the README. + ## 2026-03-23 - 6.3.2 - fix(docs) update license ownership and correct README license file reference diff --git a/readme.hints.md b/readme.hints.md index 8bebd74..1fe685c 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -11,6 +11,9 @@ - **Bucket policies** (AWS/MinIO-compatible JSON policies, public access support) - CORS support - ListBuckets, ListObjects (v1/v2), CopyObject +- Runtime bucket summaries and storage stats via the Rust bridge (no S3 list scans) +- Cluster health introspection via the Rust bridge (node membership, local drive probes, quorum, healing state) +- Runtime credential listing and atomic replacement via the Rust bridge ## Architecture @@ -20,6 +23,7 @@ - `management.rs` - IPC loop (newline-delimited JSON over stdin/stdout) - `server.rs` - hyper 1.x HTTP server, routing, CORS, auth+policy pipeline, all S3-compatible handlers - `storage.rs` - FileStore: filesystem-backed storage, multipart manager, `.policies/` dir +- `storage.rs` also owns the runtime stats cache and standalone storage scans used by the bridge stats API - `xml_response.rs` - S3-compatible XML response builders - `error.rs` - StorageError codes with HTTP status mapping - `auth.rs` - AWS SigV4 signature verification (HMAC-SHA256, clock skew, constant-time compare) @@ -37,6 +41,11 @@ | `start` | `{ config: ISmartStorageConfig }` | Init storage + HTTP server | | `stop` | `{}` | Graceful shutdown | | `createBucket` | `{ name: string }` | Create bucket directory | +| `getStorageStats` | `{}` | Return cached bucket/global runtime stats + storage location capacity snapshots | +| `listBucketSummaries` | `{}` | Return cached per-bucket runtime summaries | +| `listCredentials` | `{}` | Return the active runtime auth credential set | +| `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set | +| `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode | ### Storage Layout - Objects: `{root}/{bucket}/{key}._storage_object` @@ -60,7 +69,10 @@ ## Testing -- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility (10 tests, auth disabled, port 3337) +- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats coverage (18 tests, auth disabled, port 3337) +- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337) +- `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349) +- `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348) - `test/test.auth.node.ts` - Auth + bucket policy integration (20 tests, auth enabled, port 3344) - `test/test.policy-crud.node.ts` - Policy API CRUD + validation edge cases (17 tests, port 3345) - `test/test.policy-eval.node.ts` - Policy evaluation: principals, actions, resources, deny-vs-allow (22 tests, port 3346) diff --git a/readme.md b/readme.md index 94ca863..3083bb4 100644 --- a/readme.md +++ b/readme.md @@ -32,6 +32,8 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community - ๐Ÿ“‹ **Bucket policies** โ€” IAM-style JSON policies with Allow/Deny evaluation and wildcard matching - ๐ŸŒ **CORS middleware** โ€” configurable cross-origin support - ๐Ÿงน **Clean slate mode** โ€” wipe storage on startup for test isolation +- ๐Ÿ“Š **Runtime storage stats** โ€” cheap bucket summaries and global counts without S3 list scans +- ๐Ÿ”‘ **Runtime credential rotation** โ€” list and replace active auth credentials without mutating internals - โšก **Test-first design** โ€” start/stop in milliseconds, no port conflicts ### Clustering Features @@ -39,6 +41,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community - ๐Ÿ”— **Erasure coding** โ€” Reed-Solomon (configurable k data + m parity shards) for storage efficiency and fault tolerance - ๐Ÿš„ **QUIC transport** โ€” multiplexed, encrypted inter-node communication via `quinn` with zero head-of-line blocking - ๐Ÿ’ฝ **Multi-drive awareness** โ€” each node manages multiple independent storage paths with health monitoring +- ๐Ÿฉบ **Cluster health introspection** โ€” query native node, drive, quorum, and healing status for product dashboards - ๐Ÿค **Cluster membership** โ€” static seed config + runtime join, heartbeat-based failure detection - โœ๏ธ **Quorum writes** โ€” data is only acknowledged after k+1 shards are persisted - ๐Ÿ“– **Quorum reads** โ€” reconstruct from any k available shards, local-first fast path @@ -201,6 +204,112 @@ const storage = await SmartStorage.createAndStart({ }); ``` +## Runtime Credentials + +```typescript +const credentials = await storage.listCredentials(); + +await storage.replaceCredentials([ + { + accessKeyId: 'ADMINA', + secretAccessKey: 'super-secret-a', + }, + { + accessKeyId: 'ADMINB', + secretAccessKey: 'super-secret-b', + }, +]); +``` + +```typescript +interface IStorageCredential { + accessKeyId: string; + secretAccessKey: string; +} +``` + +- `listCredentials()` returns the Rust core's current runtime credential set. +- `replaceCredentials()` swaps the full set atomically. On success, new requests use the new set immediately and the old credentials stop authenticating immediately. +- Requests that were already authenticated before the replacement keep running; auth is evaluated when each request starts. +- No restart is required. +- Replacement input must contain at least one credential, each `accessKeyId` and `secretAccessKey` must be non-empty, and `accessKeyId` values must be unique. + +## Runtime Stats + +```typescript +const stats = await storage.getStorageStats(); +const bucketSummaries = await storage.listBucketSummaries(); + +console.log(stats.bucketCount); +console.log(stats.totalObjectCount); +console.log(stats.totalStorageBytes); +console.log(bucketSummaries[0]?.name, bucketSummaries[0]?.objectCount); +``` + +```typescript +interface IBucketSummary { + name: string; + objectCount: number; + totalSizeBytes: number; + creationDate?: number; +} + +interface IStorageLocationSummary { + path: string; + totalBytes?: number; + availableBytes?: number; + usedBytes?: number; +} + +interface IStorageStats { + bucketCount: number; + totalObjectCount: number; + totalStorageBytes: number; + buckets: IBucketSummary[]; + storageDirectory: string; + storageLocations?: IStorageLocationSummary[]; +} +``` + +- `bucketCount`, `totalObjectCount`, `totalStorageBytes`, and per-bucket totals are logical object stats maintained by the Rust runtime. They count object payload bytes, not sidecar files or erasure-coded shard overhead. +- smartstorage initializes these values from native on-disk state at startup, then keeps them in memory and updates them when bucket/object mutations succeed. Stats reads do not issue S3 `ListObjects` or rescan every object. +- Values are exact for mutations performed through smartstorage after startup. Direct filesystem edits outside smartstorage are not watched; restart the server to resync. +- `storageLocations` is a cheap filesystem-capacity snapshot. Standalone mode reports the storage directory. Cluster mode reports the configured drive paths. + +## Cluster Health + +```typescript +const clusterHealth = await storage.getClusterHealth(); + +if (!clusterHealth.enabled) { + console.log('Cluster mode is disabled'); +} else { + console.log(clusterHealth.nodeId, clusterHealth.quorumHealthy); + console.log(clusterHealth.peers); + console.log(clusterHealth.drives); +} +``` + +```typescript +interface IClusterHealth { + enabled: boolean; + nodeId?: string; + quorumHealthy?: boolean; + majorityHealthy?: boolean; + peers?: IClusterPeerHealth[]; + drives?: IClusterDriveHealth[]; + erasure?: IClusterErasureHealth; + repairs?: IClusterRepairHealth; +} +``` + +- `getClusterHealth()` is served by the Rust core. The TypeScript wrapper does not infer values from static config. +- Standalone mode returns `{ enabled: false }`. +- Peer status is the local node's current view of cluster membership and heartbeats, so it is best-effort and may lag real network state. +- Drive health is based on live native probe checks on the configured local drive paths. Capacity values are cheap filesystem snapshots. +- `quorumHealthy` means the local node currently sees majority quorum and enough available placements in every erasure set to satisfy the configured write quorum. +- Repair fields expose the background healer's currently available runtime state. They are best-effort and limited to what the engine tracks today, such as whether a scan is active, the last completed run, and the last error. + ## Usage with AWS SDK v3 ```typescript diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 479127e..26a5277 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1346,6 +1346,7 @@ dependencies = [ "http-body-util", "hyper", "hyper-util", + "libc", "md-5", "percent-encoding", "quick-xml", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index cb74b80..ff5bbec 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -41,3 +41,4 @@ dashmap = "6" hmac = "0.12" sha2 = "0.10" hex = "0.4" +libc = "0.2" diff --git a/rust/src/auth.rs b/rust/src/auth.rs index ba374cf..cc853a6 100644 --- a/rust/src/auth.rs +++ b/rust/src/auth.rs @@ -2,9 +2,10 @@ use hmac::{Hmac, Mac}; use hyper::body::Incoming; use hyper::Request; use sha2::{Digest, Sha256}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use tokio::sync::RwLock; -use crate::config::{Credential, SmartStorageConfig}; +use crate::config::{AuthConfig, Credential}; use crate::error::StorageError; type HmacSha256 = Hmac; @@ -27,7 +28,7 @@ struct SigV4Header { /// Verify the request's SigV4 signature. Returns the caller identity on success. pub fn verify_request( req: &Request, - config: &SmartStorageConfig, + credentials: &[Credential], ) -> Result { let auth_header = req .headers() @@ -47,7 +48,7 @@ pub fn verify_request( let parsed = parse_auth_header(auth_header)?; // Look up credential - let credential = find_credential(&parsed.access_key_id, config) + let credential = find_credential(&parsed.access_key_id, credentials) .ok_or_else(StorageError::invalid_access_key_id)?; // Get x-amz-date @@ -163,14 +164,76 @@ fn parse_auth_header(header: &str) -> Result { } /// Find a credential by access key ID. -fn find_credential<'a>(access_key_id: &str, config: &'a SmartStorageConfig) -> Option<&'a Credential> { - config - .auth - .credentials +fn find_credential<'a>(access_key_id: &str, credentials: &'a [Credential]) -> Option<&'a Credential> { + credentials .iter() .find(|c| c.access_key_id == access_key_id) } +#[derive(Debug)] +pub struct RuntimeCredentialStore { + enabled: bool, + credentials: RwLock>, +} + +impl RuntimeCredentialStore { + pub fn new(config: &AuthConfig) -> Self { + Self { + enabled: config.enabled, + credentials: RwLock::new(config.credentials.clone()), + } + } + + pub fn enabled(&self) -> bool { + self.enabled + } + + pub async fn list_credentials(&self) -> Vec { + self.credentials.read().await.clone() + } + + pub async fn snapshot_credentials(&self) -> Vec { + self.credentials.read().await.clone() + } + + pub async fn replace_credentials(&self, credentials: Vec) -> Result<(), StorageError> { + validate_credentials(&credentials)?; + *self.credentials.write().await = credentials; + Ok(()) + } +} + +fn validate_credentials(credentials: &[Credential]) -> Result<(), StorageError> { + if credentials.is_empty() { + return Err(StorageError::invalid_request( + "Credential replacement requires at least one credential.", + )); + } + + let mut seen_access_keys = HashSet::new(); + for credential in credentials { + if credential.access_key_id.trim().is_empty() { + return Err(StorageError::invalid_request( + "Credential accessKeyId must not be empty.", + )); + } + + if credential.secret_access_key.trim().is_empty() { + return Err(StorageError::invalid_request( + "Credential secretAccessKey must not be empty.", + )); + } + + if !seen_access_keys.insert(credential.access_key_id.as_str()) { + return Err(StorageError::invalid_request( + "Credential accessKeyId values must be unique.", + )); + } + } + + Ok(()) +} + /// Check clock skew (15 minutes max). fn check_clock_skew(amz_date: &str) -> Result<(), StorageError> { // Parse ISO 8601 basic format: YYYYMMDDTHHMMSSZ diff --git a/rust/src/cluster/coordinator.rs b/rust/src/cluster/coordinator.rs index b426eeb..d70b849 100644 --- a/rust/src/cluster/coordinator.rs +++ b/rust/src/cluster/coordinator.rs @@ -8,18 +8,24 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use tokio::fs; +use tokio::sync::{Mutex, RwLock}; use super::config::ErasureConfig; +use super::drive_manager::{DriveManager, DriveState, DriveStatus}; use super::erasure::ErasureCoder; +use super::healing::HealingRuntimeState; use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement}; use super::placement::ErasureSet; use super::protocol::{ClusterRequest, ShardDeleteRequest, ShardReadRequest, ShardWriteRequest}; use super::quic_transport::QuicTransport; use super::shard_store::{ShardId, ShardStore}; -use super::state::ClusterState; +use super::state::{ClusterState, NodeStatus}; use crate::storage::{ - BucketInfo, CompleteMultipartResult, CopyResult, GetResult, HeadResult, ListObjectEntry, - ListObjectsResult, MultipartUploadInfo, PutResult, + storage_location_summary, BucketInfo, BucketSummary, ClusterDriveHealth, + ClusterErasureHealth, ClusterHealth, ClusterPeerHealth, ClusterRepairHealth, + CompleteMultipartResult, CopyResult, GetResult, HeadResult, ListObjectEntry, + ListObjectsResult, MultipartUploadInfo, PutResult, RuntimeBucketStats, + RuntimeStatsState, StorageLocationSummary, StorageStats, }; use serde::{Deserialize, Serialize}; @@ -53,6 +59,10 @@ pub struct DistributedStore { state: Arc, transport: Arc, erasure_coder: ErasureCoder, + storage_dir: PathBuf, + drive_paths: Vec, + drive_manager: Arc>, + healing_runtime: Arc>, /// Local shard stores, one per drive. Index = drive index. local_shard_stores: Vec>, /// Root directory for manifests on this node @@ -62,6 +72,7 @@ pub struct DistributedStore { /// Root directory for bucket policies policies_dir: PathBuf, erasure_config: ErasureConfig, + runtime_stats: RwLock, } impl DistributedStore { @@ -69,7 +80,10 @@ impl DistributedStore { state: Arc, transport: Arc, erasure_config: ErasureConfig, + storage_dir: PathBuf, drive_paths: Vec, + drive_manager: Arc>, + healing_runtime: Arc>, manifest_dir: PathBuf, buckets_dir: PathBuf, ) -> Result { @@ -86,11 +100,16 @@ impl DistributedStore { state, transport, erasure_coder, + storage_dir, + drive_paths, + drive_manager, + healing_runtime, local_shard_stores, manifest_dir, buckets_dir, policies_dir, erasure_config, + runtime_stats: RwLock::new(RuntimeStatsState::default()), }) } @@ -99,6 +118,80 @@ impl DistributedStore { self.policies_dir.clone() } + pub async fn initialize_runtime_stats(&self) { + let buckets = match self.list_buckets().await { + Ok(buckets) => buckets, + Err(error) => { + tracing::warn!(path = %self.storage_dir.display(), error = %error, "Failed to initialize distributed runtime stats"); + return; + } + }; + + let mut runtime_buckets = HashMap::new(); + for bucket in buckets { + let manifest_bucket_dir = self.manifest_dir.join(&bucket.name); + let (object_count, total_size_bytes) = self + .scan_bucket_manifests(&bucket.name, &manifest_bucket_dir) + .await; + runtime_buckets.insert( + bucket.name, + RuntimeBucketStats { + object_count, + total_size_bytes, + creation_date: Some(bucket.creation_date), + }, + ); + } + + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.replace_buckets(runtime_buckets); + } + + pub async fn get_storage_stats(&self) -> Result { + let runtime_stats = self.runtime_stats.read().await; + Ok(runtime_stats.snapshot(&self.storage_dir, self.storage_locations())) + } + + pub async fn list_bucket_summaries(&self) -> Result> { + let runtime_stats = self.runtime_stats.read().await; + Ok(runtime_stats.bucket_summaries()) + } + + pub async fn get_cluster_health(&self) -> Result { + let nodes = self.state.all_nodes().await; + let erasure_sets = self.state.erasure_sets().await; + let majority_healthy = self.state.has_majority().await; + + let mut drive_manager = self.drive_manager.lock().await; + drive_manager.check_all_drives().await; + let drive_states = drive_manager.snapshot(); + drop(drive_manager); + + let peers = self.peer_health(&nodes); + let drives = self.drive_health(&drive_states, &erasure_sets); + let repairs = self.repair_health().await; + let quorum_healthy = majority_healthy && self.quorum_is_healthy(&nodes, &drive_states, &erasure_sets); + + Ok(ClusterHealth { + enabled: true, + node_id: Some(self.state.local_node_id().to_string()), + quorum_healthy: Some(quorum_healthy), + majority_healthy: Some(majority_healthy), + peers: Some(peers), + drives: Some(drives), + erasure: Some(ClusterErasureHealth { + data_shards: self.erasure_config.data_shards, + parity_shards: self.erasure_config.parity_shards, + chunk_size_bytes: self.erasure_config.chunk_size_bytes, + total_shards: self.erasure_config.total_shards(), + read_quorum: self.erasure_config.read_quorum(), + write_quorum: self.erasure_config.write_quorum(), + erasure_set_count: erasure_sets.len(), + }), + repairs: Some(repairs), + }) + } + // ============================ // Object operations // ============================ @@ -114,6 +207,8 @@ impl DistributedStore { return Err(crate::error::StorageError::no_such_bucket().into()); } + let previous_size = self.manifest_size_if_exists(bucket, key).await; + let erasure_set = self .state .get_erasure_set_for_object(bucket, key) @@ -139,8 +234,7 @@ impl DistributedStore { // Process complete chunks while chunk_buffer.len() >= chunk_size { - let chunk_data: Vec = - chunk_buffer.drain(..chunk_size).collect(); + let chunk_data: Vec = chunk_buffer.drain(..chunk_size).collect(); let chunk_manifest = self .encode_and_distribute_chunk( &erasure_set, @@ -191,6 +285,8 @@ impl DistributedStore { }; self.store_manifest(&manifest).await?; + self.track_object_upsert(bucket, previous_size, total_size) + .await; Ok(PutResult { md5: md5_hex }) } @@ -281,6 +377,7 @@ impl DistributedStore { } pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> { + let existing_size = self.manifest_size_if_exists(bucket, key).await; // Load manifest to find all shards if let Ok(manifest) = self.load_manifest(bucket, key).await { let local_id = self.state.local_node_id().to_string(); @@ -328,6 +425,7 @@ impl DistributedStore { // Delete manifest self.delete_manifest(bucket, key).await?; + self.track_object_deleted(bucket, existing_size).await; Ok(()) } @@ -351,6 +449,8 @@ impl DistributedStore { src_manifest.metadata.clone() }; + let previous_size = self.manifest_size_if_exists(dest_bucket, dest_key).await; + // Read source object fully, then reconstruct let mut full_data = Vec::new(); for chunk in &src_manifest.chunks { @@ -414,6 +514,8 @@ impl DistributedStore { }; self.store_manifest(&manifest).await?; + self.track_object_upsert(dest_bucket, previous_size, manifest.size) + .await; Ok(CopyResult { md5: md5_hex, @@ -468,11 +570,7 @@ impl DistributedStore { if !delimiter.is_empty() { let remaining = &key[prefix.len()..]; if let Some(delim_idx) = remaining.find(delimiter) { - let cp = format!( - "{}{}", - prefix, - &remaining[..delim_idx + delimiter.len()] - ); + let cp = format!("{}{}", prefix, &remaining[..delim_idx + delimiter.len()]); if common_prefix_set.insert(cp.clone()) { common_prefixes.push(cp); } @@ -560,6 +658,7 @@ impl DistributedStore { // Also create manifest bucket dir let manifest_bucket = self.manifest_dir.join(bucket); fs::create_dir_all(&manifest_bucket).await?; + self.track_bucket_created(bucket).await; Ok(()) } @@ -578,6 +677,7 @@ impl DistributedStore { } let _ = fs::remove_dir_all(&bucket_path).await; let _ = fs::remove_dir_all(&manifest_bucket).await; + self.track_bucket_deleted(bucket).await; Ok(()) } @@ -643,7 +743,10 @@ impl DistributedStore { let mut hasher = Md5::new(); // Use upload_id + part_number as a unique key prefix for shard storage - let part_key = format!("{}/_multipart/{}/part-{}", session.key, upload_id, part_number); + let part_key = format!( + "{}/_multipart/{}/part-{}", + session.key, upload_id, part_number + ); let mut body = body; loop { @@ -655,8 +758,7 @@ impl DistributedStore { chunk_buffer.extend_from_slice(&data); while chunk_buffer.len() >= chunk_size { - let chunk_data: Vec = - chunk_buffer.drain(..chunk_size).collect(); + let chunk_data: Vec = chunk_buffer.drain(..chunk_size).collect(); let chunk_manifest = self .encode_and_distribute_chunk( &erasure_set, @@ -717,6 +819,9 @@ impl DistributedStore { ) -> Result { let session = self.load_multipart_session(upload_id).await?; let upload_dir = self.multipart_dir().join(upload_id); + let previous_size = self + .manifest_size_if_exists(&session.bucket, &session.key) + .await; // Read per-part manifests and concatenate chunks sequentially let mut all_chunks = Vec::new(); @@ -777,6 +882,8 @@ impl DistributedStore { }; self.store_manifest(&manifest).await?; + self.track_object_upsert(&session.bucket, previous_size, manifest.size) + .await; // Clean up multipart upload directory let _ = fs::remove_dir_all(&upload_dir).await; @@ -809,9 +916,10 @@ impl DistributedStore { chunk_index: chunk.chunk_index, shard_index: placement.shard_index, }; - if let Some(store) = self.local_shard_stores.get( - placement.drive_id.parse::().unwrap_or(0), - ) { + if let Some(store) = self + .local_shard_stores + .get(placement.drive_id.parse::().unwrap_or(0)) + { let _ = store.delete_shard(&shard_id).await; } } else { @@ -837,10 +945,7 @@ impl DistributedStore { Ok(()) } - pub async fn list_multipart_uploads( - &self, - bucket: &str, - ) -> Result> { + pub async fn list_multipart_uploads(&self, bucket: &str) -> Result> { let multipart_dir = self.multipart_dir(); if !multipart_dir.is_dir() { return Ok(Vec::new()); @@ -883,6 +988,236 @@ impl DistributedStore { Ok(serde_json::from_str(&content)?) } + fn storage_locations(&self) -> Vec { + self.drive_paths + .iter() + .map(|path| storage_location_summary(path)) + .collect() + } + + fn node_status_label(status: &NodeStatus) -> String { + match status { + NodeStatus::Online => "online".to_string(), + NodeStatus::Suspect => "suspect".to_string(), + NodeStatus::Offline => "offline".to_string(), + } + } + + fn drive_status_label(status: &DriveStatus) -> String { + match status { + DriveStatus::Online => "online".to_string(), + DriveStatus::Degraded => "degraded".to_string(), + DriveStatus::Offline => "offline".to_string(), + DriveStatus::Healing => "healing".to_string(), + } + } + + fn peer_health(&self, nodes: &[super::state::NodeState]) -> Vec { + let local_node_id = self.state.local_node_id(); + let mut peers: Vec = nodes + .iter() + .filter(|node| node.info.node_id != local_node_id) + .map(|node| ClusterPeerHealth { + node_id: node.info.node_id.clone(), + status: Self::node_status_label(&node.status), + quic_address: Some(node.info.quic_addr.clone()), + s3_address: Some(node.info.s3_addr.clone()), + drive_count: Some(node.info.drive_count), + last_heartbeat: Some(node.last_heartbeat.timestamp_millis()), + missed_heartbeats: Some(node.missed_heartbeats), + }) + .collect(); + peers.sort_by(|a, b| a.node_id.cmp(&b.node_id)); + peers + } + + fn drive_health(&self, drive_states: &[DriveState], erasure_sets: &[ErasureSet]) -> Vec { + let local_node_id = self.state.local_node_id(); + let mut drive_to_set = HashMap::new(); + for erasure_set in erasure_sets { + for drive in &erasure_set.drives { + if drive.node_id == local_node_id { + drive_to_set.insert(drive.drive_index as usize, erasure_set.set_id); + } + } + } + + drive_states + .iter() + .enumerate() + .map(|(index, drive)| ClusterDriveHealth { + index: index as u32, + path: drive.path.to_string_lossy().to_string(), + status: Self::drive_status_label(&drive.status), + total_bytes: (drive.stats.total_bytes > 0).then_some(drive.stats.total_bytes), + used_bytes: (drive.stats.total_bytes > 0).then_some(drive.stats.used_bytes), + available_bytes: (drive.stats.total_bytes > 0) + .then_some(drive.stats.available_bytes), + error_count: Some(drive.stats.error_count), + last_error: drive.stats.last_error.clone(), + last_check: Some(drive.stats.last_check.timestamp_millis()), + erasure_set_id: drive_to_set.get(&index).copied(), + }) + .collect() + } + + async fn repair_health(&self) -> ClusterRepairHealth { + let runtime_state = self.healing_runtime.read().await; + ClusterRepairHealth { + active: runtime_state.active, + scan_interval_ms: Some(runtime_state.scan_interval_ms), + last_run_started_at: runtime_state + .last_run_started_at + .as_ref() + .map(|timestamp| timestamp.timestamp_millis()), + last_run_completed_at: runtime_state + .last_run_completed_at + .as_ref() + .map(|timestamp| timestamp.timestamp_millis()), + last_duration_ms: runtime_state.last_duration_ms, + shards_checked: runtime_state + .last_stats + .as_ref() + .map(|stats| stats.shards_checked), + shards_healed: runtime_state + .last_stats + .as_ref() + .map(|stats| stats.shards_healed), + failed: runtime_state.last_stats.as_ref().map(|stats| stats.errors), + last_error: runtime_state.last_error.clone(), + } + } + + fn quorum_is_healthy( + &self, + nodes: &[super::state::NodeState], + drive_states: &[DriveState], + erasure_sets: &[ErasureSet], + ) -> bool { + if erasure_sets.is_empty() { + return false; + } + + let local_node_id = self.state.local_node_id(); + let node_statuses: HashMap<&str, &NodeStatus> = nodes + .iter() + .map(|node| (node.info.node_id.as_str(), &node.status)) + .collect(); + + erasure_sets.iter().all(|erasure_set| { + let available = erasure_set + .drives + .iter() + .filter(|drive| { + if drive.node_id == local_node_id { + return drive_states + .get(drive.drive_index as usize) + .map(|drive_state| !matches!(drive_state.status, DriveStatus::Offline)) + .unwrap_or(false); + } + + matches!(node_statuses.get(drive.node_id.as_str()), Some(NodeStatus::Online)) + }) + .count(); + + available >= self.erasure_config.write_quorum() + }) + } + + async fn scan_bucket_manifests( + &self, + bucket: &str, + manifest_bucket_dir: &std::path::Path, + ) -> (u64, u64) { + let mut object_count = 0u64; + let mut total_size_bytes = 0u64; + let mut directories = vec![manifest_bucket_dir.to_path_buf()]; + + while let Some(directory) = directories.pop() { + let mut entries = match fs::read_dir(&directory).await { + Ok(entries) => entries, + Err(_) => continue, + }; + + while let Ok(Some(entry)) = entries.next_entry().await { + let metadata = match entry.metadata().await { + Ok(metadata) => metadata, + Err(error) => { + tracing::warn!(bucket = bucket, error = %error, "Failed to read manifest entry metadata for runtime stats"); + continue; + } + }; + + if metadata.is_dir() { + directories.push(entry.path()); + continue; + } + + let name = entry.file_name().to_string_lossy().to_string(); + if !name.ends_with(".manifest.json") { + continue; + } + + match fs::read_to_string(entry.path()).await { + Ok(content) => match serde_json::from_str::(&content) { + Ok(manifest) => { + object_count += 1; + total_size_bytes += manifest.size; + } + Err(error) => { + tracing::warn!(bucket = bucket, error = %error, "Failed to parse manifest for runtime stats"); + } + }, + Err(error) => { + tracing::warn!(bucket = bucket, error = %error, "Failed to read manifest for runtime stats"); + } + } + } + } + + (object_count, total_size_bytes) + } + + async fn bucket_creation_date(&self, bucket: &str) -> Option> { + let metadata = fs::metadata(self.buckets_dir.join(bucket)).await.ok()?; + let created_or_modified = metadata.created().unwrap_or( + metadata + .modified() + .unwrap_or(std::time::SystemTime::UNIX_EPOCH), + ); + Some(created_or_modified.into()) + } + + async fn manifest_size_if_exists(&self, bucket: &str, key: &str) -> Option { + self.load_manifest(bucket, key) + .await + .ok() + .map(|manifest| manifest.size) + } + + async fn track_bucket_created(&self, bucket: &str) { + let creation_date = self.bucket_creation_date(bucket).await; + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.ensure_bucket(bucket, creation_date); + } + + async fn track_bucket_deleted(&self, bucket: &str) { + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.remove_bucket(bucket); + } + + async fn track_object_upsert(&self, bucket: &str, previous_size: Option, new_size: u64) { + let creation_date = self.bucket_creation_date(bucket).await; + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.ensure_bucket(bucket, creation_date); + runtime_stats.upsert_object(bucket, previous_size, new_size); + } + + async fn track_object_deleted(&self, bucket: &str, existing_size: Option) { + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.remove_object(bucket, existing_size); + } + // ============================ // Internal: erasure encode + distribute // ============================ @@ -920,12 +1255,13 @@ impl DistributedStore { let result = if drive.node_id == self.state.local_node_id() { // Local write - if let Some(store) = - self.local_shard_stores.get(drive.drive_index as usize) - { + if let Some(store) = self.local_shard_stores.get(drive.drive_index as usize) { store.write_shard(&shard_id, shard_data, checksum).await } else { - Err(anyhow::anyhow!("Local drive {} not found", drive.drive_index)) + Err(anyhow::anyhow!( + "Local drive {} not found", + drive.drive_index + )) } } else { // Remote write via QUIC diff --git a/rust/src/cluster/drive_manager.rs b/rust/src/cluster/drive_manager.rs index 8201404..7b9a724 100644 --- a/rust/src/cluster/drive_manager.rs +++ b/rust/src/cluster/drive_manager.rs @@ -1,9 +1,9 @@ +use super::config::DriveConfig; use anyhow::Result; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use tokio::fs; -use super::config::DriveConfig; // ============================ // Drive format (on-disk metadata) @@ -33,6 +33,7 @@ pub enum DriveStatus { #[derive(Debug, Clone)] pub struct DriveStats { pub total_bytes: u64, + pub available_bytes: u64, pub used_bytes: u64, pub avg_write_latency_us: u64, pub avg_read_latency_us: u64, @@ -45,6 +46,7 @@ impl Default for DriveStats { fn default() -> Self { Self { total_bytes: 0, + available_bytes: 0, used_bytes: 0, avg_write_latency_us: 0, avg_read_latency_us: 0, @@ -55,7 +57,7 @@ impl Default for DriveStats { } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DriveState { pub path: PathBuf, pub format: Option, @@ -74,10 +76,15 @@ pub struct DriveManager { impl DriveManager { /// Initialize drive manager with configured drive paths. pub async fn new(config: &DriveConfig) -> Result { - let mut drives = Vec::with_capacity(config.paths.len()); + let paths: Vec = config.paths.iter().map(PathBuf::from).collect(); + Self::from_paths(&paths).await + } - for path_str in &config.paths { - let path = PathBuf::from(path_str); + /// Initialize drive manager from an explicit list of resolved paths. + pub async fn from_paths(paths: &[PathBuf]) -> Result { + let mut drives = Vec::with_capacity(paths.len()); + + for path in paths { let storage_dir = path.join(".smartstorage"); // Ensure the drive directory exists @@ -92,7 +99,7 @@ impl DriveManager { }; drives.push(DriveState { - path, + path: path.clone(), format, status, stats: DriveStats::default(), @@ -154,6 +161,11 @@ impl DriveManager { &self.drives } + /// Get a cloneable snapshot of current drive states. + pub fn snapshot(&self) -> Vec { + self.drives.clone() + } + /// Get drives that are online. pub fn online_drives(&self) -> Vec { self.drives @@ -203,6 +215,11 @@ impl DriveManager { let _ = fs::remove_file(&probe_path).await; let latency = start.elapsed(); + if let Some((total_bytes, available_bytes, used_bytes)) = filesystem_usage(&drive.path) { + drive.stats.total_bytes = total_bytes; + drive.stats.available_bytes = available_bytes; + drive.stats.used_bytes = used_bytes; + } drive.stats.avg_write_latency_us = latency.as_micros() as u64; drive.stats.last_check = Utc::now(); @@ -240,3 +257,30 @@ impl DriveManager { serde_json::from_str(&content).ok() } } + +#[cfg(unix)] +fn filesystem_usage(path: &Path) -> Option<(u64, u64, u64)> { + use std::ffi::CString; + use std::os::unix::ffi::OsStrExt; + + let path_bytes = path.as_os_str().as_bytes(); + let c_path = CString::new(path_bytes).ok()?; + let mut stat: libc::statvfs = unsafe { std::mem::zeroed() }; + + if unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) } != 0 { + return None; + } + + let block_size = stat.f_frsize as u64; + let total_bytes = stat.f_blocks as u64 * block_size; + let available_bytes = stat.f_bavail as u64 * block_size; + let free_bytes = stat.f_bfree as u64 * block_size; + let used_bytes = total_bytes.saturating_sub(free_bytes); + + Some((total_bytes, available_bytes, used_bytes)) +} + +#[cfg(not(unix))] +fn filesystem_usage(_path: &Path) -> Option<(u64, u64, u64)> { + None +} diff --git a/rust/src/cluster/healing.rs b/rust/src/cluster/healing.rs index 51b1b35..335172b 100644 --- a/rust/src/cluster/healing.rs +++ b/rust/src/cluster/healing.rs @@ -1,8 +1,10 @@ use anyhow::Result; +use chrono::{DateTime, Utc}; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use tokio::fs; +use tokio::sync::RwLock; use super::config::ErasureConfig; use super::erasure::ErasureCoder; @@ -18,6 +20,7 @@ pub struct HealingService { local_shard_stores: Vec>, manifest_dir: PathBuf, scan_interval: Duration, + runtime_state: Arc>, } impl HealingService { @@ -27,16 +30,27 @@ impl HealingService { local_shard_stores: Vec>, manifest_dir: PathBuf, scan_interval_hours: u64, + runtime_state: Arc>, ) -> Result { + let scan_interval = Duration::from_secs(scan_interval_hours * 3600); + if let Ok(mut state_guard) = runtime_state.try_write() { + state_guard.scan_interval_ms = scan_interval.as_millis() as u64; + } + Ok(Self { state, erasure_coder: ErasureCoder::new(erasure_config)?, local_shard_stores, manifest_dir, - scan_interval: Duration::from_secs(scan_interval_hours * 3600), + scan_interval, + runtime_state, }) } + pub fn runtime_state(&self) -> Arc> { + self.runtime_state.clone() + } + /// Run the healing loop as a background task. pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver) { let mut interval = tokio::time::interval(self.scan_interval); @@ -47,9 +61,12 @@ impl HealingService { loop { tokio::select! { _ = interval.tick() => { + let started_at = Utc::now(); + self.mark_healing_started(started_at).await; tracing::info!("Starting healing scan"); match self.heal_scan().await { Ok(stats) => { + self.mark_healing_finished(started_at, Some(stats.clone()), None).await; tracing::info!( checked = stats.shards_checked, healed = stats.shards_healed, @@ -58,6 +75,7 @@ impl HealingService { ); } Err(e) => { + self.mark_healing_finished(started_at, None, Some(e.to_string())).await; tracing::error!("Healing scan failed: {}", e); } } @@ -70,6 +88,37 @@ impl HealingService { } } + async fn mark_healing_started(&self, started_at: DateTime) { + let mut runtime_state = self.runtime_state.write().await; + runtime_state.active = true; + runtime_state.scan_interval_ms = self.scan_interval.as_millis() as u64; + runtime_state.last_run_started_at = Some(started_at); + runtime_state.last_error = None; + } + + async fn mark_healing_finished( + &self, + started_at: DateTime, + stats: Option, + last_error: Option, + ) { + let finished_at = Utc::now(); + let mut runtime_state = self.runtime_state.write().await; + runtime_state.active = false; + runtime_state.scan_interval_ms = self.scan_interval.as_millis() as u64; + runtime_state.last_run_completed_at = Some(finished_at); + runtime_state.last_duration_ms = Some( + finished_at + .signed_duration_since(started_at) + .num_milliseconds() + .max(0) as u64, + ); + if let Some(stats) = stats { + runtime_state.last_stats = Some(stats); + } + runtime_state.last_error = last_error; + } + /// Scan all manifests for shards on offline nodes, reconstruct and re-place them. async fn heal_scan(&self) -> Result { let mut stats = HealStats::default(); @@ -348,9 +397,20 @@ impl HealingService { } } -#[derive(Debug, Default)] +#[derive(Debug, Clone, Default)] pub struct HealStats { pub shards_checked: u64, pub shards_healed: u64, pub errors: u64, } + +#[derive(Debug, Clone, Default)] +pub struct HealingRuntimeState { + pub active: bool, + pub scan_interval_ms: u64, + pub last_run_started_at: Option>, + pub last_run_completed_at: Option>, + pub last_duration_ms: Option, + pub last_stats: Option, + pub last_error: Option, +} diff --git a/rust/src/management.rs b/rust/src/management.rs index 129f184..397a4be 100644 --- a/rust/src/management.rs +++ b/rust/src/management.rs @@ -4,6 +4,7 @@ use serde_json::Value; use std::io::Write; use tokio::io::{AsyncBufReadExt, BufReader}; +use crate::config::Credential; use crate::config::SmartStorageConfig; use crate::server::StorageServer; @@ -140,6 +141,110 @@ pub async fn management_loop() -> Result<()> { } } } + "getStorageStats" => { + if let Some(ref s) = server { + match s.store().get_storage_stats().await { + Ok(stats) => match serde_json::to_value(stats) { + Ok(value) => send_response(id, value), + Err(error) => { + send_error( + id, + format!("Failed to serialize storage stats: {}", error), + ); + } + }, + Err(error) => { + send_error(id, format!("Failed to get storage stats: {}", error)); + } + } + } else { + send_error(id, "Server not started".to_string()); + } + } + "listBucketSummaries" => { + if let Some(ref s) = server { + match s.store().list_bucket_summaries().await { + Ok(summaries) => match serde_json::to_value(summaries) { + Ok(value) => send_response(id, value), + Err(error) => { + send_error( + id, + format!("Failed to serialize bucket summaries: {}", error), + ); + } + }, + Err(error) => { + send_error(id, format!("Failed to list bucket summaries: {}", error)); + } + } + } else { + send_error(id, "Server not started".to_string()); + } + } + "listCredentials" => { + if let Some(ref s) = server { + match serde_json::to_value(s.list_credentials().await) { + Ok(value) => send_response(id, value), + Err(error) => { + send_error( + id, + format!("Failed to serialize credentials: {}", error), + ); + } + } + } else { + send_error(id, "Server not started".to_string()); + } + } + "replaceCredentials" => { + #[derive(Deserialize)] + struct ReplaceCredentialsParams { + credentials: Vec, + } + + match serde_json::from_value::(req.params) { + Ok(params) => { + if let Some(ref s) = server { + match s.replace_credentials(params.credentials).await { + Ok(()) => { + send_response(id, serde_json::json!({})); + } + Err(error) => { + send_error( + id, + format!("Failed to replace credentials: {}", error), + ); + } + } + } else { + send_error(id, "Server not started".to_string()); + } + } + Err(error) => { + send_error(id, format!("Invalid replaceCredentials params: {}", error)); + } + } + } + "getClusterHealth" => { + if let Some(ref s) = server { + match s.store().get_cluster_health().await { + Ok(health) => match serde_json::to_value(health) { + Ok(value) => send_response(id, value), + Err(error) => { + send_error( + id, + format!("Failed to serialize cluster health: {}", error), + ); + } + }, + Err(error) => { + send_error(id, format!("Failed to get cluster health: {}", error)); + } + } + } else { + send_error(id, "Server not started".to_string()); + } + } "clusterStatus" => { send_response( id, diff --git a/rust/src/server.rs b/rust/src/server.rs index 5764e5d..b4a50ba 100644 --- a/rust/src/server.rs +++ b/rust/src/server.rs @@ -37,12 +37,14 @@ use crate::xml_response; pub struct StorageServer { store: Arc, + auth_runtime: Arc, shutdown_tx: watch::Sender, server_handle: tokio::task::JoinHandle<()>, } impl StorageServer { pub async fn start(config: SmartStorageConfig) -> Result { + let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth)); let store: Arc = if let Some(ref cluster_config) = config.cluster { if cluster_config.enabled { Self::start_clustered(&config, cluster_config).await? @@ -65,6 +67,7 @@ impl StorageServer { let server_store = store.clone(); let server_config = config.clone(); + let server_auth_runtime = auth_runtime.clone(); let server_policy_store = policy_store.clone(); let server_handle = tokio::spawn(async move { @@ -78,15 +81,17 @@ impl StorageServer { let io = TokioIo::new(stream); let store = server_store.clone(); let cfg = server_config.clone(); + let auth_runtime = server_auth_runtime.clone(); let ps = server_policy_store.clone(); tokio::spawn(async move { let svc = service_fn(move |req: Request| { let store = store.clone(); let cfg = cfg.clone(); + let auth_runtime = auth_runtime.clone(); let ps = ps.clone(); async move { - handle_request(req, store, cfg, ps).await + handle_request(req, store, cfg, auth_runtime, ps).await } }); @@ -119,6 +124,7 @@ impl StorageServer { Ok(Self { store, + auth_runtime, shutdown_tx, server_handle, }) @@ -133,6 +139,17 @@ impl StorageServer { &self.store } + pub async fn list_credentials(&self) -> Vec { + self.auth_runtime.list_credentials().await + } + + pub async fn replace_credentials( + &self, + credentials: Vec, + ) -> Result<(), StorageError> { + self.auth_runtime.replace_credentials(credentials).await + } + async fn start_standalone(config: &SmartStorageConfig) -> Result> { let store = Arc::new(StorageBackend::Standalone( FileStore::new(config.storage.directory.clone().into()), @@ -220,7 +237,7 @@ impl StorageServer { // Initialize drive manager for health monitoring let drive_manager = Arc::new(tokio::sync::Mutex::new( - DriveManager::new(&cluster_config.drives).await?, + DriveManager::from_paths(&drive_paths).await?, )); // Join cluster if seed nodes are configured @@ -231,7 +248,7 @@ impl StorageServer { cluster_config.heartbeat_interval_ms, local_node_info, ) - .with_drive_manager(drive_manager), + .with_drive_manager(drive_manager.clone()), ); membership .join_cluster(&cluster_config.seed_nodes) @@ -261,12 +278,16 @@ impl StorageServer { }); // Start healing service + let healing_runtime = Arc::new(tokio::sync::RwLock::new( + crate::cluster::healing::HealingRuntimeState::default(), + )); let healing_service = HealingService::new( cluster_state.clone(), &erasure_config, local_shard_stores.clone(), manifest_dir.clone(), 24, // scan every 24 hours + healing_runtime.clone(), )?; let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false); tokio::spawn(async move { @@ -278,11 +299,16 @@ impl StorageServer { cluster_state, transport, erasure_config, + std::path::PathBuf::from(&config.storage.directory), drive_paths, + drive_manager, + healing_runtime, manifest_dir, buckets_dir, )?; + distributed_store.initialize_runtime_stats().await; + let store = Arc::new(StorageBackend::Clustered(distributed_store)); if !config.server.silent { @@ -379,6 +405,7 @@ async fn handle_request( req: Request, store: Arc, config: SmartStorageConfig, + auth_runtime: Arc, policy_store: Arc, ) -> Result, std::convert::Infallible> { let request_id = Uuid::new_v4().to_string(); @@ -396,7 +423,7 @@ async fn handle_request( let request_ctx = action::resolve_action(&req); // Step 2: Auth + policy pipeline - if config.auth.enabled { + if auth_runtime.enabled() { // Attempt authentication let identity = { let has_auth_header = req @@ -407,7 +434,8 @@ async fn handle_request( .unwrap_or(false); if has_auth_header { - match auth::verify_request(&req, &config) { + let credentials = auth_runtime.snapshot_credentials().await; + match auth::verify_request(&req, &credentials) { Ok(id) => Some(id), Err(e) => { tracing::warn!("Auth failed: {}", e.message); diff --git a/rust/src/storage.rs b/rust/src/storage.rs index dd92320..51ea1b7 100644 --- a/rust/src/storage.rs +++ b/rust/src/storage.rs @@ -8,6 +8,7 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter}; +use tokio::sync::RwLock; use uuid::Uuid; use crate::cluster::coordinator::DistributedStore; @@ -64,6 +65,133 @@ pub struct BucketInfo { pub creation_date: DateTime, } +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct BucketSummary { + pub name: String, + pub object_count: u64, + pub total_size_bytes: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub creation_date: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StorageLocationSummary { + pub path: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub total_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub available_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub used_bytes: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct StorageStats { + pub bucket_count: u64, + pub total_object_count: u64, + pub total_storage_bytes: u64, + pub buckets: Vec, + pub storage_directory: String, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub storage_locations: Vec, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClusterPeerHealth { + pub node_id: String, + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub quic_address: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub s3_address: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub drive_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_heartbeat: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub missed_heartbeats: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClusterDriveHealth { + pub index: u32, + pub path: String, + pub status: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub total_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub used_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub available_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub error_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_check: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub erasure_set_id: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClusterErasureHealth { + pub data_shards: usize, + pub parity_shards: usize, + pub chunk_size_bytes: usize, + pub total_shards: usize, + pub read_quorum: usize, + pub write_quorum: usize, + pub erasure_set_count: usize, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClusterRepairHealth { + pub active: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub scan_interval_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_run_started_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_run_completed_at: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_duration_ms: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub shards_checked: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub shards_healed: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub failed: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_error: Option, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ClusterHealth { + pub enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub node_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub quorum_healthy: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub majority_healthy: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub peers: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub drives: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub erasure: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub repairs: Option, +} + pub struct MultipartUploadInfo { pub upload_id: String, pub key: String, @@ -98,22 +226,186 @@ struct PartMetadata { last_modified: String, } +#[derive(Debug, Clone, Default)] +pub(crate) struct RuntimeBucketStats { + pub object_count: u64, + pub total_size_bytes: u64, + pub creation_date: Option>, +} + +#[derive(Debug, Clone, Default)] +pub(crate) struct RuntimeStatsState { + buckets: HashMap, + total_object_count: u64, + total_storage_bytes: u64, +} + +impl RuntimeStatsState { + pub(crate) fn replace_buckets(&mut self, buckets: HashMap) { + self.total_object_count = buckets.values().map(|bucket| bucket.object_count).sum(); + self.total_storage_bytes = buckets.values().map(|bucket| bucket.total_size_bytes).sum(); + self.buckets = buckets; + } + + pub(crate) fn ensure_bucket(&mut self, name: &str, creation_date: Option>) { + let bucket = self.buckets.entry(name.to_string()).or_default(); + if bucket.creation_date.is_none() { + bucket.creation_date = creation_date; + } + } + + pub(crate) fn remove_bucket(&mut self, name: &str) { + if let Some(bucket) = self.buckets.remove(name) { + self.total_object_count = self.total_object_count.saturating_sub(bucket.object_count); + self.total_storage_bytes = self + .total_storage_bytes + .saturating_sub(bucket.total_size_bytes); + } + } + + pub(crate) fn upsert_object( + &mut self, + bucket_name: &str, + previous_size: Option, + new_size: u64, + ) { + let bucket_was_present = self.buckets.contains_key(bucket_name); + let bucket = self.buckets.entry(bucket_name.to_string()).or_default(); + + if let Some(previous_size) = previous_size { + if !bucket_was_present { + bucket.object_count = 1; + self.total_object_count += 1; + } + bucket.total_size_bytes = + bucket.total_size_bytes.saturating_sub(previous_size) + new_size; + self.total_storage_bytes = + self.total_storage_bytes.saturating_sub(previous_size) + new_size; + } else { + bucket.object_count += 1; + bucket.total_size_bytes += new_size; + self.total_object_count += 1; + self.total_storage_bytes += new_size; + } + } + + pub(crate) fn remove_object(&mut self, bucket_name: &str, existing_size: Option) { + let Some(existing_size) = existing_size else { + return; + }; + + let Some(bucket) = self.buckets.get_mut(bucket_name) else { + return; + }; + + bucket.object_count = bucket.object_count.saturating_sub(1); + bucket.total_size_bytes = bucket.total_size_bytes.saturating_sub(existing_size); + self.total_object_count = self.total_object_count.saturating_sub(1); + self.total_storage_bytes = self.total_storage_bytes.saturating_sub(existing_size); + } + + pub(crate) fn bucket_summaries(&self) -> Vec { + let mut buckets: Vec = self + .buckets + .iter() + .map(|(name, stats)| BucketSummary { + name: name.clone(), + object_count: stats.object_count, + total_size_bytes: stats.total_size_bytes, + creation_date: stats + .creation_date + .as_ref() + .map(|creation_date| creation_date.timestamp_millis()), + }) + .collect(); + buckets.sort_by(|a, b| a.name.cmp(&b.name)); + buckets + } + + pub(crate) fn snapshot( + &self, + storage_directory: &Path, + storage_locations: Vec, + ) -> StorageStats { + StorageStats { + bucket_count: self.buckets.len() as u64, + total_object_count: self.total_object_count, + total_storage_bytes: self.total_storage_bytes, + buckets: self.bucket_summaries(), + storage_directory: storage_directory.to_string_lossy().to_string(), + storage_locations, + } + } +} + +#[derive(Debug, Clone, Copy)] +struct FilesystemUsage { + total_bytes: u64, + available_bytes: u64, + used_bytes: u64, +} + +pub(crate) fn storage_location_summary(path: &Path) -> StorageLocationSummary { + let usage = filesystem_usage(path); + StorageLocationSummary { + path: path.to_string_lossy().to_string(), + total_bytes: usage.map(|usage| usage.total_bytes), + available_bytes: usage.map(|usage| usage.available_bytes), + used_bytes: usage.map(|usage| usage.used_bytes), + } +} + +#[cfg(unix)] +fn filesystem_usage(path: &Path) -> Option { + use std::ffi::CString; + use std::os::unix::ffi::OsStrExt; + + let path_bytes = path.as_os_str().as_bytes(); + let c_path = CString::new(path_bytes).ok()?; + let mut stat: libc::statvfs = unsafe { std::mem::zeroed() }; + + if unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) } != 0 { + return None; + } + + let block_size = stat.f_frsize as u64; + let total_bytes = stat.f_blocks as u64 * block_size; + let available_bytes = stat.f_bavail as u64 * block_size; + let free_bytes = stat.f_bfree as u64 * block_size; + + Some(FilesystemUsage { + total_bytes, + available_bytes, + used_bytes: total_bytes.saturating_sub(free_bytes), + }) +} + +#[cfg(not(unix))] +fn filesystem_usage(_path: &Path) -> Option { + None +} + // ============================ // FileStore // ============================ pub struct FileStore { root_dir: PathBuf, + runtime_stats: RwLock, } impl FileStore { pub fn new(root_dir: PathBuf) -> Self { - Self { root_dir } + Self { + root_dir, + runtime_stats: RwLock::new(RuntimeStatsState::default()), + } } pub async fn initialize(&self) -> Result<()> { fs::create_dir_all(&self.root_dir).await?; fs::create_dir_all(self.policies_dir()).await?; + self.refresh_runtime_stats().await; Ok(()) } @@ -127,9 +419,56 @@ impl FileStore { } fs::create_dir_all(&self.root_dir).await?; fs::create_dir_all(self.policies_dir()).await?; + self.refresh_runtime_stats().await; Ok(()) } + pub async fn get_storage_stats(&self) -> Result { + let runtime_stats = self.runtime_stats.read().await; + Ok(runtime_stats.snapshot( + &self.root_dir, + vec![storage_location_summary(&self.root_dir)], + )) + } + + pub async fn list_bucket_summaries(&self) -> Result> { + let runtime_stats = self.runtime_stats.read().await; + Ok(runtime_stats.bucket_summaries()) + } + + async fn refresh_runtime_stats(&self) { + let buckets = match self.list_buckets().await { + Ok(buckets) => buckets, + Err(error) => { + tracing::warn!(path = %self.root_dir.display(), error = %error, "Failed to initialize runtime stats"); + return; + } + }; + + let mut runtime_buckets = HashMap::new(); + for bucket in buckets { + let bucket_path = self.root_dir.join(&bucket.name); + match Self::scan_bucket_objects(&bucket_path).await { + Ok((object_count, total_size_bytes)) => { + runtime_buckets.insert( + bucket.name, + RuntimeBucketStats { + object_count, + total_size_bytes, + creation_date: Some(bucket.creation_date), + }, + ); + } + Err(error) => { + tracing::warn!(bucket = %bucket.name, error = %error, "Failed to scan bucket for runtime stats"); + } + } + } + + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.replace_buckets(runtime_buckets); + } + // ============================ // Bucket operations // ============================ @@ -168,6 +507,7 @@ impl FileStore { pub async fn create_bucket(&self, bucket: &str) -> Result<()> { let bucket_path = self.root_dir.join(bucket); fs::create_dir_all(&bucket_path).await?; + self.track_bucket_created(bucket).await; Ok(()) } @@ -185,6 +525,7 @@ impl FileStore { } fs::remove_dir_all(&bucket_path).await?; + self.track_bucket_deleted(bucket).await; Ok(()) } @@ -203,6 +544,8 @@ impl FileStore { return Err(StorageError::no_such_bucket().into()); } + let previous_size = self.object_size_if_exists(bucket, key).await; + let object_path = self.object_path(bucket, key); if let Some(parent) = object_path.parent() { fs::create_dir_all(parent).await?; @@ -243,9 +586,11 @@ impl FileStore { let metadata_json = serde_json::to_string_pretty(&metadata)?; fs::write(&metadata_path, metadata_json).await?; - Ok(PutResult { - md5: md5_hex, - }) + let object_size = fs::metadata(&object_path).await?.len(); + self.track_object_upsert(bucket, previous_size, object_size) + .await; + + Ok(PutResult { md5: md5_hex }) } pub async fn get_object( @@ -310,6 +655,7 @@ impl FileStore { } pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> { + let existing_size = self.object_size_if_exists(bucket, key).await; let object_path = self.object_path(bucket, key); let md5_path = format!("{}.md5", object_path.display()); let metadata_path = format!("{}.metadata.json", object_path.display()); @@ -337,6 +683,8 @@ impl FileStore { current = dir.parent().map(|p| p.to_path_buf()); } + self.track_object_deleted(bucket, existing_size).await; + Ok(()) } @@ -360,6 +708,8 @@ impl FileStore { return Err(StorageError::no_such_bucket().into()); } + let previous_size = self.object_size_if_exists(dest_bucket, dest_key).await; + if let Some(parent) = dest_path.parent() { fs::create_dir_all(parent).await?; } @@ -387,10 +737,10 @@ impl FileStore { let md5 = self.read_md5(&dest_path).await; let last_modified: DateTime = file_meta.modified()?.into(); - Ok(CopyResult { - md5, - last_modified, - }) + self.track_object_upsert(dest_bucket, previous_size, file_meta.len()) + .await; + + Ok(CopyResult { md5, last_modified }) } pub async fn list_objects( @@ -438,11 +788,7 @@ impl FileStore { if !delimiter.is_empty() { let remaining = &key[prefix.len()..]; if let Some(delim_idx) = remaining.find(delimiter) { - let cp = format!( - "{}{}", - prefix, - &remaining[..delim_idx + delimiter.len()] - ); + let cp = format!("{}{}", prefix, &remaining[..delim_idx + delimiter.len()]); if common_prefix_set.insert(cp.clone()) { common_prefixes.push(cp); } @@ -458,7 +804,10 @@ impl FileStore { let object_path = self.object_path(bucket, key); if let Ok(meta) = fs::metadata(&object_path).await { let md5 = self.read_md5(&object_path).await; - let last_modified: DateTime = meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH).into(); + let last_modified: DateTime = meta + .modified() + .unwrap_or(std::time::SystemTime::UNIX_EPOCH) + .into(); contents.push(ListObjectEntry { key: key.clone(), size: meta.len(), @@ -611,6 +960,8 @@ impl FileStore { let content = fs::read_to_string(&meta_path).await?; let meta: MultipartMetadata = serde_json::from_str(&content)?; + let previous_size = self.object_size_if_exists(&meta.bucket, &meta.key).await; + let object_path = self.object_path(&meta.bucket, &meta.key); if let Some(parent) = object_path.parent() { fs::create_dir_all(parent).await?; @@ -653,12 +1004,14 @@ impl FileStore { let metadata_json = serde_json::to_string_pretty(&meta.metadata)?; fs::write(&metadata_path, metadata_json).await?; + let object_size = fs::metadata(&object_path).await?.len(); + self.track_object_upsert(&meta.bucket, previous_size, object_size) + .await; + // Clean up multipart directory let _ = fs::remove_dir_all(&upload_dir).await; - Ok(CompleteMultipartResult { - etag, - }) + Ok(CompleteMultipartResult { etag }) } pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> { @@ -670,10 +1023,7 @@ impl FileStore { Ok(()) } - pub async fn list_multipart_uploads( - &self, - bucket: &str, - ) -> Result> { + pub async fn list_multipart_uploads(&self, bucket: &str) -> Result> { let multipart_dir = self.multipart_dir(); if !multipart_dir.is_dir() { return Ok(Vec::new()); @@ -712,6 +1062,75 @@ impl FileStore { // Helpers // ============================ + async fn scan_bucket_objects(bucket_path: &Path) -> Result<(u64, u64)> { + let mut object_count = 0u64; + let mut total_size_bytes = 0u64; + let mut directories = vec![bucket_path.to_path_buf()]; + + while let Some(directory) = directories.pop() { + let mut entries = match fs::read_dir(&directory).await { + Ok(entries) => entries, + Err(_) => continue, + }; + + while let Some(entry) = entries.next_entry().await? { + let metadata = entry.metadata().await?; + if metadata.is_dir() { + directories.push(entry.path()); + continue; + } + + let name = entry.file_name().to_string_lossy().to_string(); + if name.ends_with("._storage_object") { + object_count += 1; + total_size_bytes += metadata.len(); + } + } + } + + Ok((object_count, total_size_bytes)) + } + + async fn bucket_creation_date(&self, bucket: &str) -> Option> { + let metadata = fs::metadata(self.root_dir.join(bucket)).await.ok()?; + let created_or_modified = metadata.created().unwrap_or( + metadata + .modified() + .unwrap_or(std::time::SystemTime::UNIX_EPOCH), + ); + Some(created_or_modified.into()) + } + + async fn object_size_if_exists(&self, bucket: &str, key: &str) -> Option { + fs::metadata(self.object_path(bucket, key)) + .await + .ok() + .map(|metadata| metadata.len()) + } + + async fn track_bucket_created(&self, bucket: &str) { + let creation_date = self.bucket_creation_date(bucket).await; + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.ensure_bucket(bucket, creation_date); + } + + async fn track_bucket_deleted(&self, bucket: &str) { + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.remove_bucket(bucket); + } + + async fn track_object_upsert(&self, bucket: &str, previous_size: Option, new_size: u64) { + let creation_date = self.bucket_creation_date(bucket).await; + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.ensure_bucket(bucket, creation_date); + runtime_stats.upsert_object(bucket, previous_size, new_size); + } + + async fn track_object_deleted(&self, bucket: &str, existing_size: Option) { + let mut runtime_stats = self.runtime_stats.write().await; + runtime_stats.remove_object(bucket, existing_size); + } + fn object_path(&self, bucket: &str, key: &str) -> PathBuf { let encoded = encode_key(key); self.root_dir @@ -815,12 +1234,43 @@ impl StorageBackend { } } + pub async fn get_cluster_health(&self) -> Result { + match self { + StorageBackend::Standalone(_) => Ok(ClusterHealth { + enabled: false, + node_id: None, + quorum_healthy: None, + majority_healthy: None, + peers: None, + drives: None, + erasure: None, + repairs: None, + }), + StorageBackend::Clustered(ds) => ds.get_cluster_health().await, + } + } + + pub async fn get_storage_stats(&self) -> Result { + match self { + StorageBackend::Standalone(fs) => fs.get_storage_stats().await, + StorageBackend::Clustered(ds) => ds.get_storage_stats().await, + } + } + + pub async fn list_bucket_summaries(&self) -> Result> { + match self { + StorageBackend::Standalone(fs) => fs.list_bucket_summaries().await, + StorageBackend::Clustered(ds) => ds.list_bucket_summaries().await, + } + } + pub async fn initialize(&self) -> Result<()> { match self { StorageBackend::Standalone(fs) => fs.initialize().await, StorageBackend::Clustered(ds) => { // Ensure policies directory exists tokio::fs::create_dir_all(ds.policies_dir()).await?; + ds.initialize_runtime_stats().await; Ok(()) } } @@ -911,10 +1361,26 @@ impl StorageBackend { ) -> Result { match self { StorageBackend::Standalone(fs) => { - fs.copy_object(src_bucket, src_key, dest_bucket, dest_key, metadata_directive, new_metadata).await + fs.copy_object( + src_bucket, + src_key, + dest_bucket, + dest_key, + metadata_directive, + new_metadata, + ) + .await } StorageBackend::Clustered(ds) => { - ds.copy_object(src_bucket, src_key, dest_bucket, dest_key, metadata_directive, new_metadata).await + ds.copy_object( + src_bucket, + src_key, + dest_bucket, + dest_key, + metadata_directive, + new_metadata, + ) + .await } } } @@ -929,10 +1395,12 @@ impl StorageBackend { ) -> Result { match self { StorageBackend::Standalone(fs) => { - fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token).await + fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token) + .await } StorageBackend::Clustered(ds) => { - ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token).await + ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token) + .await } } } @@ -979,10 +1447,7 @@ impl StorageBackend { } } - pub async fn list_multipart_uploads( - &self, - bucket: &str, - ) -> Result> { + pub async fn list_multipart_uploads(&self, bucket: &str) -> Result> { match self { StorageBackend::Standalone(fs) => fs.list_multipart_uploads(bucket).await, StorageBackend::Clustered(ds) => ds.list_multipart_uploads(bucket).await, diff --git a/test/test.aws-sdk.node.ts b/test/test.aws-sdk.node.ts index 66b4645..81cd728 100644 --- a/test/test.aws-sdk.node.ts +++ b/test/test.aws-sdk.node.ts @@ -1,16 +1,28 @@ +/// + import { expect, tap } from '@git.zone/tstest/tapbundle'; import { S3Client, CreateBucketCommand, ListBucketsCommand, PutObjectCommand, GetObjectCommand, DeleteObjectCommand, DeleteBucketCommand } from '@aws-sdk/client-s3'; +import { Buffer } from 'buffer'; import { Readable } from 'stream'; import * as smartstorage from '../ts/index.js'; let testSmartStorageInstance: smartstorage.SmartStorage; let s3Client: S3Client; +const testObjectBody = 'Hello from AWS SDK!'; +const testObjectSize = Buffer.byteLength(testObjectBody); + +function getBucketSummary( + summaries: smartstorage.IBucketSummary[], + bucketName: string, +): smartstorage.IBucketSummary | undefined { + return summaries.find((summary) => summary.name === bucketName); +} // Helper to convert stream to string async function streamToString(stream: Readable): Promise { const chunks: Buffer[] = []; return new Promise((resolve, reject) => { - stream.on('data', (chunk) => chunks.push(Buffer.from(chunk))); + stream.on('data', (chunk: string | Buffer | Uint8Array) => chunks.push(Buffer.from(chunk))); stream.on('error', reject); stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); }); @@ -46,28 +58,82 @@ tap.test('should list buckets (empty)', async () => { expect(response.Buckets!.length).toEqual(0); }); +tap.test('should expose empty runtime stats after startup', async () => { + const stats = await testSmartStorageInstance.getStorageStats(); + expect(stats.bucketCount).toEqual(0); + expect(stats.totalObjectCount).toEqual(0); + expect(stats.totalStorageBytes).toEqual(0); + expect(stats.buckets.length).toEqual(0); + expect(stats.storageDirectory.length > 0).toEqual(true); +}); + +tap.test('should expose disabled cluster health in standalone mode', async () => { + const clusterHealth = await testSmartStorageInstance.getClusterHealth(); + expect(clusterHealth.enabled).toEqual(false); + expect(clusterHealth.nodeId).toEqual(undefined); + expect(clusterHealth.quorumHealthy).toEqual(undefined); + expect(clusterHealth.drives).toEqual(undefined); +}); + tap.test('should create a bucket', async () => { const response = await s3Client.send(new CreateBucketCommand({ Bucket: 'test-bucket' })); expect(response.$metadata.httpStatusCode).toEqual(200); }); -tap.test('should list buckets (showing created bucket)', async () => { +tap.test('should create an empty bucket through the bridge', async () => { + const response = await testSmartStorageInstance.createBucket('empty-bucket'); + expect(response.name).toEqual('empty-bucket'); +}); + +tap.test('should list buckets (showing created buckets)', async () => { const response = await s3Client.send(new ListBucketsCommand({})); - expect(response.Buckets!.length).toEqual(1); - expect(response.Buckets![0].Name).toEqual('test-bucket'); + expect(response.Buckets!.length).toEqual(2); + expect(response.Buckets!.some((bucket) => bucket.Name === 'test-bucket')).toEqual(true); + expect(response.Buckets!.some((bucket) => bucket.Name === 'empty-bucket')).toEqual(true); +}); + +tap.test('should expose runtime bucket summaries after bucket creation', async () => { + const stats = await testSmartStorageInstance.getStorageStats(); + const summaries = await testSmartStorageInstance.listBucketSummaries(); + const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket'); + const emptyBucketSummary = getBucketSummary(summaries, 'empty-bucket'); + + expect(stats.bucketCount).toEqual(2); + expect(stats.totalObjectCount).toEqual(0); + expect(stats.totalStorageBytes).toEqual(0); + expect(summaries.length).toEqual(2); + expect(testBucketSummary?.objectCount).toEqual(0); + expect(testBucketSummary?.totalSizeBytes).toEqual(0); + expect(typeof testBucketSummary?.creationDate).toEqual('number'); + expect(emptyBucketSummary?.objectCount).toEqual(0); + expect(emptyBucketSummary?.totalSizeBytes).toEqual(0); }); tap.test('should upload an object', async () => { const response = await s3Client.send(new PutObjectCommand({ Bucket: 'test-bucket', Key: 'test-file.txt', - Body: 'Hello from AWS SDK!', + Body: testObjectBody, ContentType: 'text/plain', })); expect(response.$metadata.httpStatusCode).toEqual(200); expect(response.ETag).toBeTypeofString(); }); +tap.test('should reflect uploaded object in runtime stats', async () => { + const stats = await testSmartStorageInstance.getStorageStats(); + const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket'); + const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket'); + + expect(stats.bucketCount).toEqual(2); + expect(stats.totalObjectCount).toEqual(1); + expect(stats.totalStorageBytes).toEqual(testObjectSize); + expect(testBucketSummary?.objectCount).toEqual(1); + expect(testBucketSummary?.totalSizeBytes).toEqual(testObjectSize); + expect(emptyBucketSummary?.objectCount).toEqual(0); + expect(emptyBucketSummary?.totalSizeBytes).toEqual(0); +}); + tap.test('should download the object', async () => { const response = await s3Client.send(new GetObjectCommand({ Bucket: 'test-bucket', @@ -76,7 +142,7 @@ tap.test('should download the object', async () => { expect(response.$metadata.httpStatusCode).toEqual(200); const content = await streamToString(response.Body as Readable); - expect(content).toEqual('Hello from AWS SDK!'); + expect(content).toEqual(testObjectBody); }); tap.test('should delete the object', async () => { @@ -87,6 +153,20 @@ tap.test('should delete the object', async () => { expect(response.$metadata.httpStatusCode).toEqual(204); }); +tap.test('should reflect object deletion in runtime stats', async () => { + const stats = await testSmartStorageInstance.getStorageStats(); + const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket'); + const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket'); + + expect(stats.bucketCount).toEqual(2); + expect(stats.totalObjectCount).toEqual(0); + expect(stats.totalStorageBytes).toEqual(0); + expect(testBucketSummary?.objectCount).toEqual(0); + expect(testBucketSummary?.totalSizeBytes).toEqual(0); + expect(emptyBucketSummary?.objectCount).toEqual(0); + expect(emptyBucketSummary?.totalSizeBytes).toEqual(0); +}); + tap.test('should fail to get deleted object', async () => { await expect( s3Client.send(new GetObjectCommand({ @@ -96,11 +176,37 @@ tap.test('should fail to get deleted object', async () => { ).rejects.toThrow(); }); +tap.test('should delete the empty bucket', async () => { + const response = await s3Client.send(new DeleteBucketCommand({ Bucket: 'empty-bucket' })); + expect(response.$metadata.httpStatusCode).toEqual(204); +}); + +tap.test('should reflect bucket deletion in runtime stats', async () => { + const stats = await testSmartStorageInstance.getStorageStats(); + const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket'); + const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket'); + + expect(stats.bucketCount).toEqual(1); + expect(stats.totalObjectCount).toEqual(0); + expect(stats.totalStorageBytes).toEqual(0); + expect(testBucketSummary?.objectCount).toEqual(0); + expect(testBucketSummary?.totalSizeBytes).toEqual(0); + expect(emptyBucketSummary).toEqual(undefined); +}); + tap.test('should delete the bucket', async () => { const response = await s3Client.send(new DeleteBucketCommand({ Bucket: 'test-bucket' })); expect(response.$metadata.httpStatusCode).toEqual(204); }); +tap.test('should expose empty runtime stats after deleting all buckets', async () => { + const stats = await testSmartStorageInstance.getStorageStats(); + expect(stats.bucketCount).toEqual(0); + expect(stats.totalObjectCount).toEqual(0); + expect(stats.totalStorageBytes).toEqual(0); + expect(stats.buckets.length).toEqual(0); +}); + tap.test('should stop the storage server', async () => { await testSmartStorageInstance.stop(); }); diff --git a/test/test.cluster-health.node.ts b/test/test.cluster-health.node.ts new file mode 100644 index 0000000..15300c8 --- /dev/null +++ b/test/test.cluster-health.node.ts @@ -0,0 +1,84 @@ +/// + +import { rm } from 'fs/promises'; +import { join } from 'path'; +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as smartstorage from '../ts/index.js'; + +let clusterStorage: smartstorage.SmartStorage; +const baseDir = join(process.cwd(), '.nogit', `cluster-health-${Date.now()}`); +const drivePaths = Array.from({ length: 6 }, (_value, index) => { + return join(baseDir, `drive-${index + 1}`); +}); +const storageDir = join(baseDir, 'storage'); + +tap.test('setup: start clustered storage server', async () => { + clusterStorage = await smartstorage.SmartStorage.createAndStart({ + server: { + port: 3348, + silent: true, + }, + storage: { + directory: storageDir, + }, + cluster: { + enabled: true, + nodeId: 'cluster-health-node', + quicPort: 4348, + seedNodes: [], + erasure: { + dataShards: 4, + parityShards: 2, + chunkSizeBytes: 1024 * 1024, + }, + drives: { + paths: drivePaths, + }, + }, + }); +}); + +tap.test('should expose clustered runtime health', async () => { + const health = await clusterStorage.getClusterHealth(); + + expect(health.enabled).toEqual(true); + expect(health.nodeId).toEqual('cluster-health-node'); + expect(health.quorumHealthy).toEqual(true); + expect(health.majorityHealthy).toEqual(true); + expect(Array.isArray(health.peers)).toEqual(true); + expect(health.peers!.length).toEqual(0); + expect(Array.isArray(health.drives)).toEqual(true); + expect(health.drives!.length).toEqual(6); + expect(health.drives!.every((drive) => drive.status === 'online')).toEqual(true); + expect(health.drives!.every((drive) => drivePaths.includes(drive.path))).toEqual(true); + expect(health.drives!.every((drive) => drive.totalBytes !== undefined)).toEqual(true); + expect(health.drives!.every((drive) => drive.usedBytes !== undefined)).toEqual(true); + expect(health.drives!.every((drive) => drive.lastCheck !== undefined)).toEqual(true); + expect(health.drives!.every((drive) => drive.erasureSetId === 0)).toEqual(true); + expect(health.erasure?.dataShards).toEqual(4); + expect(health.erasure?.parityShards).toEqual(2); + expect(health.erasure?.chunkSizeBytes).toEqual(1024 * 1024); + expect(health.erasure?.totalShards).toEqual(6); + expect(health.erasure?.readQuorum).toEqual(4); + expect(health.erasure?.writeQuorum).toEqual(5); + expect(health.erasure?.erasureSetCount).toEqual(1); + expect(health.repairs?.active).toEqual(false); + expect(health.repairs?.scanIntervalMs).toEqual(24 * 60 * 60 * 1000); +}); + +tap.test('should expose cluster health after bucket creation', async () => { + const bucket = await clusterStorage.createBucket('cluster-health-bucket'); + const health = await clusterStorage.getClusterHealth(); + + expect(bucket.name).toEqual('cluster-health-bucket'); + expect(health.enabled).toEqual(true); + expect(health.quorumHealthy).toEqual(true); + expect(health.drives!.length).toEqual(6); +}); + +tap.test('teardown: stop clustered server and clean files', async () => { + await clusterStorage.stop(); + await rm(baseDir, { recursive: true, force: true }); +}); + +export default tap.start() diff --git a/test/test.credentials.node.ts b/test/test.credentials.node.ts new file mode 100644 index 0000000..e708315 --- /dev/null +++ b/test/test.credentials.node.ts @@ -0,0 +1,150 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import { + CreateBucketCommand, + DeleteBucketCommand, + ListBucketsCommand, + S3Client, +} from '@aws-sdk/client-s3'; +import * as smartstorage from '../ts/index.js'; + +const TEST_PORT = 3349; +const INITIAL_CREDENTIAL: smartstorage.IStorageCredential = { + accessKeyId: 'RUNTIMEINITIAL', + secretAccessKey: 'RUNTIMEINITIALSECRET123', +}; +const ROTATED_CREDENTIAL_A: smartstorage.IStorageCredential = { + accessKeyId: 'RUNTIMEA', + secretAccessKey: 'RUNTIMEASECRET123', +}; +const ROTATED_CREDENTIAL_B: smartstorage.IStorageCredential = { + accessKeyId: 'RUNTIMEB', + secretAccessKey: 'RUNTIMEBSECRET123', +}; +const TEST_BUCKET = 'runtime-credentials-bucket'; + +let testSmartStorageInstance: smartstorage.SmartStorage; +let initialClient: S3Client; +let rotatedClientA: S3Client; +let rotatedClientB: S3Client; + +function createS3Client(credential: smartstorage.IStorageCredential): S3Client { + return new S3Client({ + endpoint: `http://localhost:${TEST_PORT}`, + region: 'us-east-1', + credentials: { + accessKeyId: credential.accessKeyId, + secretAccessKey: credential.secretAccessKey, + }, + forcePathStyle: true, + }); +} + +tap.test('setup: start storage server with runtime-managed credentials', async () => { + testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart({ + server: { + port: TEST_PORT, + silent: true, + region: 'us-east-1', + }, + storage: { + cleanSlate: true, + }, + auth: { + enabled: true, + credentials: [INITIAL_CREDENTIAL], + }, + }); + + initialClient = createS3Client(INITIAL_CREDENTIAL); + rotatedClientA = createS3Client(ROTATED_CREDENTIAL_A); + rotatedClientB = createS3Client(ROTATED_CREDENTIAL_B); +}); + +tap.test('startup credentials authenticate successfully', async () => { + const response = await initialClient.send(new ListBucketsCommand({})); + expect(response.$metadata.httpStatusCode).toEqual(200); +}); + +tap.test('listCredentials returns the active startup credential set', async () => { + const credentials = await testSmartStorageInstance.listCredentials(); + expect(credentials.length).toEqual(1); + expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId); + expect(credentials[0].secretAccessKey).toEqual(INITIAL_CREDENTIAL.secretAccessKey); +}); + +tap.test('invalid replacement input fails cleanly and leaves old credentials active', async () => { + await expect( + testSmartStorageInstance.replaceCredentials([ + { + accessKeyId: '', + secretAccessKey: 'invalid-secret', + }, + ]), + ).rejects.toThrow(); + + const credentials = await testSmartStorageInstance.listCredentials(); + expect(credentials.length).toEqual(1); + expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId); + + const response = await initialClient.send(new ListBucketsCommand({})); + expect(response.$metadata.httpStatusCode).toEqual(200); +}); + +tap.test('replacing credentials swaps the active set atomically', async () => { + await testSmartStorageInstance.replaceCredentials([ + ROTATED_CREDENTIAL_A, + ROTATED_CREDENTIAL_B, + ]); + + const credentials = await testSmartStorageInstance.listCredentials(); + expect(credentials.length).toEqual(2); + expect(credentials[0].accessKeyId).toEqual(ROTATED_CREDENTIAL_A.accessKeyId); + expect(credentials[1].accessKeyId).toEqual(ROTATED_CREDENTIAL_B.accessKeyId); +}); + +tap.test('old credentials stop working immediately for new requests', async () => { + await expect(initialClient.send(new ListBucketsCommand({}))).rejects.toThrow(); +}); + +tap.test('first rotated credential authenticates successfully', async () => { + const response = await rotatedClientA.send( + new CreateBucketCommand({ Bucket: TEST_BUCKET }), + ); + expect(response.$metadata.httpStatusCode).toEqual(200); +}); + +tap.test('multiple rotated credentials remain active', async () => { + const response = await rotatedClientB.send(new ListBucketsCommand({})); + expect(response.$metadata.httpStatusCode).toEqual(200); + expect(response.Buckets?.some((bucket) => bucket.Name === TEST_BUCKET)).toEqual(true); +}); + +tap.test('duplicate replacement input fails cleanly without changing the active set', async () => { + await expect( + testSmartStorageInstance.replaceCredentials([ + ROTATED_CREDENTIAL_A, + { + accessKeyId: ROTATED_CREDENTIAL_A.accessKeyId, + secretAccessKey: 'another-secret', + }, + ]), + ).rejects.toThrow(); + + const credentials = await testSmartStorageInstance.listCredentials(); + expect(credentials.length).toEqual(2); + expect(credentials[0].accessKeyId).toEqual(ROTATED_CREDENTIAL_A.accessKeyId); + expect(credentials[1].accessKeyId).toEqual(ROTATED_CREDENTIAL_B.accessKeyId); + + const response = await rotatedClientA.send(new ListBucketsCommand({})); + expect(response.$metadata.httpStatusCode).toEqual(200); +}); + +tap.test('teardown: clean up bucket and stop the storage server', async () => { + const response = await rotatedClientA.send( + new DeleteBucketCommand({ Bucket: TEST_BUCKET }), + ); + expect(response.$metadata.httpStatusCode).toEqual(204); + await testSmartStorageInstance.stop(); +}); + +export default tap.start() diff --git a/ts/index.ts b/ts/index.ts index 33836b3..d1e0bb3 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -1,15 +1,20 @@ import * as plugins from './plugins.js'; import * as paths from './paths.js'; +/** + * Authentication configuration + */ +export interface IStorageCredential { + accessKeyId: string; + secretAccessKey: string; +} + /** * Authentication configuration */ export interface IAuthConfig { enabled: boolean; - credentials: Array<{ - accessKeyId: string; - secretAccessKey: string; - }>; + credentials: IStorageCredential[]; } /** @@ -113,6 +118,105 @@ export interface ISmartStorageConfig { cluster?: IClusterConfig; } +/** + * Logical bucket stats maintained by the Rust runtime. + * Values are initialized from native storage on startup and updated on smartstorage mutations. + */ +export interface IBucketSummary { + name: string; + objectCount: number; + totalSizeBytes: number; + creationDate?: number; +} + +/** + * Filesystem-level capacity snapshot for the storage directory or configured drive path. + */ +export interface IStorageLocationSummary { + path: string; + totalBytes?: number; + availableBytes?: number; + usedBytes?: number; +} + +/** + * Runtime storage stats served by the Rust core without issuing S3 list calls. + */ +export interface IStorageStats { + bucketCount: number; + totalObjectCount: number; + totalStorageBytes: number; + buckets: IBucketSummary[]; + storageDirectory: string; + storageLocations?: IStorageLocationSummary[]; +} + +/** + * Known peer status from the local node's current cluster view. + */ +export interface IClusterPeerHealth { + nodeId: string; + status: 'online' | 'suspect' | 'offline'; + quicAddress?: string; + s3Address?: string; + driveCount?: number; + lastHeartbeat?: number; + missedHeartbeats?: number; +} + +/** + * Local drive health as measured by smartstorage's runtime probes. + */ +export interface IClusterDriveHealth { + index: number; + path: string; + status: 'online' | 'degraded' | 'offline' | 'healing'; + totalBytes?: number; + usedBytes?: number; + availableBytes?: number; + errorCount?: number; + lastError?: string; + lastCheck?: number; + erasureSetId?: number; +} + +export interface IClusterErasureHealth { + dataShards: number; + parityShards: number; + chunkSizeBytes: number; + totalShards: number; + readQuorum: number; + writeQuorum: number; + erasureSetCount: number; +} + +export interface IClusterRepairHealth { + active: boolean; + scanIntervalMs?: number; + lastRunStartedAt?: number; + lastRunCompletedAt?: number; + lastDurationMs?: number; + shardsChecked?: number; + shardsHealed?: number; + failed?: number; + lastError?: string; +} + +/** + * Cluster runtime health from the Rust core. + * When clustering is disabled, the response is `{ enabled: false }`. + */ +export interface IClusterHealth { + enabled: boolean; + nodeId?: string; + quorumHealthy?: boolean; + majorityHealthy?: boolean; + peers?: IClusterPeerHealth[]; + drives?: IClusterDriveHealth[]; + erasure?: IClusterErasureHealth; + repairs?: IClusterRepairHealth; +} + /** * Default configuration values */ @@ -205,6 +309,11 @@ type TRustStorageCommands = { start: { params: { config: Required }; result: {} }; stop: { params: {}; result: {} }; createBucket: { params: { name: string }; result: {} }; + getStorageStats: { params: {}; result: IStorageStats }; + listBucketSummaries: { params: {}; result: IBucketSummary[] }; + listCredentials: { params: {}; result: IStorageCredential[] }; + replaceCredentials: { params: { credentials: IStorageCredential[] }; result: {} }; + getClusterHealth: { params: {}; result: IClusterHealth }; }; /** @@ -274,6 +383,27 @@ export class SmartStorage { return { name: bucketNameArg }; } + public async getStorageStats(): Promise { + return this.bridge.sendCommand('getStorageStats', {}); + } + + public async listBucketSummaries(): Promise { + return this.bridge.sendCommand('listBucketSummaries', {}); + } + + public async listCredentials(): Promise { + return this.bridge.sendCommand('listCredentials', {}); + } + + public async replaceCredentials(credentials: IStorageCredential[]): Promise { + await this.bridge.sendCommand('replaceCredentials', { credentials }); + this.config.auth.credentials = credentials.map((credential) => ({ ...credential })); + } + + public async getClusterHealth(): Promise { + return this.bridge.sendCommand('getClusterHealth', {}); + } + public async stop() { await this.bridge.sendCommand('stop', {}); this.bridge.kill();