use anyhow::Result; use chrono::{DateTime, Utc}; use http_body_util::BodyExt; use hyper::body::Incoming; use md5::{Digest, Md5}; use std::collections::HashMap; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; use tokio::fs; use super::config::ErasureConfig; use super::erasure::ErasureCoder; use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement}; use super::placement::ErasureSet; use super::protocol::{ClusterRequest, ShardDeleteRequest, ShardReadRequest, ShardWriteRequest}; use super::quic_transport::QuicTransport; use super::shard_store::{ShardId, ShardStore}; use super::state::ClusterState; use crate::storage::{ BucketInfo, CompleteMultipartResult, CopyResult, GetResult, HeadResult, ListObjectEntry, ListObjectsResult, MultipartUploadInfo, PutResult, }; use serde::{Deserialize, Serialize}; /// Multipart upload session metadata. #[derive(Debug, Clone, Serialize, Deserialize)] struct MultipartSession { upload_id: String, bucket: String, key: String, initiated: String, metadata: HashMap, parts: HashMap, // part_number -> etag } /// Per-part info stored during multipart upload. #[derive(Debug, Clone, Serialize, Deserialize)] struct PartInfo { part_number: u32, etag: String, size: u64, part_key: String, chunks: Vec, } /// Distributed storage coordinator. /// /// Handles S3 operations by distributing erasure-coded shards across /// the cluster via QUIC, with quorum-based consistency. pub struct DistributedStore { state: Arc, transport: Arc, erasure_coder: ErasureCoder, /// Local shard stores, one per drive. Index = drive index. local_shard_stores: Vec>, /// Root directory for manifests on this node manifest_dir: PathBuf, /// Root directory for buckets metadata buckets_dir: PathBuf, /// Root directory for bucket policies policies_dir: PathBuf, erasure_config: ErasureConfig, } impl DistributedStore { pub fn new( state: Arc, transport: Arc, erasure_config: ErasureConfig, drive_paths: Vec, manifest_dir: PathBuf, buckets_dir: PathBuf, ) -> Result { let erasure_coder = ErasureCoder::new(&erasure_config)?; let local_shard_stores = drive_paths .iter() .map(|p| Arc::new(ShardStore::new(p.clone()))) .collect(); let policies_dir = buckets_dir.join(".policies"); Ok(Self { state, transport, erasure_coder, local_shard_stores, manifest_dir, buckets_dir, policies_dir, erasure_config, }) } /// Get the policies directory path. pub fn policies_dir(&self) -> PathBuf { self.policies_dir.clone() } // ============================ // Object operations // ============================ pub async fn put_object( &self, bucket: &str, key: &str, body: Incoming, metadata: HashMap, ) -> Result { if !self.bucket_exists(bucket).await { return Err(crate::error::StorageError::no_such_bucket().into()); } let erasure_set = self .state .get_erasure_set_for_object(bucket, key) .await .ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?; let chunk_size = self.erasure_config.chunk_size_bytes; let mut chunk_buffer = Vec::with_capacity(chunk_size); let mut chunk_index: u32 = 0; let mut chunks = Vec::new(); let mut total_size: u64 = 0; let mut full_hasher = Md5::new(); // Stream body, processing one chunk at a time let mut body = body; loop { match body.frame().await { Some(Ok(frame)) => { if let Ok(data) = frame.into_data() { full_hasher.update(&data); total_size += data.len() as u64; chunk_buffer.extend_from_slice(&data); // Process complete chunks while chunk_buffer.len() >= chunk_size { let chunk_data: Vec = chunk_buffer.drain(..chunk_size).collect(); let chunk_manifest = self .encode_and_distribute_chunk( &erasure_set, bucket, key, chunk_index, &chunk_data, ) .await?; chunks.push(chunk_manifest); chunk_index += 1; } } } Some(Err(e)) => return Err(anyhow::anyhow!("Body read error: {}", e)), None => break, } } // Process final partial chunk if !chunk_buffer.is_empty() { let chunk_manifest = self .encode_and_distribute_chunk(&erasure_set, bucket, key, chunk_index, &chunk_buffer) .await?; chunks.push(chunk_manifest); } let md5_hex = format!("{:x}", full_hasher.finalize()); // Build and store manifest let manifest = ObjectManifest { bucket: bucket.to_string(), key: key.to_string(), version_id: uuid::Uuid::new_v4().to_string(), size: total_size, content_md5: md5_hex.clone(), content_type: metadata .get("content-type") .cloned() .unwrap_or_else(|| "binary/octet-stream".to_string()), metadata, created_at: Utc::now().to_rfc3339(), last_modified: Utc::now().to_rfc3339(), data_shards: self.erasure_config.data_shards, parity_shards: self.erasure_config.parity_shards, chunk_size: self.erasure_config.chunk_size_bytes, chunks, }; self.store_manifest(&manifest).await?; Ok(PutResult { md5: md5_hex }) } pub async fn get_object( &self, bucket: &str, key: &str, range: Option<(u64, u64)>, ) -> Result { let manifest = self.load_manifest(bucket, key).await?; // Determine which chunks to fetch based on range let chunk_size = manifest.chunk_size as u64; let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) = if let Some((start, end)) = range { let first = (start / chunk_size) as usize; let last = (end / chunk_size) as usize; let offset = (start % chunk_size) as usize; let end_in_last = (end % chunk_size) as usize + 1; (first, last, offset, end_in_last) } else { (0, manifest.chunks.len() - 1, 0, 0) }; // Reconstruct the needed chunks let mut full_data = Vec::new(); for chunk_idx in first_chunk..=last_chunk.min(manifest.chunks.len() - 1) { let chunk = &manifest.chunks[chunk_idx]; let reconstructed = self .fetch_and_reconstruct_chunk_for_object(chunk, bucket, key) .await?; full_data.extend_from_slice(&reconstructed); } // Apply range if requested let (response_data, content_length) = if let Some((start, end)) = range { let adjusted_start = byte_offset_in_first; let total_range_bytes = (end - start + 1) as usize; let adjusted_end = adjusted_start + total_range_bytes; let sliced = full_data[adjusted_start..adjusted_end.min(full_data.len())].to_vec(); let len = sliced.len() as u64; (sliced, len) } else { let len = full_data.len() as u64; (full_data, len) }; // Write to a temp file for streaming (matches FileStore's GetResult interface) let temp_path = self.manifest_dir.join(format!( ".tmp_get_{}_{}", uuid::Uuid::new_v4(), key.replace('/', "_") )); fs::write(&temp_path, &response_data).await?; let file = fs::File::open(&temp_path).await?; // Clean up temp file after opening (Unix: file stays accessible via fd) let _ = fs::remove_file(&temp_path).await; let last_modified: DateTime = manifest .last_modified .parse() .unwrap_or_else(|_| Utc::now()); Ok(GetResult { size: manifest.size, last_modified, md5: manifest.content_md5.clone(), metadata: manifest.metadata.clone(), body: file, content_length, }) } pub async fn head_object(&self, bucket: &str, key: &str) -> Result { let manifest = self.load_manifest(bucket, key).await?; let last_modified: DateTime = manifest .last_modified .parse() .unwrap_or_else(|_| Utc::now()); Ok(HeadResult { size: manifest.size, last_modified, md5: manifest.content_md5, metadata: manifest.metadata, }) } pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> { // Load manifest to find all shards if let Ok(manifest) = self.load_manifest(bucket, key).await { let local_id = self.state.local_node_id().to_string(); // Delete shards from all drives (local and remote) for chunk in &manifest.chunks { for placement in &chunk.shard_placements { if placement.node_id == local_id { // Local delete let shard_id = ShardId { bucket: bucket.to_string(), key: key.to_string(), chunk_index: chunk.chunk_index, shard_index: placement.shard_index, }; if let Some(store) = self .local_shard_stores .get(placement.drive_id.parse::().unwrap_or(0)) { let _ = store.delete_shard(&shard_id).await; } } else { // Remote delete via QUIC (best-effort, don't fail the delete) if let Err(e) = self .delete_shard_remote( &placement.node_id, bucket, key, chunk.chunk_index, placement.shard_index, ) .await { tracing::warn!( node = %placement.node_id, shard = placement.shard_index, error = %e, "Failed to delete remote shard (will be cleaned up by healing)" ); } } } } } // Delete manifest self.delete_manifest(bucket, key).await?; Ok(()) } pub async fn copy_object( &self, src_bucket: &str, src_key: &str, dest_bucket: &str, dest_key: &str, _metadata_directive: &str, new_metadata: Option>, ) -> Result { // Load source manifest let src_manifest = self.load_manifest(src_bucket, src_key).await?; // Determine metadata let metadata = if let Some(meta) = new_metadata { meta } else { src_manifest.metadata.clone() }; // Read source object fully, then reconstruct let mut full_data = Vec::new(); for chunk in &src_manifest.chunks { let reconstructed = self .fetch_and_reconstruct_chunk_for_object(chunk, src_bucket, src_key) .await?; full_data.extend_from_slice(&reconstructed); } // Compute MD5 of full data let mut hasher = Md5::new(); hasher.update(&full_data); let md5_hex = format!("{:x}", hasher.finalize()); // Get erasure set for destination let erasure_set = self .state .get_erasure_set_for_object(dest_bucket, dest_key) .await .ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?; // Re-encode and distribute in chunks let chunk_size = self.erasure_config.chunk_size_bytes; let mut chunks = Vec::new(); let mut chunk_index = 0u32; for chunk_data in full_data.chunks(chunk_size) { let chunk_manifest = self .encode_and_distribute_chunk( &erasure_set, dest_bucket, dest_key, chunk_index, chunk_data, ) .await?; chunks.push(chunk_manifest); chunk_index += 1; } let last_modified = Utc::now(); // Build and store manifest let manifest = ObjectManifest { bucket: dest_bucket.to_string(), key: dest_key.to_string(), version_id: uuid::Uuid::new_v4().to_string(), size: full_data.len() as u64, content_md5: md5_hex.clone(), content_type: metadata .get("content-type") .cloned() .unwrap_or_else(|| "binary/octet-stream".to_string()), metadata, created_at: last_modified.to_rfc3339(), last_modified: last_modified.to_rfc3339(), data_shards: self.erasure_config.data_shards, parity_shards: self.erasure_config.parity_shards, chunk_size: self.erasure_config.chunk_size_bytes, chunks, }; self.store_manifest(&manifest).await?; Ok(CopyResult { md5: md5_hex, last_modified, }) } pub async fn list_objects( &self, bucket: &str, prefix: &str, delimiter: &str, max_keys: usize, continuation_token: Option<&str>, ) -> Result { if !self.bucket_exists(bucket).await { return Err(crate::error::StorageError::no_such_bucket().into()); } // List manifests for this bucket let manifest_bucket_dir = self.manifest_dir.join(bucket); let mut keys = Vec::new(); if manifest_bucket_dir.is_dir() { self.collect_manifest_keys(&manifest_bucket_dir, &manifest_bucket_dir, &mut keys) .await?; } // Apply prefix filter if !prefix.is_empty() { keys.retain(|k| k.starts_with(prefix)); } keys.sort(); // Handle continuation token if let Some(token) = continuation_token { if let Some(pos) = keys.iter().position(|k| k.as_str() > token) { keys = keys[pos..].to_vec(); } else { keys.clear(); } } // Handle delimiter and pagination let mut common_prefixes: Vec = Vec::new(); let mut common_prefix_set = std::collections::HashSet::new(); let mut contents: Vec = Vec::new(); let mut is_truncated = false; for key in &keys { if !delimiter.is_empty() { let remaining = &key[prefix.len()..]; if let Some(delim_idx) = remaining.find(delimiter) { let cp = format!( "{}{}", prefix, &remaining[..delim_idx + delimiter.len()] ); if common_prefix_set.insert(cp.clone()) { common_prefixes.push(cp); } continue; } } if contents.len() >= max_keys { is_truncated = true; break; } if let Ok(manifest) = self.load_manifest(bucket, key).await { let last_modified: DateTime = manifest .last_modified .parse() .unwrap_or_else(|_| Utc::now()); contents.push(ListObjectEntry { key: key.clone(), size: manifest.size, last_modified, md5: manifest.content_md5, }); } } let next_continuation_token = if is_truncated { contents.last().map(|e| e.key.clone()) } else { None }; common_prefixes.sort(); Ok(ListObjectsResult { contents, common_prefixes, is_truncated, next_continuation_token, prefix: prefix.to_string(), delimiter: delimiter.to_string(), max_keys, }) } // ============================ // Bucket operations // ============================ pub async fn list_buckets(&self) -> Result> { let mut buckets = Vec::new(); if !self.buckets_dir.is_dir() { return Ok(buckets); } let mut entries = fs::read_dir(&self.buckets_dir).await?; while let Some(entry) = entries.next_entry().await? { let meta = entry.metadata().await?; if meta.is_dir() { let name = entry.file_name().to_string_lossy().to_string(); if name.starts_with('.') { continue; } let creation_date: DateTime = meta .created() .unwrap_or(meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH)) .into(); buckets.push(BucketInfo { name, creation_date, }); } } buckets.sort_by(|a, b| a.name.cmp(&b.name)); Ok(buckets) } pub async fn bucket_exists(&self, bucket: &str) -> bool { self.buckets_dir.join(bucket).is_dir() } pub async fn create_bucket(&self, bucket: &str) -> Result<()> { let bucket_path = self.buckets_dir.join(bucket); fs::create_dir_all(&bucket_path).await?; // Also create manifest bucket dir let manifest_bucket = self.manifest_dir.join(bucket); fs::create_dir_all(&manifest_bucket).await?; Ok(()) } pub async fn delete_bucket(&self, bucket: &str) -> Result<()> { let bucket_path = self.buckets_dir.join(bucket); if !bucket_path.is_dir() { return Err(crate::error::StorageError::no_such_bucket().into()); } // Check if empty (check manifests) let manifest_bucket = self.manifest_dir.join(bucket); if manifest_bucket.is_dir() { let mut entries = fs::read_dir(&manifest_bucket).await?; if entries.next_entry().await?.is_some() { return Err(crate::error::StorageError::bucket_not_empty().into()); } } let _ = fs::remove_dir_all(&bucket_path).await; let _ = fs::remove_dir_all(&manifest_bucket).await; Ok(()) } // ============================ // Multipart uploads // ============================ pub async fn initiate_multipart( &self, bucket: &str, key: &str, metadata: HashMap, ) -> Result { if !self.bucket_exists(bucket).await { return Err(crate::error::StorageError::no_such_bucket().into()); } let upload_id = uuid::Uuid::new_v4().to_string().replace('-', ""); let upload_dir = self.multipart_dir().join(&upload_id); fs::create_dir_all(&upload_dir).await?; // Store multipart session metadata let session = MultipartSession { upload_id: upload_id.clone(), bucket: bucket.to_string(), key: key.to_string(), initiated: Utc::now().to_rfc3339(), metadata, parts: HashMap::new(), }; let json = serde_json::to_string_pretty(&session)?; fs::write(upload_dir.join("session.json"), json).await?; Ok(upload_id) } pub async fn upload_part( &self, upload_id: &str, part_number: u32, body: Incoming, ) -> Result<(String, u64)> { let upload_dir = self.multipart_dir().join(upload_id); if !upload_dir.is_dir() { return Err(crate::error::StorageError::no_such_upload().into()); } // Read session to get bucket/key let session = self.load_multipart_session(upload_id).await?; let erasure_set = self .state .get_erasure_set_for_object(&session.bucket, &session.key) .await .ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?; // Buffer and erasure-code the part data let chunk_size = self.erasure_config.chunk_size_bytes; let mut chunk_buffer = Vec::with_capacity(chunk_size); let mut chunk_index: u32 = 0; let mut chunks = Vec::new(); let mut total_size: u64 = 0; let mut hasher = Md5::new(); // Use upload_id + part_number as a unique key prefix for shard storage let part_key = format!("{}/_multipart/{}/part-{}", session.key, upload_id, part_number); let mut body = body; loop { match body.frame().await { Some(Ok(frame)) => { if let Ok(data) = frame.into_data() { hasher.update(&data); total_size += data.len() as u64; chunk_buffer.extend_from_slice(&data); while chunk_buffer.len() >= chunk_size { let chunk_data: Vec = chunk_buffer.drain(..chunk_size).collect(); let chunk_manifest = self .encode_and_distribute_chunk( &erasure_set, &session.bucket, &part_key, chunk_index, &chunk_data, ) .await?; chunks.push(chunk_manifest); chunk_index += 1; } } } Some(Err(e)) => return Err(anyhow::anyhow!("Body read error: {}", e)), None => break, } } // Process final partial chunk if !chunk_buffer.is_empty() { let chunk_manifest = self .encode_and_distribute_chunk( &erasure_set, &session.bucket, &part_key, chunk_index, &chunk_buffer, ) .await?; chunks.push(chunk_manifest); } let etag = format!("{:x}", hasher.finalize()); // Save per-part manifest let part_manifest = PartInfo { part_number, etag: etag.clone(), size: total_size, part_key: part_key.clone(), chunks, }; let part_json = serde_json::to_string_pretty(&part_manifest)?; fs::write( upload_dir.join(format!("part-{}.json", part_number)), part_json, ) .await?; Ok((etag, total_size)) } pub async fn complete_multipart( &self, upload_id: &str, parts: &[(u32, String)], ) -> Result { let session = self.load_multipart_session(upload_id).await?; let upload_dir = self.multipart_dir().join(upload_id); // Read per-part manifests and concatenate chunks sequentially let mut all_chunks = Vec::new(); let mut total_size: u64 = 0; let mut full_hasher = Md5::new(); let mut global_chunk_index: u32 = 0; for (part_number, _etag) in parts { let part_path = upload_dir.join(format!("part-{}.json", part_number)); if !part_path.exists() { return Err(anyhow::anyhow!("Part {} not found", part_number).into()); } let part_json = fs::read_to_string(&part_path).await?; let part_info: PartInfo = serde_json::from_str(&part_json)?; // Reconstruct part data to compute overall MD5 for chunk in &part_info.chunks { let reconstructed = self .fetch_and_reconstruct_chunk_for_object( chunk, &session.bucket, &part_info.part_key, ) .await?; full_hasher.update(&reconstructed); total_size += reconstructed.len() as u64; // Re-index chunks to be sequential in the final object let mut adjusted_chunk = chunk.clone(); adjusted_chunk.chunk_index = global_chunk_index; all_chunks.push(adjusted_chunk); global_chunk_index += 1; } } let etag = format!("{:x}", full_hasher.finalize()); // Build final object manifest let manifest = ObjectManifest { bucket: session.bucket.clone(), key: session.key.clone(), version_id: uuid::Uuid::new_v4().to_string(), size: total_size, content_md5: etag.clone(), content_type: session .metadata .get("content-type") .cloned() .unwrap_or_else(|| "binary/octet-stream".to_string()), metadata: session.metadata.clone(), created_at: Utc::now().to_rfc3339(), last_modified: Utc::now().to_rfc3339(), data_shards: self.erasure_config.data_shards, parity_shards: self.erasure_config.parity_shards, chunk_size: self.erasure_config.chunk_size_bytes, chunks: all_chunks, }; self.store_manifest(&manifest).await?; // Clean up multipart upload directory let _ = fs::remove_dir_all(&upload_dir).await; Ok(CompleteMultipartResult { etag }) } pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> { let upload_dir = self.multipart_dir().join(upload_id); if !upload_dir.is_dir() { return Err(crate::error::StorageError::no_such_upload().into()); } // Load session to get bucket/key for shard cleanup if let Ok(session) = self.load_multipart_session(upload_id).await { // Read part manifests and delete their shards let mut entries = fs::read_dir(&upload_dir).await?; while let Some(entry) = entries.next_entry().await? { let name = entry.file_name().to_string_lossy().to_string(); if name.starts_with("part-") && name.ends_with(".json") { if let Ok(content) = fs::read_to_string(entry.path()).await { if let Ok(part_info) = serde_json::from_str::(&content) { let local_id = self.state.local_node_id().to_string(); for chunk in &part_info.chunks { for placement in &chunk.shard_placements { if placement.node_id == local_id { let shard_id = ShardId { bucket: session.bucket.clone(), key: part_info.part_key.clone(), chunk_index: chunk.chunk_index, shard_index: placement.shard_index, }; if let Some(store) = self.local_shard_stores.get( placement.drive_id.parse::().unwrap_or(0), ) { let _ = store.delete_shard(&shard_id).await; } } else { let _ = self .delete_shard_remote( &placement.node_id, &session.bucket, &part_info.part_key, chunk.chunk_index, placement.shard_index, ) .await; } } } } } } } } let _ = fs::remove_dir_all(&upload_dir).await; Ok(()) } pub async fn list_multipart_uploads( &self, bucket: &str, ) -> Result> { let multipart_dir = self.multipart_dir(); if !multipart_dir.is_dir() { return Ok(Vec::new()); } let mut uploads = Vec::new(); let mut entries = fs::read_dir(&multipart_dir).await?; while let Some(entry) = entries.next_entry().await? { if !entry.metadata().await?.is_dir() { continue; } let session_path = entry.path().join("session.json"); if let Ok(content) = fs::read_to_string(&session_path).await { if let Ok(session) = serde_json::from_str::(&content) { if session.bucket == bucket { let initiated = DateTime::parse_from_rfc3339(&session.initiated) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or_else(|_| Utc::now()); uploads.push(MultipartUploadInfo { upload_id: session.upload_id, key: session.key, initiated, }); } } } } Ok(uploads) } fn multipart_dir(&self) -> PathBuf { self.manifest_dir.join(".multipart") } async fn load_multipart_session(&self, upload_id: &str) -> Result { let session_path = self.multipart_dir().join(upload_id).join("session.json"); let content = fs::read_to_string(&session_path).await?; Ok(serde_json::from_str(&content)?) } // ============================ // Internal: erasure encode + distribute // ============================ async fn encode_and_distribute_chunk( &self, erasure_set: &ErasureSet, bucket: &str, key: &str, chunk_index: u32, chunk_data: &[u8], ) -> Result { let shards = self.erasure_coder.encode_chunk(chunk_data)?; let quorum = self.erasure_config.write_quorum(); let total = shards.len(); let mut shard_placements = Vec::with_capacity(total); let mut successes = 0u32; let mut failures = 0u32; // Distribute shards to drives in the erasure set for (shard_idx, shard_data) in shards.iter().enumerate() { let drive = erasure_set .drives .get(shard_idx) .ok_or_else(|| anyhow::anyhow!("Not enough drives in erasure set"))?; let checksum = crc32c::crc32c(shard_data); let shard_id = ShardId { bucket: bucket.to_string(), key: key.to_string(), chunk_index, shard_index: shard_idx as u32, }; let result = if drive.node_id == self.state.local_node_id() { // Local write if let Some(store) = self.local_shard_stores.get(drive.drive_index as usize) { store.write_shard(&shard_id, shard_data, checksum).await } else { Err(anyhow::anyhow!("Local drive {} not found", drive.drive_index)) } } else { // Remote write via QUIC self.write_shard_remote( &drive.node_id, bucket, key, chunk_index, shard_idx as u32, shard_data, checksum, ) .await }; match result { Ok(()) => { successes += 1; shard_placements.push(ShardPlacement { shard_index: shard_idx as u32, node_id: drive.node_id.clone(), drive_id: drive.drive_index.to_string(), checksum, shard_size: shard_data.len(), }); } Err(e) => { failures += 1; tracing::warn!( shard_index = shard_idx, node = %drive.node_id, error = %e, "Shard write failed" ); if failures as usize > total - quorum { anyhow::bail!( "Write quorum not achievable: {}/{} failures", failures, total ); } } } } if (successes as usize) < quorum { anyhow::bail!( "Write quorum not met: only {}/{} succeeded (need {})", successes, total, quorum ); } Ok(ChunkManifest { chunk_index, data_size: chunk_data.len(), shard_placements, }) } async fn write_shard_remote( &self, node_id: &str, bucket: &str, key: &str, chunk_index: u32, shard_index: u32, data: &[u8], checksum: u32, ) -> Result<()> { let node_info = self .state .get_node(node_id) .await .ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?; let addr: SocketAddr = node_info.quic_addr.parse()?; let conn = self.transport.get_connection(node_id, addr).await?; let request = ShardWriteRequest { request_id: uuid::Uuid::new_v4().to_string(), bucket: bucket.to_string(), key: key.to_string(), chunk_index, shard_index, shard_data_length: data.len() as u64, checksum, object_metadata: HashMap::new(), }; let ack = self .transport .send_shard_write(&conn, request, data) .await?; if ack.success { Ok(()) } else { anyhow::bail!( "Remote shard write failed: {}", ack.error.unwrap_or_default() ) } } // ============================ // Internal: fetch + reconstruct // ============================ /// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups. async fn fetch_and_reconstruct_chunk_for_object( &self, chunk: &ChunkManifest, bucket: &str, key: &str, ) -> Result> { let k = self.erasure_config.read_quorum(); let total = self.erasure_config.total_shards(); let mut shards: Vec>> = vec![None; total]; let mut succeeded = 0usize; // Sort placements: local first for fast path let mut sorted_placements = chunk.shard_placements.clone(); let local_id = self.state.local_node_id().to_string(); sorted_placements.sort_by_key(|p| if p.node_id == local_id { 0 } else { 1 }); for placement in &sorted_placements { if succeeded >= k { break; // Have enough shards } let shard_id = ShardId { bucket: bucket.to_string(), key: key.to_string(), chunk_index: chunk.chunk_index, shard_index: placement.shard_index, }; let result = if placement.node_id == local_id { // Local read let store_idx = placement.drive_id.parse::().unwrap_or(0); if let Some(store) = self.local_shard_stores.get(store_idx) { store.read_shard(&shard_id).await.ok() } else { None } } else { // Remote read via QUIC self.read_shard_remote( &placement.node_id, bucket, key, chunk.chunk_index, placement.shard_index, ) .await .ok() }; if let Some((data, _checksum)) = result { shards[placement.shard_index as usize] = Some(data); succeeded += 1; } } if succeeded < k { anyhow::bail!( "Read quorum not met: only {}/{} shards available for chunk {}", succeeded, k, chunk.chunk_index ); } self.erasure_coder .decode_chunk(&mut shards, chunk.data_size) } async fn read_shard_remote( &self, node_id: &str, bucket: &str, key: &str, chunk_index: u32, shard_index: u32, ) -> Result<(Vec, u32)> { let node_info = self .state .get_node(node_id) .await .ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?; let addr: SocketAddr = node_info.quic_addr.parse()?; let conn = self.transport.get_connection(node_id, addr).await?; let request = ClusterRequest::ShardRead(ShardReadRequest { request_id: uuid::Uuid::new_v4().to_string(), bucket: bucket.to_string(), key: key.to_string(), chunk_index, shard_index, }); match self.transport.send_shard_read(&conn, &request).await? { Some((data, checksum)) => Ok((data, checksum)), None => anyhow::bail!("Shard not found on remote node"), } } async fn delete_shard_remote( &self, node_id: &str, bucket: &str, key: &str, chunk_index: u32, shard_index: u32, ) -> Result<()> { let node_info = self .state .get_node(node_id) .await .ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?; let addr: SocketAddr = node_info.quic_addr.parse()?; let conn = self.transport.get_connection(node_id, addr).await?; let request = ClusterRequest::ShardDelete(ShardDeleteRequest { request_id: uuid::Uuid::new_v4().to_string(), bucket: bucket.to_string(), key: key.to_string(), chunk_index, shard_index, }); let _response = self.transport.send_request(&conn, &request).await?; Ok(()) } // ============================ // Manifest storage (local filesystem) // ============================ async fn store_manifest(&self, manifest: &ObjectManifest) -> Result<()> { let path = self.manifest_path(&manifest.bucket, &manifest.key); if let Some(parent) = path.parent() { fs::create_dir_all(parent).await?; } let json = serde_json::to_string_pretty(manifest)?; // Atomic write via temp + rename let temp_path = path.with_extension("manifest.tmp"); fs::write(&temp_path, json).await?; fs::rename(&temp_path, &path).await?; Ok(()) } async fn load_manifest(&self, bucket: &str, key: &str) -> Result { let path = self.manifest_path(bucket, key); if !path.exists() { return Err(crate::error::StorageError::no_such_key().into()); } let json = fs::read_to_string(&path).await?; let manifest: ObjectManifest = serde_json::from_str(&json)?; Ok(manifest) } async fn delete_manifest(&self, bucket: &str, key: &str) -> Result<()> { let path = self.manifest_path(bucket, key); let _ = fs::remove_file(&path).await; // Clean up empty parent dirs if let Some(parent) = path.parent() { let _ = fs::remove_dir(parent).await; } Ok(()) } fn manifest_path(&self, bucket: &str, key: &str) -> PathBuf { self.manifest_dir .join(bucket) .join(format!("{}.manifest.json", key)) } async fn collect_manifest_keys( &self, base_dir: &std::path::Path, dir: &std::path::Path, keys: &mut Vec, ) -> Result<()> { let mut entries = match fs::read_dir(dir).await { Ok(e) => e, Err(_) => return Ok(()), }; while let Some(entry) = entries.next_entry().await? { let meta = entry.metadata().await?; let name = entry.file_name().to_string_lossy().to_string(); if meta.is_dir() { Box::pin(self.collect_manifest_keys(base_dir, &entry.path(), keys)).await?; } else if name.ends_with(".manifest.json") { let relative = entry .path() .strip_prefix(base_dir) .unwrap_or(std::path::Path::new("")) .to_string_lossy() .to_string(); let key = relative.trim_end_matches(".manifest.json").to_string(); keys.push(key); } } Ok(()) } }