//! smartstorage/rust/src/storage.rs
//! Filesystem-backed object storage: the standalone `FileStore` plus the
//! `StorageBackend` enum that dispatches between standalone and clustered stores.
use anyhow::Result;
use chrono::{DateTime, Utc};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use md5::{Digest, Md5};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::cluster::coordinator::DistributedStore;
use crate::error::StorageError;
// ============================
// Result types
// ============================
/// Result of a successful object upload.
pub struct PutResult {
/// Lowercase hex MD5 of the stored body (used as the ETag by callers).
pub md5: String,
}
/// Result of a GET: metadata plus an open file handle positioned for reading.
pub struct GetResult {
/// Total object size in bytes (full file, even for range reads).
pub size: u64,
pub last_modified: DateTime<Utc>,
pub md5: String,
pub metadata: HashMap<String, String>,
/// Open handle; already seeked to the range start when a range was requested.
pub body: tokio::fs::File,
/// Number of bytes the caller should stream (range length, or `size`).
pub content_length: u64,
}
/// Result of a HEAD: object metadata without opening the data file.
pub struct HeadResult {
pub size: u64,
pub last_modified: DateTime<Utc>,
pub md5: String,
pub metadata: HashMap<String, String>,
}
/// Result of a server-side copy: the destination's digest and mtime.
pub struct CopyResult {
pub md5: String,
pub last_modified: DateTime<Utc>,
}
/// One object row in a ListObjects response.
pub struct ListObjectEntry {
pub key: String,
pub size: u64,
pub last_modified: DateTime<Utc>,
pub md5: String,
}
/// S3-style ListObjectsV2 response payload.
pub struct ListObjectsResult {
pub contents: Vec<ListObjectEntry>,
/// Collapsed "directory" prefixes when a delimiter was supplied.
pub common_prefixes: Vec<String>,
pub is_truncated: bool,
/// Last returned key when truncated; feed back as the continuation token.
pub next_continuation_token: Option<String>,
pub prefix: String,
pub delimiter: String,
pub max_keys: usize,
}
/// Minimal bucket descriptor (name + directory creation time).
pub struct BucketInfo {
pub name: String,
pub creation_date: DateTime<Utc>,
}
/// Per-bucket statistics row used in admin/stats JSON responses.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketSummary {
pub name: String,
pub object_count: u64,
pub total_size_bytes: u64,
/// Milliseconds since the UNIX epoch; omitted when unknown.
#[serde(skip_serializing_if = "Option::is_none")]
pub creation_date: Option<i64>,
}
/// Filesystem capacity for one storage path; fields are None when the
/// statvfs-style probe was unavailable (e.g. non-Unix platforms).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageLocationSummary {
pub path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_bytes: Option<u64>,
}
/// Aggregate storage statistics snapshot returned by the admin API.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageStats {
pub bucket_count: u64,
pub total_object_count: u64,
pub total_storage_bytes: u64,
pub buckets: Vec<BucketSummary>,
pub storage_directory: String,
// NOTE(review): `default` is a Deserialize attribute; this struct only
// derives Serialize, so only `skip_serializing_if` has any effect here.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub storage_locations: Vec<StorageLocationSummary>,
}
/// Portable bucket dump: every object with its data hex-encoded inline.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketExport {
/// Format/version tag of the export envelope.
pub format: String,
pub bucket_name: String,
/// Milliseconds since the UNIX epoch at export time.
pub exported_at: i64,
pub objects: Vec<BucketExportObject>,
}
/// One object within a `BucketExport`; payload carried as a hex string.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketExportObject {
pub key: String,
pub size: u64,
pub md5: String,
pub metadata: HashMap<String, String>,
/// Object bytes, hex-encoded so the export stays valid JSON.
pub data_hex: String,
}
/// Health report for a single cluster peer node.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterPeerHealth {
pub node_id: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub quic_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub s3_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drive_count: Option<u32>,
/// Milliseconds since epoch of the last heartbeat received from this peer.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_heartbeat: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub missed_heartbeats: Option<u32>,
}
/// Health/capacity report for a single drive participating in the cluster.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterDriveHealth {
pub index: u32,
pub path: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_count: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
/// Milliseconds since epoch of the most recent health probe.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_check: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure_set_id: Option<u32>,
}
/// Erasure-coding configuration as reported in cluster health.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterErasureHealth {
pub data_shards: usize,
pub parity_shards: usize,
pub chunk_size_bytes: usize,
pub total_shards: usize,
pub read_quorum: usize,
pub write_quorum: usize,
pub erasure_set_count: usize,
}
/// Status of the background shard-repair (healing) process.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterRepairHealth {
pub active: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub scan_interval_ms: Option<u64>,
/// Milliseconds since epoch; None if a repair run has never started.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_started_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_completed_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_duration_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shards_checked: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shards_healed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub failed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
}
/// Top-level cluster health response. When `enabled` is false (standalone
/// mode) every optional field is None and is omitted from the JSON.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterHealth {
pub enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub quorum_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub majority_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub peers: Option<Vec<ClusterPeerHealth>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drives: Option<Vec<ClusterDriveHealth>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure: Option<ClusterErasureHealth>,
#[serde(skip_serializing_if = "Option::is_none")]
pub repairs: Option<ClusterRepairHealth>,
}
/// Summary of one in-progress multipart upload (for ListMultipartUploads).
pub struct MultipartUploadInfo {
pub upload_id: String,
pub key: String,
pub initiated: DateTime<Utc>,
}
/// Result of CompleteMultipartUpload: the final object's ETag.
pub struct CompleteMultipartResult {
pub etag: String,
}
// ============================
// Multipart metadata (disk format, compatible with TS)
// ============================
/// On-disk metadata.json for one multipart upload. Field names are camelCase
/// on disk to stay byte-compatible with the TypeScript implementation.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct MultipartMetadata {
upload_id: String,
bucket: String,
key: String,
/// RFC 3339 timestamp of when the upload was initiated.
initiated: String,
metadata: HashMap<String, String>,
parts: Vec<PartMetadata>,
}
/// One uploaded part recorded in the multipart metadata.json.
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PartMetadata {
part_number: u32,
/// Hex MD5 of the part body.
etag: String,
size: u64,
/// RFC 3339 timestamp of the (last) upload of this part.
last_modified: String,
}
/// In-memory counters for one bucket, maintained incrementally.
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeBucketStats {
pub object_count: u64,
pub total_size_bytes: u64,
pub creation_date: Option<DateTime<Utc>>,
}
/// Whole-store statistics cache: per-bucket counters plus running totals
/// that are kept in sync with the map by the methods below.
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeStatsState {
buckets: HashMap<String, RuntimeBucketStats>,
total_object_count: u64,
total_storage_bytes: u64,
}
impl RuntimeStatsState {
/// Replaces the entire per-bucket map (e.g. after a full disk rescan) and
/// recomputes the global totals from it.
pub(crate) fn replace_buckets(&mut self, buckets: HashMap<String, RuntimeBucketStats>) {
self.total_object_count = buckets.values().map(|bucket| bucket.object_count).sum();
self.total_storage_bytes = buckets.values().map(|bucket| bucket.total_size_bytes).sum();
self.buckets = buckets;
}
/// Creates an empty stats entry for `name` if absent; only fills in the
/// creation date when none was recorded yet.
pub(crate) fn ensure_bucket(&mut self, name: &str, creation_date: Option<DateTime<Utc>>) {
let bucket = self.buckets.entry(name.to_string()).or_default();
if bucket.creation_date.is_none() {
bucket.creation_date = creation_date;
}
}
/// Drops a bucket's entry and subtracts its counters from the totals.
/// Saturating subtraction guards against drift ever pushing totals negative.
pub(crate) fn remove_bucket(&mut self, name: &str) {
if let Some(bucket) = self.buckets.remove(name) {
self.total_object_count = self.total_object_count.saturating_sub(bucket.object_count);
self.total_storage_bytes = self
.total_storage_bytes
.saturating_sub(bucket.total_size_bytes);
}
}
/// Records a PUT. `previous_size` is Some when the key already existed
/// (an overwrite: byte totals are adjusted, object count unchanged) and
/// None for a brand-new object (both counters grow).
pub(crate) fn upsert_object(
&mut self,
bucket_name: &str,
previous_size: Option<u64>,
new_size: u64,
) {
let bucket_was_present = self.buckets.contains_key(bucket_name);
let bucket = self.buckets.entry(bucket_name.to_string()).or_default();
if let Some(previous_size) = previous_size {
// Overwrite path. If the bucket entry itself was missing (stats got
// out of sync with disk), seed it with this one object.
if !bucket_was_present {
bucket.object_count = 1;
self.total_object_count += 1;
}
bucket.total_size_bytes =
bucket.total_size_bytes.saturating_sub(previous_size) + new_size;
self.total_storage_bytes =
self.total_storage_bytes.saturating_sub(previous_size) + new_size;
} else {
bucket.object_count += 1;
bucket.total_size_bytes += new_size;
self.total_object_count += 1;
self.total_storage_bytes += new_size;
}
}
/// Records a DELETE. No-ops when the object did not exist or the bucket
/// has no stats entry, so deletes of missing keys never skew counters.
pub(crate) fn remove_object(&mut self, bucket_name: &str, existing_size: Option<u64>) {
let Some(existing_size) = existing_size else {
return;
};
let Some(bucket) = self.buckets.get_mut(bucket_name) else {
return;
};
bucket.object_count = bucket.object_count.saturating_sub(1);
bucket.total_size_bytes = bucket.total_size_bytes.saturating_sub(existing_size);
self.total_object_count = self.total_object_count.saturating_sub(1);
self.total_storage_bytes = self.total_storage_bytes.saturating_sub(existing_size);
}
/// Builds the per-bucket summary rows, sorted by bucket name for stable
/// API output.
pub(crate) fn bucket_summaries(&self) -> Vec<BucketSummary> {
let mut buckets: Vec<BucketSummary> = self
.buckets
.iter()
.map(|(name, stats)| BucketSummary {
name: name.clone(),
object_count: stats.object_count,
total_size_bytes: stats.total_size_bytes,
creation_date: stats
.creation_date
.as_ref()
.map(|creation_date| creation_date.timestamp_millis()),
})
.collect();
buckets.sort_by(|a, b| a.name.cmp(&b.name));
buckets
}
/// Assembles a full `StorageStats` snapshot from the cached counters plus
/// caller-supplied filesystem capacity info.
pub(crate) fn snapshot(
&self,
storage_directory: &Path,
storage_locations: Vec<StorageLocationSummary>,
) -> StorageStats {
StorageStats {
bucket_count: self.buckets.len() as u64,
total_object_count: self.total_object_count,
total_storage_bytes: self.total_storage_bytes,
buckets: self.bucket_summaries(),
storage_directory: storage_directory.to_string_lossy().to_string(),
storage_locations,
}
}
}
/// Raw filesystem capacity numbers from a statvfs-style probe.
#[derive(Debug, Clone, Copy)]
struct FilesystemUsage {
total_bytes: u64,
/// Bytes available to unprivileged users (f_bavail-based).
available_bytes: u64,
/// total minus *all* free blocks (f_bfree-based), so it can exceed
/// total - available on filesystems with reserved blocks.
used_bytes: u64,
}
/// Builds the capacity summary for one storage path. All capacity fields are
/// None when the filesystem probe is unavailable or fails.
pub(crate) fn storage_location_summary(path: &Path) -> StorageLocationSummary {
    let (total_bytes, available_bytes, used_bytes) = match filesystem_usage(path) {
        Some(u) => (Some(u.total_bytes), Some(u.available_bytes), Some(u.used_bytes)),
        None => (None, None, None),
    };
    StorageLocationSummary {
        path: path.to_string_lossy().into_owned(),
        total_bytes,
        available_bytes,
        used_bytes,
    }
}
#[cfg(unix)]
/// Queries filesystem capacity for `path` via POSIX statvfs(3).
/// Returns None if the path contains a NUL byte or the syscall fails.
fn filesystem_usage(path: &Path) -> Option<FilesystemUsage> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
let path_bytes = path.as_os_str().as_bytes();
// CString::new fails on interior NUL bytes; treat that as "no data".
let c_path = CString::new(path_bytes).ok()?;
// SAFETY: statvfs is plain-old-data, so an all-zero value is a valid
// initial state; the kernel overwrites it on success.
let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
// SAFETY: c_path is a valid NUL-terminated string and `stat` is a properly
// sized, writable statvfs buffer living for the duration of the call.
if unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) } != 0 {
return None;
}
// POSIX defines the block counts in units of f_frsize (fragment size).
let block_size = stat.f_frsize as u64;
let total_bytes = stat.f_blocks as u64 * block_size;
// f_bavail: blocks available to unprivileged users; f_bfree includes the
// root-reserved blocks, hence the two separate figures.
let available_bytes = stat.f_bavail as u64 * block_size;
let free_bytes = stat.f_bfree as u64 * block_size;
Some(FilesystemUsage {
total_bytes,
available_bytes,
used_bytes: total_bytes.saturating_sub(free_bytes),
})
}
#[cfg(not(unix))]
/// Non-Unix fallback: capacity information is not available.
fn filesystem_usage(_path: &Path) -> Option<FilesystemUsage> {
None
}
// ============================
// FileStore
// ============================
/// Standalone, single-node object store backed by a plain directory tree:
/// one subdirectory per bucket, plus hidden areas (.policies, .multipart).
pub struct FileStore {
root_dir: PathBuf,
// Cached per-bucket counters, seeded by a full scan and kept current by
// the track_* helpers on every mutation.
runtime_stats: RwLock<RuntimeStatsState>,
}
impl FileStore {
/// Creates a store rooted at `root_dir` with empty runtime statistics.
/// Call [`initialize`] before use to create directories and seed the stats.
pub fn new(root_dir: PathBuf) -> Self {
    let runtime_stats = RwLock::new(RuntimeStatsState::default());
    Self { root_dir, runtime_stats }
}
/// Ensures the data root and policy directory exist, then seeds the
/// runtime statistics with a full disk scan.
pub async fn initialize(&self) -> Result<()> {
    for dir in [self.root_dir.clone(), self.policies_dir()] {
        fs::create_dir_all(&dir).await?;
    }
    self.refresh_runtime_stats().await;
    Ok(())
}
/// Directory holding bucket policies; dot-prefixed so bucket listing skips it.
pub fn policies_dir(&self) -> PathBuf {
    let mut dir = self.root_dir.clone();
    dir.push(".policies");
    dir
}
/// Destroys ALL stored data: removes the entire data root (if present),
/// recreates the empty layout, and resets the runtime statistics.
pub async fn reset(&self) -> Result<()> {
    if self.root_dir.exists() {
        fs::remove_dir_all(&self.root_dir).await?;
    }
    for dir in [self.root_dir.clone(), self.policies_dir()] {
        fs::create_dir_all(&dir).await?;
    }
    self.refresh_runtime_stats().await;
    Ok(())
}
/// Returns a statistics snapshot from the in-memory cache plus a live
/// filesystem capacity probe of the root directory.
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
    let locations = vec![storage_location_summary(&self.root_dir)];
    let guard = self.runtime_stats.read().await;
    Ok(guard.snapshot(&self.root_dir, locations))
}
/// Per-bucket statistics rows from the in-memory cache (no disk access).
pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
    Ok(self.runtime_stats.read().await.bucket_summaries())
}
async fn refresh_runtime_stats(&self) {
let buckets = match self.list_buckets().await {
Ok(buckets) => buckets,
Err(error) => {
tracing::warn!(path = %self.root_dir.display(), error = %error, "Failed to initialize runtime stats");
return;
}
};
let mut runtime_buckets = HashMap::new();
for bucket in buckets {
let bucket_path = self.root_dir.join(&bucket.name);
match Self::scan_bucket_objects(&bucket_path).await {
Ok((object_count, total_size_bytes)) => {
runtime_buckets.insert(
bucket.name,
RuntimeBucketStats {
object_count,
total_size_bytes,
creation_date: Some(bucket.creation_date),
},
);
}
Err(error) => {
tracing::warn!(bucket = %bucket.name, error = %error, "Failed to scan bucket for runtime stats");
}
}
}
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.replace_buckets(runtime_buckets);
}
// ============================
// Bucket operations
// ============================
/// Lists all buckets (non-hidden directories under the root), sorted by name.
/// Creation date falls back to mtime, then the UNIX epoch, when birth time is
/// unsupported by the filesystem.
pub async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
    let mut buckets: Vec<BucketInfo> = Vec::new();
    let mut reader = fs::read_dir(&self.root_dir).await?;
    while let Some(entry) = reader.next_entry().await? {
        let metadata = entry.metadata().await?;
        if !metadata.is_dir() {
            continue;
        }
        let name = entry.file_name().to_string_lossy().into_owned();
        // Dot-prefixed directories (.multipart, .policies) are internal.
        if name.starts_with('.') {
            continue;
        }
        let created = metadata
            .created()
            .or_else(|_| metadata.modified())
            .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
        buckets.push(BucketInfo {
            name,
            creation_date: created.into(),
        });
    }
    buckets.sort_by(|a, b| a.name.cmp(&b.name));
    Ok(buckets)
}
pub async fn bucket_exists(&self, bucket: &str) -> bool {
self.root_dir.join(bucket).is_dir()
}
/// Creates a bucket directory (idempotent) and registers it in the stats.
pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
    fs::create_dir_all(self.root_dir.join(bucket)).await?;
    self.track_bucket_created(bucket).await;
    Ok(())
}
/// Deletes an empty bucket. Errors with NoSuchBucket when the directory is
/// missing and BucketNotEmpty when it contains anything.
pub async fn delete_bucket(&self, bucket: &str) -> Result<()> {
let bucket_path = self.root_dir.join(bucket);
if !bucket_path.is_dir() {
return Err(StorageError::no_such_bucket().into());
}
// Reject deletion unless the directory is completely empty.
// NOTE(review): *any* entry — including hidden files — counts as non-empty
// here; the original comment claimed hidden files were ignored, but the
// loop below bails on the first entry of any kind.
let mut entries = fs::read_dir(&bucket_path).await?;
while let Some(_entry) = entries.next_entry().await? {
return Err(StorageError::bucket_not_empty().into());
}
fs::remove_dir_all(&bucket_path).await?;
self.track_bucket_deleted(bucket).await;
Ok(())
}
// ============================
// Object operations
// ============================
/// Streams an HTTP request body to disk as a new object, writing `.md5` and
/// `.metadata.json` sidecars next to the data file.
///
/// Improvements over the original: the object size is accumulated while
/// streaming (avoiding an extra `fs::metadata` stat afterwards), and a
/// body-read failure removes the partially written file instead of leaving a
/// truncated object with a stale MD5 sidecar behind.
///
/// # Errors
/// NoSuchBucket when the bucket is missing; I/O and body-read errors.
pub async fn put_object(
    &self,
    bucket: &str,
    key: &str,
    body: Incoming,
    metadata: HashMap<String, String>,
) -> Result<PutResult> {
    if !self.bucket_exists(bucket).await {
        return Err(StorageError::no_such_bucket().into());
    }
    let previous_size = self.object_size_if_exists(bucket, key).await;
    let object_path = self.object_path(bucket, key);
    if let Some(parent) = object_path.parent() {
        fs::create_dir_all(parent).await?;
    }
    let file = fs::File::create(&object_path).await?;
    let mut writer = BufWriter::new(file);
    let mut hasher = Md5::new();
    let mut object_size: u64 = 0;
    // Stream body frames directly to the file while hashing and counting.
    let mut body = body;
    loop {
        match body.frame().await {
            Some(Ok(frame)) => {
                // Non-data frames (trailers) carry no payload and are skipped.
                if let Ok(data) = frame.into_data() {
                    hasher.update(&data);
                    object_size += data.len() as u64;
                    writer.write_all(&data).await?;
                }
            }
            Some(Err(e)) => {
                // Don't leave a truncated object behind on a failed upload.
                drop(writer);
                let _ = fs::remove_file(&object_path).await;
                return Err(anyhow::anyhow!("Body read error: {}", e));
            }
            None => break,
        }
    }
    writer.flush().await?;
    drop(writer);
    let md5_hex = format!("{:x}", hasher.finalize());
    // Write MD5 sidecar
    let md5_path = format!("{}.md5", object_path.display());
    fs::write(&md5_path, &md5_hex).await?;
    // Write metadata sidecar
    let metadata_path = format!("{}.metadata.json", object_path.display());
    let metadata_json = serde_json::to_string_pretty(&metadata)?;
    fs::write(&metadata_path, metadata_json).await?;
    self.track_object_upsert(bucket, previous_size, object_size)
        .await;
    Ok(PutResult { md5: md5_hex })
}
/// Stores an in-memory byte slice as an object, with `.md5` and
/// `.metadata.json` sidecars — the non-streaming counterpart of `put_object`.
pub async fn put_object_bytes(
    &self,
    bucket: &str,
    key: &str,
    data: &[u8],
    metadata: HashMap<String, String>,
) -> Result<PutResult> {
    if !self.bucket_exists(bucket).await {
        return Err(StorageError::no_such_bucket().into());
    }
    let previous_size = self.object_size_if_exists(bucket, key).await;
    let object_path = self.object_path(bucket, key);
    if let Some(parent) = object_path.parent() {
        fs::create_dir_all(parent).await?;
    }
    fs::write(&object_path, data).await?;
    // Sidecars: hex digest and pretty-printed user metadata.
    let md5_hex = format!("{:x}", Md5::digest(data));
    fs::write(format!("{}.md5", object_path.display()), &md5_hex).await?;
    fs::write(
        format!("{}.metadata.json", object_path.display()),
        serde_json::to_string_pretty(&metadata)?,
    )
    .await?;
    self.track_object_upsert(bucket, previous_size, data.len() as u64)
        .await;
    Ok(PutResult { md5: md5_hex })
}
/// Opens an object for reading. When `range` is `Some((start, end))`
/// (inclusive byte offsets) the returned handle is pre-seeked to `start` and
/// `content_length` covers only the range; otherwise the whole file.
///
/// NOTE(review): assumes the caller has validated the range — `end - start + 1`
/// panics in debug / wraps in release if end < start, and a range past EOF
/// yields a content_length larger than the readable bytes. Confirm the HTTP
/// layer normalizes Range headers first.
pub async fn get_object(
&self,
bucket: &str,
key: &str,
range: Option<(u64, u64)>,
) -> Result<GetResult> {
let object_path = self.object_path(bucket, key);
// NOTE(review): blocking Path::exists inside an async fn; cheap on local
// disks but consider fs::metadata for networked filesystems.
if !object_path.exists() {
return Err(StorageError::no_such_key().into());
}
let file_meta = fs::metadata(&object_path).await?;
let size = file_meta.len();
let last_modified: DateTime<Utc> = file_meta.modified()?.into();
let md5 = self.read_md5(&object_path).await;
let metadata = self.read_metadata(&object_path).await;
let mut file = fs::File::open(&object_path).await?;
let content_length = if let Some((start, end)) = range {
// Position the handle so the caller can stream the range directly.
file.seek(std::io::SeekFrom::Start(start)).await?;
end - start + 1
} else {
size
};
Ok(GetResult {
size,
last_modified,
md5,
metadata,
body: file,
content_length,
})
}
/// Metadata-only lookup: stats the object and reads its sidecars without
/// ever opening the data file itself.
pub async fn head_object(&self, bucket: &str, key: &str) -> Result<HeadResult> {
    let object_path = self.object_path(bucket, key);
    if !object_path.exists() {
        return Err(StorageError::no_such_key().into());
    }
    let stat = fs::metadata(&object_path).await?;
    Ok(HeadResult {
        size: stat.len(),
        last_modified: stat.modified()?.into(),
        md5: self.read_md5(&object_path).await,
        metadata: self.read_metadata(&object_path).await,
    })
}
/// Deletes an object, its sidecars, and any parent directories the deletion
/// left empty (never the bucket root itself). Matches S3 semantics: deleting
/// a missing key is not an error.
///
/// Fix: the original opened each ancestor directory twice (`read_dir` once to
/// probe `is_ok`, then again to read); a single `read_dir` per directory
/// suffices and avoids the TOCTOU window between the two calls.
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
    // Capture the old size first so runtime stats can be decremented.
    let existing_size = self.object_size_if_exists(bucket, key).await;
    let object_path = self.object_path(bucket, key);
    let md5_path = format!("{}.md5", object_path.display());
    let metadata_path = format!("{}.metadata.json", object_path.display());
    // Removal failures are ignored on purpose (object may not exist).
    let _ = fs::remove_file(&object_path).await;
    let _ = fs::remove_file(&md5_path).await;
    let _ = fs::remove_file(&metadata_path).await;
    // Prune now-empty parent directories, walking up to (but excluding) the
    // bucket root.
    let bucket_path = self.root_dir.join(bucket);
    let mut current = object_path.parent().map(|p| p.to_path_buf());
    while let Some(dir) = current {
        if dir == bucket_path {
            break;
        }
        if let Ok(mut entries) = fs::read_dir(&dir).await {
            if entries.next_entry().await?.is_none() {
                let _ = fs::remove_dir(&dir).await;
            } else {
                // Directory still has content, so every ancestor does too.
                break;
            }
        }
        current = dir.parent().map(|p| p.to_path_buf());
    }
    self.track_object_deleted(bucket, existing_size).await;
    Ok(())
}
/// Server-side copy of an object, including its sidecars.
/// `metadata_directive` follows S3: "COPY" clones the source metadata
/// sidecar; anything else ("REPLACE") writes `new_metadata` when provided.
///
/// NOTE(review): with a non-COPY directive and `new_metadata == None`, no
/// metadata sidecar is written — if the destination key already existed its
/// OLD metadata sidecar survives the copy. Confirm that is intended.
pub async fn copy_object(
&self,
src_bucket: &str,
src_key: &str,
dest_bucket: &str,
dest_key: &str,
metadata_directive: &str,
new_metadata: Option<HashMap<String, String>>,
) -> Result<CopyResult> {
let src_path = self.object_path(src_bucket, src_key);
let dest_path = self.object_path(dest_bucket, dest_key);
if !src_path.exists() {
return Err(StorageError::no_such_key().into());
}
if !self.bucket_exists(dest_bucket).await {
return Err(StorageError::no_such_bucket().into());
}
// Remember the destination's old size for the stats overwrite adjustment.
let previous_size = self.object_size_if_exists(dest_bucket, dest_key).await;
if let Some(parent) = dest_path.parent() {
fs::create_dir_all(parent).await?;
}
// Copy object file
fs::copy(&src_path, &dest_path).await?;
// Handle metadata
if metadata_directive == "COPY" {
let src_meta_path = format!("{}.metadata.json", src_path.display());
let dest_meta_path = format!("{}.metadata.json", dest_path.display());
// Best-effort: the source may legitimately have no metadata sidecar.
let _ = fs::copy(&src_meta_path, &dest_meta_path).await;
} else if let Some(meta) = new_metadata {
let dest_meta_path = format!("{}.metadata.json", dest_path.display());
let json = serde_json::to_string_pretty(&meta)?;
fs::write(&dest_meta_path, json).await?;
}
// Copy MD5 (best-effort; read_md5 recomputes it on demand if missing).
let src_md5_path = format!("{}.md5", src_path.display());
let dest_md5_path = format!("{}.md5", dest_path.display());
let _ = fs::copy(&src_md5_path, &dest_md5_path).await;
let file_meta = fs::metadata(&dest_path).await?;
let md5 = self.read_md5(&dest_path).await;
let last_modified: DateTime<Utc> = file_meta.modified()?.into();
self.track_object_upsert(dest_bucket, previous_size, file_meta.len())
.await;
Ok(CopyResult { md5, last_modified })
}
/// S3-style ListObjectsV2: collects all keys under the bucket, applies the
/// prefix filter, collapses delimiter groups into common prefixes, and pages
/// with `max_keys` / `continuation_token` (the token is the last key of the
/// previous page; results resume strictly after it).
///
/// NOTE(review): common prefixes do NOT count toward `max_keys` here, and
/// the truncation check only looks at `contents` — real S3 counts both.
/// Confirm callers tolerate this divergence.
pub async fn list_objects(
&self,
bucket: &str,
prefix: &str,
delimiter: &str,
max_keys: usize,
continuation_token: Option<&str>,
) -> Result<ListObjectsResult> {
let bucket_path = self.root_dir.join(bucket);
if !bucket_path.is_dir() {
return Err(StorageError::no_such_bucket().into());
}
// Collect all object keys recursively
let mut keys = Vec::new();
self.collect_keys(&bucket_path, &bucket_path, &mut keys)
.await?;
// Apply prefix filter
if !prefix.is_empty() {
keys.retain(|k| k.starts_with(prefix));
}
keys.sort();
// Handle continuation token: keep only keys strictly after the token.
if let Some(token) = continuation_token {
if let Some(pos) = keys.iter().position(|k| k.as_str() > token) {
keys = keys[pos..].to_vec();
} else {
keys.clear();
}
}
// Handle delimiter and pagination
let mut common_prefixes: Vec<String> = Vec::new();
let mut common_prefix_set = std::collections::HashSet::new();
let mut contents: Vec<ListObjectEntry> = Vec::new();
let mut is_truncated = false;
for key in &keys {
if !delimiter.is_empty() {
// Safe byte slice: every surviving key starts_with(prefix), so
// prefix.len() is guaranteed to fall on a char boundary.
let remaining = &key[prefix.len()..];
if let Some(delim_idx) = remaining.find(delimiter) {
// Collapse everything up to (and including) the first delimiter
// after the prefix into one common prefix, deduplicated.
let cp = format!("{}{}", prefix, &remaining[..delim_idx + delimiter.len()]);
if common_prefix_set.insert(cp.clone()) {
common_prefixes.push(cp);
}
continue;
}
}
if contents.len() >= max_keys {
is_truncated = true;
break;
}
let object_path = self.object_path(bucket, key);
// Skip entries that vanished between key collection and stat.
if let Ok(meta) = fs::metadata(&object_path).await {
let md5 = self.read_md5(&object_path).await;
let last_modified: DateTime<Utc> = meta
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
.into();
contents.push(ListObjectEntry {
key: key.clone(),
size: meta.len(),
last_modified,
md5,
});
}
}
// The next page resumes after the last key actually returned.
let next_continuation_token = if is_truncated {
contents.last().map(|e| e.key.clone())
} else {
None
};
common_prefixes.sort();
Ok(ListObjectsResult {
contents,
common_prefixes,
is_truncated,
next_continuation_token,
prefix: prefix.to_string(),
delimiter: delimiter.to_string(),
max_keys,
})
}
// ============================
// Multipart operations
// ============================
/// Hidden staging area for in-progress multipart uploads.
fn multipart_dir(&self) -> PathBuf {
    let mut dir = self.root_dir.clone();
    dir.push(".multipart");
    dir
}
/// Starts a multipart upload: allocates a dashless v4 UUID as the upload id,
/// creates its staging directory, and writes the initial metadata.json
/// (format-compatible with the TypeScript implementation).
pub async fn initiate_multipart(
    &self,
    bucket: &str,
    key: &str,
    metadata: HashMap<String, String>,
) -> Result<String> {
    let upload_id = Uuid::new_v4().to_string().replace('-', "");
    let upload_dir = self.multipart_dir().join(&upload_id);
    fs::create_dir_all(&upload_dir).await?;
    let meta = MultipartMetadata {
        upload_id: upload_id.clone(),
        bucket: bucket.to_string(),
        key: key.to_string(),
        initiated: Utc::now().to_rfc3339(),
        metadata,
        parts: Vec::new(),
    };
    fs::write(
        upload_dir.join("metadata.json"),
        serde_json::to_string_pretty(&meta)?,
    )
    .await?;
    Ok(upload_id)
}
pub async fn upload_part(
&self,
upload_id: &str,
part_number: u32,
body: Incoming,
) -> Result<(String, u64)> {
let upload_dir = self.multipart_dir().join(upload_id);
if !upload_dir.is_dir() {
return Err(StorageError::no_such_upload().into());
}
let part_path = upload_dir.join(format!("part-{}", part_number));
let file = fs::File::create(&part_path).await?;
let mut writer = BufWriter::new(file);
let mut hasher = Md5::new();
let mut size: u64 = 0;
let mut body = body;
loop {
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
hasher.update(&data);
size += data.len() as u64;
writer.write_all(&data).await?;
}
}
Some(Err(e)) => {
return Err(anyhow::anyhow!("Body read error: {}", e));
}
None => break,
}
}
writer.flush().await?;
drop(writer);
let etag = format!("{:x}", hasher.finalize());
// Update metadata
self.update_multipart_metadata(upload_id, part_number, &etag, size)
.await?;
Ok((etag, size))
}
/// Read-modify-write of an upload's metadata.json. Re-uploading an existing
/// part number replaces the previous entry; parts stay sorted by number.
async fn update_multipart_metadata(
    &self,
    upload_id: &str,
    part_number: u32,
    etag: &str,
    size: u64,
) -> Result<()> {
    let meta_path = self.multipart_dir().join(upload_id).join("metadata.json");
    let mut meta: MultipartMetadata =
        serde_json::from_str(&fs::read_to_string(&meta_path).await?)?;
    meta.parts.retain(|p| p.part_number != part_number);
    meta.parts.push(PartMetadata {
        part_number,
        etag: etag.to_string(),
        size,
        last_modified: Utc::now().to_rfc3339(),
    });
    meta.parts.sort_by_key(|p| p.part_number);
    fs::write(&meta_path, serde_json::to_string_pretty(&meta)?).await?;
    Ok(())
}
/// Finalizes a multipart upload: concatenates the staged parts (in the order
/// the caller supplies them) into the destination object, writes the
/// sidecars, updates stats, and removes the staging directory.
///
/// NOTE(review): the returned ETag is the plain MD5 of the concatenated
/// bytes, not the AWS-style "md5-of-part-md5s-<count>" multipart ETag, and
/// the caller-supplied part etags are not validated against what was stored.
/// Confirm clients don't rely on AWS ETag semantics.
pub async fn complete_multipart(
&self,
upload_id: &str,
parts: &[(u32, String)],
) -> Result<CompleteMultipartResult> {
let upload_dir = self.multipart_dir().join(upload_id);
if !upload_dir.is_dir() {
return Err(StorageError::no_such_upload().into());
}
// Read metadata to get bucket/key
let meta_path = upload_dir.join("metadata.json");
let content = fs::read_to_string(&meta_path).await?;
let meta: MultipartMetadata = serde_json::from_str(&content)?;
let previous_size = self.object_size_if_exists(&meta.bucket, &meta.key).await;
let object_path = self.object_path(&meta.bucket, &meta.key);
if let Some(parent) = object_path.parent() {
fs::create_dir_all(parent).await?;
}
// Concatenate parts into final object, stream each part
let dest_file = fs::File::create(&object_path).await?;
let mut writer = BufWriter::new(dest_file);
let mut hasher = Md5::new();
for (part_number, _etag) in parts {
let part_path = upload_dir.join(format!("part-{}", part_number));
if !part_path.exists() {
return Err(anyhow::anyhow!("Part {} not found", part_number));
}
let mut part_file = fs::File::open(&part_path).await?;
let mut buf = vec![0u8; 64 * 1024]; // 64KB buffer
loop {
let n = part_file.read(&mut buf).await?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
writer.write_all(&buf[..n]).await?;
}
}
writer.flush().await?;
drop(writer);
let etag = format!("{:x}", hasher.finalize());
// Write MD5 sidecar
let md5_path = format!("{}.md5", object_path.display());
fs::write(&md5_path, &etag).await?;
// Write metadata sidecar (user metadata captured at initiate time).
let metadata_path = format!("{}.metadata.json", object_path.display());
let metadata_json = serde_json::to_string_pretty(&meta.metadata)?;
fs::write(&metadata_path, metadata_json).await?;
let object_size = fs::metadata(&object_path).await?.len();
self.track_object_upsert(&meta.bucket, previous_size, object_size)
.await;
// Clean up multipart directory (best-effort).
let _ = fs::remove_dir_all(&upload_dir).await;
Ok(CompleteMultipartResult { etag })
}
/// Aborts an upload by deleting its staging directory (discarding all parts).
/// Errors with NoSuchUpload when the upload id is unknown.
pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
    let upload_dir = self.multipart_dir().join(upload_id);
    if upload_dir.is_dir() {
        fs::remove_dir_all(&upload_dir).await?;
        Ok(())
    } else {
        Err(StorageError::no_such_upload().into())
    }
}
/// Lists in-progress multipart uploads belonging to `bucket` by scanning
/// every staging directory's metadata.json. Unreadable or unparseable
/// entries are silently skipped.
pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
    let multipart_dir = self.multipart_dir();
    if !multipart_dir.is_dir() {
        return Ok(Vec::new());
    }
    let mut uploads = Vec::new();
    let mut entries = fs::read_dir(&multipart_dir).await?;
    while let Some(entry) = entries.next_entry().await? {
        if !entry.metadata().await?.is_dir() {
            continue;
        }
        let Ok(content) = fs::read_to_string(entry.path().join("metadata.json")).await else {
            continue;
        };
        let Ok(meta) = serde_json::from_str::<MultipartMetadata>(&content) else {
            continue;
        };
        if meta.bucket != bucket {
            continue;
        }
        // Fall back to "now" when the stored timestamp is unparseable.
        let initiated = DateTime::parse_from_rfc3339(&meta.initiated)
            .map(|dt| dt.with_timezone(&Utc))
            .unwrap_or_else(|_| Utc::now());
        uploads.push(MultipartUploadInfo {
            upload_id: meta.upload_id,
            key: meta.key,
            initiated,
        });
    }
    Ok(uploads)
}
// ============================
// Helpers
// ============================
/// Iteratively walks a bucket directory and returns (object count, total
/// data bytes). Only `*._storage_object` data files are counted — sidecars
/// end in `.md5` / `.metadata.json` and never match. Unreadable directories
/// are skipped.
async fn scan_bucket_objects(bucket_path: &Path) -> Result<(u64, u64)> {
    let mut object_count = 0u64;
    let mut total_size_bytes = 0u64;
    let mut pending = vec![bucket_path.to_path_buf()];
    while let Some(directory) = pending.pop() {
        let Ok(mut entries) = fs::read_dir(&directory).await else {
            continue;
        };
        while let Some(entry) = entries.next_entry().await? {
            let metadata = entry.metadata().await?;
            if metadata.is_dir() {
                pending.push(entry.path());
            } else if entry
                .file_name()
                .to_string_lossy()
                .ends_with("._storage_object")
            {
                object_count += 1;
                total_size_bytes += metadata.len();
            }
        }
    }
    Ok((object_count, total_size_bytes))
}
/// Bucket directory creation time; falls back to mtime, then the UNIX epoch.
/// None when the bucket directory cannot be stat'ed.
async fn bucket_creation_date(&self, bucket: &str) -> Option<DateTime<Utc>> {
    let metadata = fs::metadata(self.root_dir.join(bucket)).await.ok()?;
    let timestamp = metadata
        .created()
        .or_else(|_| metadata.modified())
        .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
    Some(timestamp.into())
}
/// Size in bytes of the object's data file, or None when it doesn't exist.
async fn object_size_if_exists(&self, bucket: &str, key: &str) -> Option<u64> {
    match fs::metadata(self.object_path(bucket, key)).await {
        Ok(metadata) => Some(metadata.len()),
        Err(_) => None,
    }
}
/// Registers a new bucket in the stats cache. The creation date is resolved
/// before taking the write lock to keep the critical section short.
async fn track_bucket_created(&self, bucket: &str) {
    let creation_date = self.bucket_creation_date(bucket).await;
    self.runtime_stats
        .write()
        .await
        .ensure_bucket(bucket, creation_date);
}
/// Drops a bucket's entry (and its contribution to totals) from the cache.
async fn track_bucket_deleted(&self, bucket: &str) {
    self.runtime_stats.write().await.remove_bucket(bucket);
}
/// Records a PUT/copy in the stats cache, creating the bucket entry if the
/// cache had drifted out of sync with disk.
async fn track_object_upsert(&self, bucket: &str, previous_size: Option<u64>, new_size: u64) {
    let creation_date = self.bucket_creation_date(bucket).await;
    let mut stats = self.runtime_stats.write().await;
    stats.ensure_bucket(bucket, creation_date);
    stats.upsert_object(bucket, previous_size, new_size);
}
/// Records a DELETE in the stats cache (no-op when the object didn't exist).
async fn track_object_deleted(&self, bucket: &str, existing_size: Option<u64>) {
    self.runtime_stats
        .write()
        .await
        .remove_object(bucket, existing_size);
}
/// On-disk path for an object: the key is filesystem-encoded and suffixed
/// with `._storage_object` to distinguish data files from sidecars.
fn object_path(&self, bucket: &str, key: &str) -> PathBuf {
    let file_name = format!("{}._storage_object", encode_key(key));
    self.root_dir.join(bucket).join(file_name)
}
/// Reads the object's `.md5` sidecar; when missing, recomputes the digest
/// from the data file and caches it back (best-effort). Returns an empty
/// string when the digest cannot be determined at all.
async fn read_md5(&self, object_path: &Path) -> String {
    let md5_path = format!("{}.md5", object_path.display());
    if let Ok(stored) = fs::read_to_string(&md5_path).await {
        return stored.trim().to_string();
    }
    let Ok(hash) = self.calculate_md5(object_path).await else {
        return String::new();
    };
    // Repopulate the sidecar so the next read is cheap; ignore write errors.
    let _ = fs::write(&md5_path, &hash).await;
    hash
}
/// Streams a file through MD5 in 64 KiB chunks and returns the hex digest.
async fn calculate_md5(&self, path: &Path) -> Result<String> {
    let mut hasher = Md5::new();
    let mut file = fs::File::open(path).await?;
    let mut chunk = vec![0u8; 64 * 1024];
    loop {
        let read = file.read(&mut chunk).await?;
        if read == 0 {
            break;
        }
        hasher.update(&chunk[..read]);
    }
    Ok(format!("{:x}", hasher.finalize()))
}
/// Reads the object's `.metadata.json` sidecar; an absent or unparseable
/// sidecar yields an empty map.
async fn read_metadata(&self, object_path: &Path) -> HashMap<String, String> {
    let meta_path = format!("{}.metadata.json", object_path.display());
    fs::read_to_string(&meta_path)
        .await
        .ok()
        .and_then(|content| serde_json::from_str(&content).ok())
        .unwrap_or_default()
}
/// Recursively walks `dir`, decoding every `*._storage_object` data file into
/// an object key relative to `bucket_path`. Boxed because async recursion
/// needs an explicitly sized future. Unreadable directories are skipped.
///
/// Fix: the original additionally checked `!name.ends_with(".metadata.json")`
/// and `!name.ends_with(".md5")` — dead conditions, since a name already
/// ending in `._storage_object` can never also end in either suffix.
fn collect_keys<'a>(
    &'a self,
    bucket_path: &'a Path,
    dir: &'a Path,
    keys: &'a mut Vec<String>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        let mut entries = match fs::read_dir(dir).await {
            Ok(e) => e,
            Err(_) => return Ok(()),
        };
        while let Some(entry) = entries.next_entry().await? {
            let meta = entry.metadata().await?;
            if meta.is_dir() {
                self.collect_keys(bucket_path, &entry.path(), keys).await?;
                continue;
            }
            let name = entry.file_name().to_string_lossy().to_string();
            if name.ends_with("._storage_object") {
                let relative = entry
                    .path()
                    .strip_prefix(bucket_path)
                    .unwrap_or(Path::new(""))
                    .to_string_lossy()
                    .to_string();
                // NOTE(review): on Windows `relative` would use `\` separators
                // inside keys — confirm deployment targets are Unix-only.
                keys.push(decode_key(relative.trim_end_matches("._storage_object")));
            }
        }
        Ok(())
    })
}
}
// ============================
// StorageBackend enum
// ============================
/// Unified storage backend that dispatches to either standalone (FileStore)
/// or clustered (DistributedStore) storage.
///
/// Every API method matches on the variant and forwards to the same-named
/// method on the wrapped store.
pub enum StorageBackend {
    /// Single-node storage backed by the local filesystem.
    Standalone(FileStore),
    /// Multi-node storage coordinated by a `DistributedStore`.
    Clustered(DistributedStore),
}
impl StorageBackend {
    /// Directory where bucket policy documents are stored.
    pub fn policies_dir(&self) -> std::path::PathBuf {
        match self {
            StorageBackend::Standalone(fs) => fs.policies_dir(),
            StorageBackend::Clustered(ds) => ds.policies_dir(),
        }
    }
    /// Reports cluster health. Standalone mode has no cluster, so it
    /// returns a fixed report with `enabled: false` and all other fields
    /// unset.
    pub async fn get_cluster_health(&self) -> Result<ClusterHealth> {
        match self {
            StorageBackend::Standalone(_) => Ok(ClusterHealth {
                enabled: false,
                node_id: None,
                quorum_healthy: None,
                majority_healthy: None,
                peers: None,
                drives: None,
                erasure: None,
                repairs: None,
            }),
            StorageBackend::Clustered(ds) => ds.get_cluster_health().await,
        }
    }
    /// Aggregate storage statistics for the active backend.
    pub async fn get_storage_stats(&self) -> Result<StorageStats> {
        match self {
            StorageBackend::Standalone(fs) => fs.get_storage_stats().await,
            StorageBackend::Clustered(ds) => ds.get_storage_stats().await,
        }
    }
    /// Per-bucket object count / size summaries.
    pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
        match self {
            StorageBackend::Standalone(fs) => fs.list_bucket_summaries().await,
            StorageBackend::Clustered(ds) => ds.list_bucket_summaries().await,
        }
    }
    /// One-time startup initialization. In clustered mode this only
    /// creates the policies directory and seeds the runtime statistics;
    /// everything else is delegated to the store itself.
    pub async fn initialize(&self) -> Result<()> {
        match self {
            StorageBackend::Standalone(fs) => fs.initialize().await,
            StorageBackend::Clustered(ds) => {
                // Ensure policies directory exists
                tokio::fs::create_dir_all(ds.policies_dir()).await?;
                ds.initialize_runtime_stats().await;
                Ok(())
            }
        }
    }
    /// Resets the backend to an empty state.
    ///
    /// NOTE: clustered reset is not implemented yet and is a silent no-op.
    pub async fn reset(&self) -> Result<()> {
        match self {
            StorageBackend::Standalone(fs) => fs.reset().await,
            StorageBackend::Clustered(_) => Ok(()), // TODO: cluster reset
        }
    }
    /// Lists all buckets with their creation dates.
    pub async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
        match self {
            StorageBackend::Standalone(fs) => fs.list_buckets().await,
            StorageBackend::Clustered(ds) => ds.list_buckets().await,
        }
    }
    /// Returns `true` if `bucket` exists. Infallible by design: backend
    /// errors are treated as "does not exist".
    pub async fn bucket_exists(&self, bucket: &str) -> bool {
        match self {
            StorageBackend::Standalone(fs) => fs.bucket_exists(bucket).await,
            StorageBackend::Clustered(ds) => ds.bucket_exists(bucket).await,
        }
    }
    /// Creates `bucket`.
    pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
        match self {
            StorageBackend::Standalone(fs) => fs.create_bucket(bucket).await,
            StorageBackend::Clustered(ds) => ds.create_bucket(bucket).await,
        }
    }
    /// Deletes `bucket`. Whether a non-empty bucket is rejected is up to
    /// the underlying store; see `delete_bucket_recursive` for the
    /// empty-then-delete variant.
    pub async fn delete_bucket(&self, bucket: &str) -> Result<()> {
        match self {
            StorageBackend::Standalone(fs) => fs.delete_bucket(bucket).await,
            StorageBackend::Clustered(ds) => ds.delete_bucket(bucket).await,
        }
    }
    /// Deletes every object in `bucket` (in batches of up to 1000 keys,
    /// re-listing until the bucket is empty) and then the bucket itself.
    ///
    /// # Errors
    /// `no_such_bucket` if the bucket does not exist; otherwise any error
    /// from listing or deleting.
    pub async fn delete_bucket_recursive(&self, bucket: &str) -> Result<()> {
        if !self.bucket_exists(bucket).await {
            return Err(StorageError::no_such_bucket().into());
        }
        loop {
            let objects = self.list_objects(bucket, "", "", 1000, None).await?;
            if objects.contents.is_empty() {
                break;
            }
            for object in objects.contents {
                self.delete_object(bucket, &object.key).await?;
            }
        }
        self.delete_bucket(bucket).await
    }
    /// Stores a streamed request body as object `key` in `bucket`,
    /// attaching `metadata`; returns the object's MD5.
    pub async fn put_object(
        &self,
        bucket: &str,
        key: &str,
        body: Incoming,
        metadata: HashMap<String, String>,
    ) -> Result<PutResult> {
        match self {
            StorageBackend::Standalone(fs) => fs.put_object(bucket, key, body, metadata).await,
            StorageBackend::Clustered(ds) => ds.put_object(bucket, key, body, metadata).await,
        }
    }
    /// Stores an in-memory byte slice as object `key` in `bucket`.
    pub async fn put_object_bytes(
        &self,
        bucket: &str,
        key: &str,
        data: &[u8],
        metadata: HashMap<String, String>,
    ) -> Result<PutResult> {
        match self {
            StorageBackend::Standalone(fs) => {
                fs.put_object_bytes(bucket, key, data, metadata).await
            }
            StorageBackend::Clustered(ds) => ds.put_object_bytes(bucket, key, data, metadata).await,
        }
    }
    /// Opens object `key` for reading. `range` is an optional byte range
    /// (start, end) — presumably inclusive, S3-style; confirm against the
    /// store implementations.
    pub async fn get_object(
        &self,
        bucket: &str,
        key: &str,
        range: Option<(u64, u64)>,
    ) -> Result<GetResult> {
        match self {
            StorageBackend::Standalone(fs) => fs.get_object(bucket, key, range).await,
            StorageBackend::Clustered(ds) => ds.get_object(bucket, key, range).await,
        }
    }
    /// Returns object metadata (size, mtime, MD5, user metadata) without
    /// opening the data.
    pub async fn head_object(&self, bucket: &str, key: &str) -> Result<HeadResult> {
        match self {
            StorageBackend::Standalone(fs) => fs.head_object(bucket, key).await,
            StorageBackend::Clustered(ds) => ds.head_object(bucket, key).await,
        }
    }
    /// Deletes object `key` from `bucket`.
    pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
        match self {
            StorageBackend::Standalone(fs) => fs.delete_object(bucket, key).await,
            StorageBackend::Clustered(ds) => ds.delete_object(bucket, key).await,
        }
    }
    /// Copies an object. `metadata_directive` and `new_metadata` control
    /// whether source metadata is kept or replaced (interpreted by the
    /// underlying store, S3 COPY/REPLACE style).
    pub async fn copy_object(
        &self,
        src_bucket: &str,
        src_key: &str,
        dest_bucket: &str,
        dest_key: &str,
        metadata_directive: &str,
        new_metadata: Option<HashMap<String, String>>,
    ) -> Result<CopyResult> {
        match self {
            StorageBackend::Standalone(fs) => {
                fs.copy_object(
                    src_bucket,
                    src_key,
                    dest_bucket,
                    dest_key,
                    metadata_directive,
                    new_metadata,
                )
                .await
            }
            StorageBackend::Clustered(ds) => {
                ds.copy_object(
                    src_bucket,
                    src_key,
                    dest_bucket,
                    dest_key,
                    metadata_directive,
                    new_metadata,
                )
                .await
            }
        }
    }
    /// Lists objects with S3-style prefix/delimiter grouping and
    /// continuation-token pagination, up to `max_keys` entries.
    pub async fn list_objects(
        &self,
        bucket: &str,
        prefix: &str,
        delimiter: &str,
        max_keys: usize,
        continuation_token: Option<&str>,
    ) -> Result<ListObjectsResult> {
        match self {
            StorageBackend::Standalone(fs) => {
                fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
                    .await
            }
            StorageBackend::Clustered(ds) => {
                ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
                    .await
            }
        }
    }
    /// Starts a multipart upload; returns the new upload id.
    pub async fn initiate_multipart(
        &self,
        bucket: &str,
        key: &str,
        metadata: HashMap<String, String>,
    ) -> Result<String> {
        match self {
            StorageBackend::Standalone(fs) => fs.initiate_multipart(bucket, key, metadata).await,
            StorageBackend::Clustered(ds) => ds.initiate_multipart(bucket, key, metadata).await,
        }
    }
    /// Uploads one part of a multipart upload; returns (etag, size).
    pub async fn upload_part(
        &self,
        upload_id: &str,
        part_number: u32,
        body: Incoming,
    ) -> Result<(String, u64)> {
        match self {
            StorageBackend::Standalone(fs) => fs.upload_part(upload_id, part_number, body).await,
            StorageBackend::Clustered(ds) => ds.upload_part(upload_id, part_number, body).await,
        }
    }
    /// Completes a multipart upload from the given (part number, etag)
    /// pairs.
    pub async fn complete_multipart(
        &self,
        upload_id: &str,
        parts: &[(u32, String)],
    ) -> Result<CompleteMultipartResult> {
        match self {
            StorageBackend::Standalone(fs) => fs.complete_multipart(upload_id, parts).await,
            StorageBackend::Clustered(ds) => ds.complete_multipart(upload_id, parts).await,
        }
    }
    /// Aborts a multipart upload and discards its parts.
    pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
        match self {
            StorageBackend::Standalone(fs) => fs.abort_multipart(upload_id).await,
            StorageBackend::Clustered(ds) => ds.abort_multipart(upload_id).await,
        }
    }
    /// Lists in-progress multipart uploads for `bucket`.
    pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
        match self {
            StorageBackend::Standalone(fs) => fs.list_multipart_uploads(bucket).await,
            StorageBackend::Clustered(ds) => ds.list_multipart_uploads(bucket).await,
        }
    }
    /// Serializes an entire bucket into a `BucketExport` document with
    /// each object's data hex-encoded.
    ///
    /// NOTE: every object is read fully into memory (and doubled by hex
    /// encoding), so this is only suitable for small buckets.
    ///
    /// # Errors
    /// `no_such_bucket` if the bucket does not exist; otherwise any
    /// listing or read error.
    pub async fn export_bucket(&self, bucket: &str) -> Result<BucketExport> {
        if !self.bucket_exists(bucket).await {
            return Err(StorageError::no_such_bucket().into());
        }
        let objects = self.list_objects(bucket, "", "", usize::MAX, None).await?;
        let mut exported_objects = Vec::with_capacity(objects.contents.len());
        for object in objects.contents {
            let result = self.get_object(bucket, &object.key, None).await?;
            let mut file = result.body;
            let mut data = Vec::with_capacity(result.size as usize);
            file.read_to_end(&mut data).await?;
            exported_objects.push(BucketExportObject {
                key: object.key,
                size: result.size,
                md5: result.md5,
                metadata: result.metadata,
                data_hex: hex::encode(data),
            });
        }
        Ok(BucketExport {
            format: "smartstorage.bucket.v1".to_string(),
            bucket_name: bucket.to_string(),
            exported_at: Utc::now().timestamp_millis(),
            objects: exported_objects,
        })
    }
    /// Restores objects from a `BucketExport` into `bucket`, creating the
    /// bucket if needed. Existing objects with the same keys are
    /// overwritten by `put_object_bytes`.
    ///
    /// # Errors
    /// `invalid_request` for an unknown export format or undecodable hex
    /// payload; otherwise any write error.
    pub async fn import_bucket(&self, bucket: &str, source: BucketExport) -> Result<()> {
        if source.format != "smartstorage.bucket.v1" {
            return Err(StorageError::invalid_request("Unsupported bucket export format.").into());
        }
        if !self.bucket_exists(bucket).await {
            self.create_bucket(bucket).await?;
        }
        for object in source.objects {
            let data = hex::decode(&object.data_hex)
                .map_err(|error| StorageError::invalid_request(&error.to_string()))?;
            self.put_object_bytes(bucket, &object.key, &data, object.metadata)
                .await?;
        }
        Ok(())
    }
}
// ============================
// Key encoding (identity on Linux)
// ============================
/// Encodes an object key into a filesystem-safe file name.
///
/// On Windows, characters that are invalid in file names are escaped as
/// `&xx` (two lowercase hex digits of the code point). The escape
/// character `&` is escaped as well (`&26`): without this the encoding is
/// not injective — a key containing a literal `&` followed by two hex
/// digits (e.g. `a&3cb`) would encode to itself and then be decoded by
/// `decode_key` as `a<b`, colliding with the encoding of `a<b`.
/// `decode_key` already understands `&26`, so round-tripping works.
/// On non-Windows platforms the key is returned unchanged.
fn encode_key(key: &str) -> String {
    if cfg!(windows) {
        key.chars()
            .map(|c| match c {
                // '&' must be escaped too so decode_key cannot
                // misinterpret literal "&xx" sequences in keys.
                '&' | '<' | '>' | ':' | '"' | '\\' | '|' | '?' | '*' => {
                    format!("&{:02x}", c as u32)
                }
                _ => c.to_string(),
            })
            .collect()
    } else {
        key.to_string()
    }
}
/// Reverses `encode_key`: on Windows, turns `&xx` escape sequences back
/// into the original characters; on other platforms returns the input
/// unchanged.
///
/// A `&` that is not followed by two valid hex digits is passed through
/// literally, together with whatever characters followed it.
fn decode_key(encoded: &str) -> String {
    if !cfg!(windows) {
        return encoded.to_string();
    }
    let mut decoded = String::with_capacity(encoded.len());
    let mut iter = encoded.chars();
    while let Some(ch) = iter.next() {
        if ch != '&' {
            decoded.push(ch);
            continue;
        }
        // Consume up to two hex digits of the escape sequence.
        let code: String = iter.by_ref().take(2).collect();
        match u8::from_str_radix(&code, 16) {
            Ok(value) => decoded.push(value as char),
            // Malformed escape: emit the raw characters unchanged.
            Err(_) => {
                decoded.push('&');
                decoded.push_str(&code);
            }
        }
    }
    decoded
}