feat: enhance storage stats and cluster health reporting

- Introduced new data structures for bucket and storage statistics, including BucketSummary, StorageStats, and ClusterHealth.
- Implemented runtime statistics tracking for buckets, including object count and total size.
- Added methods to retrieve storage stats and bucket summaries in the FileStore.
- Enhanced the SmartStorage interface to expose storage stats and cluster health.
- Implemented tests for runtime stats, cluster health, and credential management.
- Added support for runtime-managed credentials with atomic replacement.
- Improved filesystem usage reporting for storage locations.
This commit is contained in:
2026-04-19 11:57:28 +00:00
parent c683b02e8c
commit 0e9862efca
16 changed files with 1803 additions and 85 deletions
+494 -29
View File
@@ -8,6 +8,7 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::cluster::coordinator::DistributedStore;
@@ -64,6 +65,133 @@ pub struct BucketInfo {
pub creation_date: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketSummary {
pub name: String,
pub object_count: u64,
pub total_size_bytes: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub creation_date: Option<i64>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageLocationSummary {
pub path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_bytes: Option<u64>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageStats {
pub bucket_count: u64,
pub total_object_count: u64,
pub total_storage_bytes: u64,
pub buckets: Vec<BucketSummary>,
pub storage_directory: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub storage_locations: Vec<StorageLocationSummary>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterPeerHealth {
pub node_id: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub quic_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub s3_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drive_count: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_heartbeat: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub missed_heartbeats: Option<u32>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterDriveHealth {
pub index: u32,
pub path: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_count: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_check: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure_set_id: Option<u32>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterErasureHealth {
pub data_shards: usize,
pub parity_shards: usize,
pub chunk_size_bytes: usize,
pub total_shards: usize,
pub read_quorum: usize,
pub write_quorum: usize,
pub erasure_set_count: usize,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterRepairHealth {
pub active: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub scan_interval_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_started_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_completed_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_duration_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shards_checked: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shards_healed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub failed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterHealth {
pub enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub quorum_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub majority_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub peers: Option<Vec<ClusterPeerHealth>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drives: Option<Vec<ClusterDriveHealth>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure: Option<ClusterErasureHealth>,
#[serde(skip_serializing_if = "Option::is_none")]
pub repairs: Option<ClusterRepairHealth>,
}
pub struct MultipartUploadInfo {
pub upload_id: String,
pub key: String,
@@ -98,22 +226,186 @@ struct PartMetadata {
last_modified: String,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeBucketStats {
pub object_count: u64,
pub total_size_bytes: u64,
pub creation_date: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeStatsState {
buckets: HashMap<String, RuntimeBucketStats>,
total_object_count: u64,
total_storage_bytes: u64,
}
impl RuntimeStatsState {
pub(crate) fn replace_buckets(&mut self, buckets: HashMap<String, RuntimeBucketStats>) {
self.total_object_count = buckets.values().map(|bucket| bucket.object_count).sum();
self.total_storage_bytes = buckets.values().map(|bucket| bucket.total_size_bytes).sum();
self.buckets = buckets;
}
pub(crate) fn ensure_bucket(&mut self, name: &str, creation_date: Option<DateTime<Utc>>) {
let bucket = self.buckets.entry(name.to_string()).or_default();
if bucket.creation_date.is_none() {
bucket.creation_date = creation_date;
}
}
pub(crate) fn remove_bucket(&mut self, name: &str) {
if let Some(bucket) = self.buckets.remove(name) {
self.total_object_count = self.total_object_count.saturating_sub(bucket.object_count);
self.total_storage_bytes = self
.total_storage_bytes
.saturating_sub(bucket.total_size_bytes);
}
}
pub(crate) fn upsert_object(
&mut self,
bucket_name: &str,
previous_size: Option<u64>,
new_size: u64,
) {
let bucket_was_present = self.buckets.contains_key(bucket_name);
let bucket = self.buckets.entry(bucket_name.to_string()).or_default();
if let Some(previous_size) = previous_size {
if !bucket_was_present {
bucket.object_count = 1;
self.total_object_count += 1;
}
bucket.total_size_bytes =
bucket.total_size_bytes.saturating_sub(previous_size) + new_size;
self.total_storage_bytes =
self.total_storage_bytes.saturating_sub(previous_size) + new_size;
} else {
bucket.object_count += 1;
bucket.total_size_bytes += new_size;
self.total_object_count += 1;
self.total_storage_bytes += new_size;
}
}
pub(crate) fn remove_object(&mut self, bucket_name: &str, existing_size: Option<u64>) {
let Some(existing_size) = existing_size else {
return;
};
let Some(bucket) = self.buckets.get_mut(bucket_name) else {
return;
};
bucket.object_count = bucket.object_count.saturating_sub(1);
bucket.total_size_bytes = bucket.total_size_bytes.saturating_sub(existing_size);
self.total_object_count = self.total_object_count.saturating_sub(1);
self.total_storage_bytes = self.total_storage_bytes.saturating_sub(existing_size);
}
pub(crate) fn bucket_summaries(&self) -> Vec<BucketSummary> {
let mut buckets: Vec<BucketSummary> = self
.buckets
.iter()
.map(|(name, stats)| BucketSummary {
name: name.clone(),
object_count: stats.object_count,
total_size_bytes: stats.total_size_bytes,
creation_date: stats
.creation_date
.as_ref()
.map(|creation_date| creation_date.timestamp_millis()),
})
.collect();
buckets.sort_by(|a, b| a.name.cmp(&b.name));
buckets
}
pub(crate) fn snapshot(
&self,
storage_directory: &Path,
storage_locations: Vec<StorageLocationSummary>,
) -> StorageStats {
StorageStats {
bucket_count: self.buckets.len() as u64,
total_object_count: self.total_object_count,
total_storage_bytes: self.total_storage_bytes,
buckets: self.bucket_summaries(),
storage_directory: storage_directory.to_string_lossy().to_string(),
storage_locations,
}
}
}
#[derive(Debug, Clone, Copy)]
struct FilesystemUsage {
total_bytes: u64,
available_bytes: u64,
used_bytes: u64,
}
pub(crate) fn storage_location_summary(path: &Path) -> StorageLocationSummary {
let usage = filesystem_usage(path);
StorageLocationSummary {
path: path.to_string_lossy().to_string(),
total_bytes: usage.map(|usage| usage.total_bytes),
available_bytes: usage.map(|usage| usage.available_bytes),
used_bytes: usage.map(|usage| usage.used_bytes),
}
}
#[cfg(unix)]
fn filesystem_usage(path: &Path) -> Option<FilesystemUsage> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
let path_bytes = path.as_os_str().as_bytes();
let c_path = CString::new(path_bytes).ok()?;
let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
if unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) } != 0 {
return None;
}
let block_size = stat.f_frsize as u64;
let total_bytes = stat.f_blocks as u64 * block_size;
let available_bytes = stat.f_bavail as u64 * block_size;
let free_bytes = stat.f_bfree as u64 * block_size;
Some(FilesystemUsage {
total_bytes,
available_bytes,
used_bytes: total_bytes.saturating_sub(free_bytes),
})
}
#[cfg(not(unix))]
fn filesystem_usage(_path: &Path) -> Option<FilesystemUsage> {
None
}
// ============================
// FileStore
// ============================
pub struct FileStore {
root_dir: PathBuf,
runtime_stats: RwLock<RuntimeStatsState>,
}
impl FileStore {
pub fn new(root_dir: PathBuf) -> Self {
Self { root_dir }
Self {
root_dir,
runtime_stats: RwLock::new(RuntimeStatsState::default()),
}
}
pub async fn initialize(&self) -> Result<()> {
fs::create_dir_all(&self.root_dir).await?;
fs::create_dir_all(self.policies_dir()).await?;
self.refresh_runtime_stats().await;
Ok(())
}
@@ -127,9 +419,56 @@ impl FileStore {
}
fs::create_dir_all(&self.root_dir).await?;
fs::create_dir_all(self.policies_dir()).await?;
self.refresh_runtime_stats().await;
Ok(())
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
let runtime_stats = self.runtime_stats.read().await;
Ok(runtime_stats.snapshot(
&self.root_dir,
vec![storage_location_summary(&self.root_dir)],
))
}
pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
let runtime_stats = self.runtime_stats.read().await;
Ok(runtime_stats.bucket_summaries())
}
async fn refresh_runtime_stats(&self) {
let buckets = match self.list_buckets().await {
Ok(buckets) => buckets,
Err(error) => {
tracing::warn!(path = %self.root_dir.display(), error = %error, "Failed to initialize runtime stats");
return;
}
};
let mut runtime_buckets = HashMap::new();
for bucket in buckets {
let bucket_path = self.root_dir.join(&bucket.name);
match Self::scan_bucket_objects(&bucket_path).await {
Ok((object_count, total_size_bytes)) => {
runtime_buckets.insert(
bucket.name,
RuntimeBucketStats {
object_count,
total_size_bytes,
creation_date: Some(bucket.creation_date),
},
);
}
Err(error) => {
tracing::warn!(bucket = %bucket.name, error = %error, "Failed to scan bucket for runtime stats");
}
}
}
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.replace_buckets(runtime_buckets);
}
// ============================
// Bucket operations
// ============================
@@ -168,6 +507,7 @@ impl FileStore {
pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
let bucket_path = self.root_dir.join(bucket);
fs::create_dir_all(&bucket_path).await?;
self.track_bucket_created(bucket).await;
Ok(())
}
@@ -185,6 +525,7 @@ impl FileStore {
}
fs::remove_dir_all(&bucket_path).await?;
self.track_bucket_deleted(bucket).await;
Ok(())
}
@@ -203,6 +544,8 @@ impl FileStore {
return Err(StorageError::no_such_bucket().into());
}
let previous_size = self.object_size_if_exists(bucket, key).await;
let object_path = self.object_path(bucket, key);
if let Some(parent) = object_path.parent() {
fs::create_dir_all(parent).await?;
@@ -243,9 +586,11 @@ impl FileStore {
let metadata_json = serde_json::to_string_pretty(&metadata)?;
fs::write(&metadata_path, metadata_json).await?;
Ok(PutResult {
md5: md5_hex,
})
let object_size = fs::metadata(&object_path).await?.len();
self.track_object_upsert(bucket, previous_size, object_size)
.await;
Ok(PutResult { md5: md5_hex })
}
pub async fn get_object(
@@ -310,6 +655,7 @@ impl FileStore {
}
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
let existing_size = self.object_size_if_exists(bucket, key).await;
let object_path = self.object_path(bucket, key);
let md5_path = format!("{}.md5", object_path.display());
let metadata_path = format!("{}.metadata.json", object_path.display());
@@ -337,6 +683,8 @@ impl FileStore {
current = dir.parent().map(|p| p.to_path_buf());
}
self.track_object_deleted(bucket, existing_size).await;
Ok(())
}
@@ -360,6 +708,8 @@ impl FileStore {
return Err(StorageError::no_such_bucket().into());
}
let previous_size = self.object_size_if_exists(dest_bucket, dest_key).await;
if let Some(parent) = dest_path.parent() {
fs::create_dir_all(parent).await?;
}
@@ -387,10 +737,10 @@ impl FileStore {
let md5 = self.read_md5(&dest_path).await;
let last_modified: DateTime<Utc> = file_meta.modified()?.into();
Ok(CopyResult {
md5,
last_modified,
})
self.track_object_upsert(dest_bucket, previous_size, file_meta.len())
.await;
Ok(CopyResult { md5, last_modified })
}
pub async fn list_objects(
@@ -438,11 +788,7 @@ impl FileStore {
if !delimiter.is_empty() {
let remaining = &key[prefix.len()..];
if let Some(delim_idx) = remaining.find(delimiter) {
let cp = format!(
"{}{}",
prefix,
&remaining[..delim_idx + delimiter.len()]
);
let cp = format!("{}{}", prefix, &remaining[..delim_idx + delimiter.len()]);
if common_prefix_set.insert(cp.clone()) {
common_prefixes.push(cp);
}
@@ -458,7 +804,10 @@ impl FileStore {
let object_path = self.object_path(bucket, key);
if let Ok(meta) = fs::metadata(&object_path).await {
let md5 = self.read_md5(&object_path).await;
let last_modified: DateTime<Utc> = meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH).into();
let last_modified: DateTime<Utc> = meta
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
.into();
contents.push(ListObjectEntry {
key: key.clone(),
size: meta.len(),
@@ -611,6 +960,8 @@ impl FileStore {
let content = fs::read_to_string(&meta_path).await?;
let meta: MultipartMetadata = serde_json::from_str(&content)?;
let previous_size = self.object_size_if_exists(&meta.bucket, &meta.key).await;
let object_path = self.object_path(&meta.bucket, &meta.key);
if let Some(parent) = object_path.parent() {
fs::create_dir_all(parent).await?;
@@ -653,12 +1004,14 @@ impl FileStore {
let metadata_json = serde_json::to_string_pretty(&meta.metadata)?;
fs::write(&metadata_path, metadata_json).await?;
let object_size = fs::metadata(&object_path).await?.len();
self.track_object_upsert(&meta.bucket, previous_size, object_size)
.await;
// Clean up multipart directory
let _ = fs::remove_dir_all(&upload_dir).await;
Ok(CompleteMultipartResult {
etag,
})
Ok(CompleteMultipartResult { etag })
}
pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
@@ -670,10 +1023,7 @@ impl FileStore {
Ok(())
}
pub async fn list_multipart_uploads(
&self,
bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
let multipart_dir = self.multipart_dir();
if !multipart_dir.is_dir() {
return Ok(Vec::new());
@@ -712,6 +1062,75 @@ impl FileStore {
// Helpers
// ============================
async fn scan_bucket_objects(bucket_path: &Path) -> Result<(u64, u64)> {
let mut object_count = 0u64;
let mut total_size_bytes = 0u64;
let mut directories = vec![bucket_path.to_path_buf()];
while let Some(directory) = directories.pop() {
let mut entries = match fs::read_dir(&directory).await {
Ok(entries) => entries,
Err(_) => continue,
};
while let Some(entry) = entries.next_entry().await? {
let metadata = entry.metadata().await?;
if metadata.is_dir() {
directories.push(entry.path());
continue;
}
let name = entry.file_name().to_string_lossy().to_string();
if name.ends_with("._storage_object") {
object_count += 1;
total_size_bytes += metadata.len();
}
}
}
Ok((object_count, total_size_bytes))
}
async fn bucket_creation_date(&self, bucket: &str) -> Option<DateTime<Utc>> {
let metadata = fs::metadata(self.root_dir.join(bucket)).await.ok()?;
let created_or_modified = metadata.created().unwrap_or(
metadata
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH),
);
Some(created_or_modified.into())
}
async fn object_size_if_exists(&self, bucket: &str, key: &str) -> Option<u64> {
fs::metadata(self.object_path(bucket, key))
.await
.ok()
.map(|metadata| metadata.len())
}
async fn track_bucket_created(&self, bucket: &str) {
let creation_date = self.bucket_creation_date(bucket).await;
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.ensure_bucket(bucket, creation_date);
}
async fn track_bucket_deleted(&self, bucket: &str) {
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.remove_bucket(bucket);
}
async fn track_object_upsert(&self, bucket: &str, previous_size: Option<u64>, new_size: u64) {
let creation_date = self.bucket_creation_date(bucket).await;
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.ensure_bucket(bucket, creation_date);
runtime_stats.upsert_object(bucket, previous_size, new_size);
}
async fn track_object_deleted(&self, bucket: &str, existing_size: Option<u64>) {
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.remove_object(bucket, existing_size);
}
fn object_path(&self, bucket: &str, key: &str) -> PathBuf {
let encoded = encode_key(key);
self.root_dir
@@ -815,12 +1234,43 @@ impl StorageBackend {
}
}
pub async fn get_cluster_health(&self) -> Result<ClusterHealth> {
match self {
StorageBackend::Standalone(_) => Ok(ClusterHealth {
enabled: false,
node_id: None,
quorum_healthy: None,
majority_healthy: None,
peers: None,
drives: None,
erasure: None,
repairs: None,
}),
StorageBackend::Clustered(ds) => ds.get_cluster_health().await,
}
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
match self {
StorageBackend::Standalone(fs) => fs.get_storage_stats().await,
StorageBackend::Clustered(ds) => ds.get_storage_stats().await,
}
}
pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
match self {
StorageBackend::Standalone(fs) => fs.list_bucket_summaries().await,
StorageBackend::Clustered(ds) => ds.list_bucket_summaries().await,
}
}
pub async fn initialize(&self) -> Result<()> {
match self {
StorageBackend::Standalone(fs) => fs.initialize().await,
StorageBackend::Clustered(ds) => {
// Ensure policies directory exists
tokio::fs::create_dir_all(ds.policies_dir()).await?;
ds.initialize_runtime_stats().await;
Ok(())
}
}
@@ -911,10 +1361,26 @@ impl StorageBackend {
) -> Result<CopyResult> {
match self {
StorageBackend::Standalone(fs) => {
fs.copy_object(src_bucket, src_key, dest_bucket, dest_key, metadata_directive, new_metadata).await
fs.copy_object(
src_bucket,
src_key,
dest_bucket,
dest_key,
metadata_directive,
new_metadata,
)
.await
}
StorageBackend::Clustered(ds) => {
ds.copy_object(src_bucket, src_key, dest_bucket, dest_key, metadata_directive, new_metadata).await
ds.copy_object(
src_bucket,
src_key,
dest_bucket,
dest_key,
metadata_directive,
new_metadata,
)
.await
}
}
}
@@ -929,10 +1395,12 @@ impl StorageBackend {
) -> Result<ListObjectsResult> {
match self {
StorageBackend::Standalone(fs) => {
fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token).await
fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
.await
}
StorageBackend::Clustered(ds) => {
ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token).await
ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
.await
}
}
}
@@ -979,10 +1447,7 @@ impl StorageBackend {
}
}
pub async fn list_multipart_uploads(
&self,
bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
match self {
StorageBackend::Standalone(fs) => fs.list_multipart_uploads(bucket).await,
StorageBackend::Clustered(ds) => ds.list_multipart_uploads(bucket).await,