use anyhow::Result; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use tokio::fs; use tokio::io::AsyncWriteExt; /// Identifies a specific shard on disk. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct ShardId { pub bucket: String, pub key: String, pub chunk_index: u32, pub shard_index: u32, } /// Per-shard metadata stored alongside shard data. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ShardMeta { pub shard_index: u32, pub chunk_index: u32, pub data_size: u64, pub checksum: u32, // crc32c } /// Manages shard storage on a single drive. /// /// Layout on disk: /// ```text /// {base_path}/.smartstorage/data/{bucket}/{key_prefix}/{key}/ /// chunk-{N}/shard-{M}.dat (shard data) /// chunk-{N}/shard-{M}.meta (shard metadata JSON) /// ``` pub struct ShardStore { base_path: PathBuf, } impl ShardStore { pub fn new(base_path: PathBuf) -> Self { Self { base_path } } /// Write a shard to disk atomically (write to temp file, then rename). pub async fn write_shard( &self, shard_id: &ShardId, data: &[u8], checksum: u32, ) -> Result<()> { let shard_path = self.shard_data_path(shard_id); let meta_path = self.shard_meta_path(shard_id); // Ensure parent directory exists if let Some(parent) = shard_path.parent() { fs::create_dir_all(parent).await?; } // Write data atomically via temp file + rename let temp_data_path = shard_path.with_extension("dat.tmp"); { let mut file = fs::File::create(&temp_data_path).await?; file.write_all(data).await?; file.flush().await?; file.sync_all().await?; } fs::rename(&temp_data_path, &shard_path).await?; // Write metadata let meta = ShardMeta { shard_index: shard_id.shard_index, chunk_index: shard_id.chunk_index, data_size: data.len() as u64, checksum, }; let meta_json = serde_json::to_string(&meta)?; let temp_meta_path = meta_path.with_extension("meta.tmp"); fs::write(&temp_meta_path, meta_json).await?; fs::rename(&temp_meta_path, &meta_path).await?; Ok(()) } /// Read a shard's data from disk. pub async fn read_shard(&self, shard_id: &ShardId) -> Result<(Vec, u32)> { let shard_path = self.shard_data_path(shard_id); let meta_path = self.shard_meta_path(shard_id); let data = fs::read(&shard_path).await?; let meta_json = fs::read_to_string(&meta_path).await?; let meta: ShardMeta = serde_json::from_str(&meta_json)?; Ok((data, meta.checksum)) } /// Check if a shard exists and return its metadata. pub async fn head_shard(&self, shard_id: &ShardId) -> Result> { let meta_path = self.shard_meta_path(shard_id); if !meta_path.exists() { return Ok(None); } let meta_json = fs::read_to_string(&meta_path).await?; let meta: ShardMeta = serde_json::from_str(&meta_json)?; Ok(Some(meta)) } /// Delete a shard and its metadata. pub async fn delete_shard(&self, shard_id: &ShardId) -> Result<()> { let shard_path = self.shard_data_path(shard_id); let meta_path = self.shard_meta_path(shard_id); let _ = fs::remove_file(&shard_path).await; let _ = fs::remove_file(&meta_path).await; // Clean up empty parent directories self.cleanup_empty_dirs(shard_id).await; Ok(()) } /// List all shard IDs for a given bucket and key (across all chunks). pub async fn list_shards_for_object( &self, bucket: &str, key: &str, ) -> Result> { let key_dir = self.key_dir(bucket, key); if !key_dir.exists() { return Ok(Vec::new()); } let mut result = Vec::new(); let mut entries = fs::read_dir(&key_dir).await?; while let Some(entry) = entries.next_entry().await? { let name = entry.file_name().to_string_lossy().to_string(); if !name.starts_with("chunk-") || !entry.metadata().await?.is_dir() { continue; } let chunk_index: u32 = match name.strip_prefix("chunk-").and_then(|s| s.parse().ok()) { Some(idx) => idx, None => continue, }; let mut chunk_entries = fs::read_dir(entry.path()).await?; while let Some(shard_entry) = chunk_entries.next_entry().await? { let shard_name = shard_entry.file_name().to_string_lossy().to_string(); if shard_name.starts_with("shard-") && shard_name.ends_with(".dat") { let shard_index: u32 = match shard_name .strip_prefix("shard-") .and_then(|s| s.strip_suffix(".dat")) .and_then(|s| s.parse().ok()) { Some(idx) => idx, None => continue, }; result.push(ShardId { bucket: bucket.to_string(), key: key.to_string(), chunk_index, shard_index, }); } } } result.sort_by(|a, b| { a.chunk_index .cmp(&b.chunk_index) .then(a.shard_index.cmp(&b.shard_index)) }); Ok(result) } // ============================ // Path helpers // ============================ fn data_root(&self) -> PathBuf { self.base_path.join(".smartstorage").join("data") } fn key_prefix(key: &str) -> String { // Use first 2 hex chars of a simple hash for directory fan-out let hash = xxhash_rust::xxh64::xxh64(key.as_bytes(), 0); format!("{:02x}", hash & 0xFF) } fn key_dir(&self, bucket: &str, key: &str) -> PathBuf { self.data_root() .join(bucket) .join(Self::key_prefix(key)) .join(key) } fn chunk_dir(&self, shard_id: &ShardId) -> PathBuf { self.key_dir(&shard_id.bucket, &shard_id.key) .join(format!("chunk-{}", shard_id.chunk_index)) } fn shard_data_path(&self, shard_id: &ShardId) -> PathBuf { self.chunk_dir(shard_id) .join(format!("shard-{}.dat", shard_id.shard_index)) } fn shard_meta_path(&self, shard_id: &ShardId) -> PathBuf { self.chunk_dir(shard_id) .join(format!("shard-{}.meta", shard_id.shard_index)) } async fn cleanup_empty_dirs(&self, shard_id: &ShardId) { // Try to remove chunk dir if empty let chunk_dir = self.chunk_dir(shard_id); let _ = fs::remove_dir(&chunk_dir).await; // fails silently if not empty // Try to remove key dir if empty let key_dir = self.key_dir(&shard_id.bucket, &shard_id.key); let _ = fs::remove_dir(&key_dir).await; // Try to remove prefix dir if empty if let Some(prefix_dir) = key_dir.parent() { let _ = fs::remove_dir(prefix_dir).await; } } }