From 3eb004567674c6fba2187e92222a1a2b9860949e Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 21 Mar 2026 22:00:41 +0000 Subject: [PATCH] feat(cluster): add shard healing, drive health heartbeats, and clustered policy directory support --- changelog.md | 8 + rust/src/cluster/coordinator.rs | 481 +++++++++++++++++++++++++++++--- rust/src/cluster/healing.rs | 284 +++++++++++++++++-- rust/src/cluster/membership.rs | 46 ++- rust/src/server.rs | 21 +- rust/src/storage.rs | 8 +- ts/00_commitinfo_data.ts | 2 +- 7 files changed, 780 insertions(+), 70 deletions(-) diff --git a/changelog.md b/changelog.md index 9916f3f..a4acdd0 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-03-21 - 6.2.0 - feat(cluster) +add shard healing, drive health heartbeats, and clustered policy directory support + +- implements manifest-based healing that scans affected shards on offline nodes, reconstructs data with erasure coding, and rewrites recovered shards to local storage +- includes drive status reporting in membership heartbeats by wiring DriveManager health checks into cluster heartbeat messages +- adds clustered policies directory initialization and exposes policy storage paths from the distributed coordinator +- extends distributed coordinator support for remote shard read and delete operations plus multipart upload session metadata + ## 2026-03-21 - 6.1.0 - feat(cluster) add clustered storage backend with QUIC transport, erasure coding, and shard management diff --git a/rust/src/cluster/coordinator.rs b/rust/src/cluster/coordinator.rs index c1d13f3..d5a0bc9 100644 --- a/rust/src/cluster/coordinator.rs +++ b/rust/src/cluster/coordinator.rs @@ -13,7 +13,7 @@ use super::config::ErasureConfig; use super::erasure::ErasureCoder; use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement}; use super::placement::ErasureSet; -use super::protocol::ShardWriteRequest; +use super::protocol::{ClusterRequest, ShardDeleteRequest, ShardReadRequest, ShardWriteRequest}; use super::quic_transport::QuicTransport; use super::shard_store::{ShardId, ShardStore}; use super::state::ClusterState; @@ -22,6 +22,29 @@ use crate::storage::{ ListObjectsResult, MultipartUploadInfo, PutResult, }; +use serde::{Deserialize, Serialize}; + +/// Multipart upload session metadata. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct MultipartSession { + upload_id: String, + bucket: String, + key: String, + initiated: String, + metadata: HashMap, + parts: HashMap, // part_number -> etag +} + +/// Per-part info stored during multipart upload. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PartInfo { + part_number: u32, + etag: String, + size: u64, + part_key: String, + chunks: Vec, +} + /// Distributed storage coordinator. /// /// Handles S3 operations by distributing erasure-coded shards across @@ -36,6 +59,8 @@ pub struct DistributedStore { manifest_dir: PathBuf, /// Root directory for buckets metadata buckets_dir: PathBuf, + /// Root directory for bucket policies + policies_dir: PathBuf, erasure_config: ErasureConfig, } @@ -55,6 +80,8 @@ impl DistributedStore { .map(|p| Arc::new(ShardStore::new(p.clone()))) .collect(); + let policies_dir = buckets_dir.join(".policies"); + Ok(Self { state, transport, @@ -62,10 +89,16 @@ impl DistributedStore { local_shard_stores, manifest_dir, buckets_dir, + policies_dir, erasure_config, }) } + /// Get the policies directory path. + pub fn policies_dir(&self) -> PathBuf { + self.policies_dir.clone() + } + // ============================ // Object operations // ============================ @@ -187,7 +220,9 @@ impl DistributedStore { let mut full_data = Vec::new(); for chunk_idx in first_chunk..=last_chunk.min(manifest.chunks.len() - 1) { let chunk = &manifest.chunks[chunk_idx]; - let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?; + let reconstructed = self + .fetch_and_reconstruct_chunk_for_object(chunk, bucket, key) + .await?; full_data.extend_from_slice(&reconstructed); } @@ -248,26 +283,45 @@ impl DistributedStore { pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> { // Load manifest to find all shards if let Ok(manifest) = self.load_manifest(bucket, key).await { - // Delete shards from all drives + let local_id = self.state.local_node_id().to_string(); + + // Delete shards from all drives (local and remote) for chunk in &manifest.chunks { for placement in &chunk.shard_placements { - let shard_id = ShardId { - bucket: bucket.to_string(), - key: key.to_string(), - chunk_index: chunk.chunk_index, - shard_index: placement.shard_index, - }; - - if placement.node_id == self.state.local_node_id() { + if placement.node_id == local_id { // Local delete + let shard_id = ShardId { + bucket: bucket.to_string(), + key: key.to_string(), + chunk_index: chunk.chunk_index, + shard_index: placement.shard_index, + }; if let Some(store) = self .local_shard_stores .get(placement.drive_id.parse::().unwrap_or(0)) { let _ = store.delete_shard(&shard_id).await; } + } else { + // Remote delete via QUIC (best-effort, don't fail the delete) + if let Err(e) = self + .delete_shard_remote( + &placement.node_id, + bucket, + key, + chunk.chunk_index, + placement.shard_index, + ) + .await + { + tracing::warn!( + node = %placement.node_id, + shard = placement.shard_index, + error = %e, + "Failed to delete remote shard (will be cleaned up by healing)" + ); + } } - // TODO: send delete to remote nodes via QUIC } } } @@ -300,7 +354,9 @@ impl DistributedStore { // Read source object fully, then reconstruct let mut full_data = Vec::new(); for chunk in &src_manifest.chunks { - let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?; + let reconstructed = self + .fetch_and_reconstruct_chunk_for_object(chunk, src_bucket, src_key) + .await?; full_data.extend_from_slice(&reconstructed); } @@ -526,45 +582,305 @@ impl DistributedStore { } // ============================ - // Multipart (delegated to local temp storage for now) + // Multipart uploads // ============================ pub async fn initiate_multipart( &self, - _bucket: &str, - _key: &str, - _metadata: HashMap, + bucket: &str, + key: &str, + metadata: HashMap, ) -> Result { - // TODO: Implement distributed multipart - anyhow::bail!("Multipart uploads not yet supported in cluster mode") + if !self.bucket_exists(bucket).await { + return Err(crate::error::StorageError::no_such_bucket().into()); + } + + let upload_id = uuid::Uuid::new_v4().to_string().replace('-', ""); + let upload_dir = self.multipart_dir().join(&upload_id); + fs::create_dir_all(&upload_dir).await?; + + // Store multipart session metadata + let session = MultipartSession { + upload_id: upload_id.clone(), + bucket: bucket.to_string(), + key: key.to_string(), + initiated: Utc::now().to_rfc3339(), + metadata, + parts: HashMap::new(), + }; + let json = serde_json::to_string_pretty(&session)?; + fs::write(upload_dir.join("session.json"), json).await?; + + Ok(upload_id) } pub async fn upload_part( &self, - _upload_id: &str, - _part_number: u32, - _body: Incoming, + upload_id: &str, + part_number: u32, + body: Incoming, ) -> Result<(String, u64)> { - anyhow::bail!("Multipart uploads not yet supported in cluster mode") + let upload_dir = self.multipart_dir().join(upload_id); + if !upload_dir.is_dir() { + return Err(crate::error::StorageError::no_such_upload().into()); + } + + // Read session to get bucket/key + let session = self.load_multipart_session(upload_id).await?; + + let erasure_set = self + .state + .get_erasure_set_for_object(&session.bucket, &session.key) + .await + .ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?; + + // Buffer and erasure-code the part data + let chunk_size = self.erasure_config.chunk_size_bytes; + let mut chunk_buffer = Vec::with_capacity(chunk_size); + let mut chunk_index: u32 = 0; + let mut chunks = Vec::new(); + let mut total_size: u64 = 0; + let mut hasher = Md5::new(); + + // Use upload_id + part_number as a unique key prefix for shard storage + let part_key = format!("{}/_multipart/{}/part-{}", session.key, upload_id, part_number); + + let mut body = body; + loop { + match body.frame().await { + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + hasher.update(&data); + total_size += data.len() as u64; + chunk_buffer.extend_from_slice(&data); + + while chunk_buffer.len() >= chunk_size { + let chunk_data: Vec = + chunk_buffer.drain(..chunk_size).collect(); + let chunk_manifest = self + .encode_and_distribute_chunk( + &erasure_set, + &session.bucket, + &part_key, + chunk_index, + &chunk_data, + ) + .await?; + chunks.push(chunk_manifest); + chunk_index += 1; + } + } + } + Some(Err(e)) => return Err(anyhow::anyhow!("Body read error: {}", e)), + None => break, + } + } + + // Process final partial chunk + if !chunk_buffer.is_empty() { + let chunk_manifest = self + .encode_and_distribute_chunk( + &erasure_set, + &session.bucket, + &part_key, + chunk_index, + &chunk_buffer, + ) + .await?; + chunks.push(chunk_manifest); + } + + let etag = format!("{:x}", hasher.finalize()); + + // Save per-part manifest + let part_manifest = PartInfo { + part_number, + etag: etag.clone(), + size: total_size, + part_key: part_key.clone(), + chunks, + }; + let part_json = serde_json::to_string_pretty(&part_manifest)?; + fs::write( + upload_dir.join(format!("part-{}.json", part_number)), + part_json, + ) + .await?; + + Ok((etag, total_size)) } pub async fn complete_multipart( &self, - _upload_id: &str, - _parts: &[(u32, String)], + upload_id: &str, + parts: &[(u32, String)], ) -> Result { - anyhow::bail!("Multipart uploads not yet supported in cluster mode") + let session = self.load_multipart_session(upload_id).await?; + let upload_dir = self.multipart_dir().join(upload_id); + + // Read per-part manifests and concatenate chunks sequentially + let mut all_chunks = Vec::new(); + let mut total_size: u64 = 0; + let mut full_hasher = Md5::new(); + let mut global_chunk_index: u32 = 0; + + for (part_number, _etag) in parts { + let part_path = upload_dir.join(format!("part-{}.json", part_number)); + if !part_path.exists() { + return Err(anyhow::anyhow!("Part {} not found", part_number).into()); + } + + let part_json = fs::read_to_string(&part_path).await?; + let part_info: PartInfo = serde_json::from_str(&part_json)?; + + // Reconstruct part data to compute overall MD5 + for chunk in &part_info.chunks { + let reconstructed = self + .fetch_and_reconstruct_chunk_for_object( + chunk, + &session.bucket, + &part_info.part_key, + ) + .await?; + full_hasher.update(&reconstructed); + total_size += reconstructed.len() as u64; + + // Re-index chunks to be sequential in the final object + let mut adjusted_chunk = chunk.clone(); + adjusted_chunk.chunk_index = global_chunk_index; + all_chunks.push(adjusted_chunk); + global_chunk_index += 1; + } + } + + let etag = format!("{:x}", full_hasher.finalize()); + + // Build final object manifest + let manifest = ObjectManifest { + bucket: session.bucket.clone(), + key: session.key.clone(), + version_id: uuid::Uuid::new_v4().to_string(), + size: total_size, + content_md5: etag.clone(), + content_type: session + .metadata + .get("content-type") + .cloned() + .unwrap_or_else(|| "binary/octet-stream".to_string()), + metadata: session.metadata.clone(), + created_at: Utc::now().to_rfc3339(), + last_modified: Utc::now().to_rfc3339(), + data_shards: self.erasure_config.data_shards, + parity_shards: self.erasure_config.parity_shards, + chunk_size: self.erasure_config.chunk_size_bytes, + chunks: all_chunks, + }; + + self.store_manifest(&manifest).await?; + + // Clean up multipart upload directory + let _ = fs::remove_dir_all(&upload_dir).await; + + Ok(CompleteMultipartResult { etag }) } - pub async fn abort_multipart(&self, _upload_id: &str) -> Result<()> { - anyhow::bail!("Multipart uploads not yet supported in cluster mode") + pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> { + let upload_dir = self.multipart_dir().join(upload_id); + if !upload_dir.is_dir() { + return Err(crate::error::StorageError::no_such_upload().into()); + } + + // Load session to get bucket/key for shard cleanup + if let Ok(session) = self.load_multipart_session(upload_id).await { + // Read part manifests and delete their shards + let mut entries = fs::read_dir(&upload_dir).await?; + while let Some(entry) = entries.next_entry().await? { + let name = entry.file_name().to_string_lossy().to_string(); + if name.starts_with("part-") && name.ends_with(".json") { + if let Ok(content) = fs::read_to_string(entry.path()).await { + if let Ok(part_info) = serde_json::from_str::(&content) { + let local_id = self.state.local_node_id().to_string(); + for chunk in &part_info.chunks { + for placement in &chunk.shard_placements { + if placement.node_id == local_id { + let shard_id = ShardId { + bucket: session.bucket.clone(), + key: part_info.part_key.clone(), + chunk_index: chunk.chunk_index, + shard_index: placement.shard_index, + }; + if let Some(store) = self.local_shard_stores.get( + placement.drive_id.parse::().unwrap_or(0), + ) { + let _ = store.delete_shard(&shard_id).await; + } + } else { + let _ = self + .delete_shard_remote( + &placement.node_id, + &session.bucket, + &part_info.part_key, + chunk.chunk_index, + placement.shard_index, + ) + .await; + } + } + } + } + } + } + } + } + + let _ = fs::remove_dir_all(&upload_dir).await; + Ok(()) } pub async fn list_multipart_uploads( &self, - _bucket: &str, + bucket: &str, ) -> Result> { - Ok(Vec::new()) + let multipart_dir = self.multipart_dir(); + if !multipart_dir.is_dir() { + return Ok(Vec::new()); + } + + let mut uploads = Vec::new(); + let mut entries = fs::read_dir(&multipart_dir).await?; + + while let Some(entry) = entries.next_entry().await? { + if !entry.metadata().await?.is_dir() { + continue; + } + let session_path = entry.path().join("session.json"); + if let Ok(content) = fs::read_to_string(&session_path).await { + if let Ok(session) = serde_json::from_str::(&content) { + if session.bucket == bucket { + let initiated = DateTime::parse_from_rfc3339(&session.initiated) + .map(|dt| dt.with_timezone(&Utc)) + .unwrap_or_else(|_| Utc::now()); + uploads.push(MultipartUploadInfo { + upload_id: session.upload_id, + key: session.key, + initiated, + }); + } + } + } + } + + Ok(uploads) + } + + fn multipart_dir(&self) -> PathBuf { + self.manifest_dir.join(".multipart") + } + + async fn load_multipart_session(&self, upload_id: &str) -> Result { + let session_path = self.multipart_dir().join(upload_id).join("session.json"); + let content = fs::read_to_string(&session_path).await?; + Ok(serde_json::from_str(&content)?) } // ============================ @@ -721,43 +1037,62 @@ impl DistributedStore { // ============================ async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result> { + self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await + } + + /// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups. + async fn fetch_and_reconstruct_chunk_for_object( + &self, + chunk: &ChunkManifest, + bucket: &str, + key: &str, + ) -> Result> { let k = self.erasure_config.data_shards; let total = self.erasure_config.total_shards(); let mut shards: Vec>> = vec![None; total]; let mut succeeded = 0usize; - // Try to fetch shards (local first, then remote) - for placement in &chunk.shard_placements { + // Sort placements: local first for fast path + let mut sorted_placements = chunk.shard_placements.clone(); + let local_id = self.state.local_node_id().to_string(); + sorted_placements.sort_by_key(|p| if p.node_id == local_id { 0 } else { 1 }); + + for placement in &sorted_placements { + if succeeded >= k { + break; // Have enough shards + } + let shard_id = ShardId { - bucket: String::new(), // Not needed for read - key: String::new(), + bucket: bucket.to_string(), + key: key.to_string(), chunk_index: chunk.chunk_index, shard_index: placement.shard_index, }; - let result = if placement.node_id == self.state.local_node_id() { + let result = if placement.node_id == local_id { // Local read let store_idx = placement.drive_id.parse::().unwrap_or(0); if let Some(store) = self.local_shard_stores.get(store_idx) { - // Need to set proper bucket/key on shard_id for local reads - // We get this from the chunk's context, but we don't have it here. - // This will be passed through the manifest's shard placements. store.read_shard(&shard_id).await.ok() } else { None } } else { // Remote read via QUIC - // TODO: implement remote shard read - None + self.read_shard_remote( + &placement.node_id, + bucket, + key, + chunk.chunk_index, + placement.shard_index, + ) + .await + .ok() }; if let Some((data, _checksum)) = result { shards[placement.shard_index as usize] = Some(data); succeeded += 1; - if succeeded >= k { - break; // Have enough shards - } } } @@ -774,6 +1109,66 @@ impl DistributedStore { .decode_chunk(&mut shards, chunk.data_size) } + async fn read_shard_remote( + &self, + node_id: &str, + bucket: &str, + key: &str, + chunk_index: u32, + shard_index: u32, + ) -> Result<(Vec, u32)> { + let node_info = self + .state + .get_node(node_id) + .await + .ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?; + + let addr: SocketAddr = node_info.quic_addr.parse()?; + let conn = self.transport.get_connection(node_id, addr).await?; + + let request = ClusterRequest::ShardRead(ShardReadRequest { + request_id: uuid::Uuid::new_v4().to_string(), + bucket: bucket.to_string(), + key: key.to_string(), + chunk_index, + shard_index, + }); + + match self.transport.send_shard_read(&conn, &request).await? { + Some((data, checksum)) => Ok((data, checksum)), + None => anyhow::bail!("Shard not found on remote node"), + } + } + + async fn delete_shard_remote( + &self, + node_id: &str, + bucket: &str, + key: &str, + chunk_index: u32, + shard_index: u32, + ) -> Result<()> { + let node_info = self + .state + .get_node(node_id) + .await + .ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?; + + let addr: SocketAddr = node_info.quic_addr.parse()?; + let conn = self.transport.get_connection(node_id, addr).await?; + + let request = ClusterRequest::ShardDelete(ShardDeleteRequest { + request_id: uuid::Uuid::new_v4().to_string(), + bucket: bucket.to_string(), + key: key.to_string(), + chunk_index, + shard_index, + }); + + let _response = self.transport.send_request(&conn, &request).await?; + Ok(()) + } + // ============================ // Manifest storage (local filesystem) // ============================ diff --git a/rust/src/cluster/healing.rs b/rust/src/cluster/healing.rs index 08742d8..11a833e 100644 --- a/rust/src/cluster/healing.rs +++ b/rust/src/cluster/healing.rs @@ -1,23 +1,40 @@ use anyhow::Result; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use tokio::fs; -use super::coordinator::DistributedStore; +use super::config::ErasureConfig; +use super::erasure::ErasureCoder; +use super::metadata::ObjectManifest; +use super::shard_store::{ShardId, ShardStore}; use super::state::ClusterState; /// Background healing service that scans for under-replicated shards /// and reconstructs them. pub struct HealingService { state: Arc, + erasure_coder: ErasureCoder, + local_shard_stores: Vec>, + manifest_dir: PathBuf, scan_interval: Duration, } impl HealingService { - pub fn new(state: Arc, scan_interval_hours: u64) -> Self { - Self { + pub fn new( + state: Arc, + erasure_config: &ErasureConfig, + local_shard_stores: Vec>, + manifest_dir: PathBuf, + scan_interval_hours: u64, + ) -> Result { + Ok(Self { state, + erasure_coder: ErasureCoder::new(erasure_config)?, + local_shard_stores, + manifest_dir, scan_interval: Duration::from_secs(scan_interval_hours * 3600), - } + }) } /// Run the healing loop as a background task. @@ -53,7 +70,7 @@ impl HealingService { } } - /// Scan for offline nodes and identify objects that need healing. + /// Scan all manifests for shards on offline nodes, reconstruct and re-place them. async fn heal_scan(&self) -> Result { let mut stats = HealStats::default(); @@ -63,25 +80,260 @@ impl HealingService { return Ok(stats); } - tracing::info!( - "Found {} offline nodes, scanning for affected shards", - offline_nodes.len() - ); - - // Check that we have majority before healing - // (prevents healing during split-brain) + // Check that we have majority before healing (split-brain prevention) if !self.state.has_majority().await { tracing::warn!("No majority quorum, skipping heal to prevent split-brain"); return Ok(stats); } - // TODO: Iterate all manifests, find shards on offline nodes, - // reconstruct from remaining shards and place on healthy nodes. - // This requires access to the DistributedStore and manifest listing - // which will be wired in when the full healing pipeline is implemented. + tracing::info!( + "Found {} offline nodes, scanning for affected shards", + offline_nodes.len() + ); + + // Iterate all bucket directories under manifest_dir + let mut bucket_entries = match fs::read_dir(&self.manifest_dir).await { + Ok(e) => e, + Err(_) => return Ok(stats), + }; + + while let Some(bucket_entry) = bucket_entries.next_entry().await? { + if !bucket_entry.metadata().await?.is_dir() { + continue; + } + let bucket_name = bucket_entry.file_name().to_string_lossy().to_string(); + if bucket_name.starts_with('.') { + continue; + } + + // Scan manifests in this bucket + self.heal_bucket(&bucket_name, &offline_nodes, &mut stats) + .await; + + // Yield to avoid starving foreground I/O + tokio::task::yield_now().await; + } Ok(stats) } + + async fn heal_bucket( + &self, + bucket: &str, + offline_nodes: &[String], + stats: &mut HealStats, + ) { + let bucket_dir = self.manifest_dir.join(bucket); + let manifests = match self.collect_manifests(&bucket_dir).await { + Ok(m) => m, + Err(e) => { + tracing::warn!(bucket = bucket, error = %e, "Failed to list manifests"); + stats.errors += 1; + return; + } + }; + + let local_id = self.state.local_node_id().to_string(); + + for manifest in &manifests { + for chunk in &manifest.chunks { + // Check if any shard in this chunk is on an offline node + let affected: Vec<_> = chunk + .shard_placements + .iter() + .filter(|p| offline_nodes.contains(&p.node_id)) + .collect(); + + if affected.is_empty() { + continue; + } + + stats.shards_checked += chunk.shard_placements.len() as u64; + + // Try to reconstruct missing shards from available ones + let k = manifest.data_shards; + let total = manifest.data_shards + manifest.parity_shards; + + // Count available shards (those NOT on offline nodes) + let available_count = chunk + .shard_placements + .iter() + .filter(|p| !offline_nodes.contains(&p.node_id)) + .count(); + + if available_count < k { + tracing::error!( + bucket = manifest.bucket, + key = manifest.key, + chunk = chunk.chunk_index, + available = available_count, + needed = k, + "Cannot heal chunk: not enough available shards" + ); + stats.errors += 1; + continue; + } + + // Fetch available shards (only local ones for now) + let mut shards: Vec>> = vec![None; total]; + let mut fetched = 0usize; + + for placement in &chunk.shard_placements { + if offline_nodes.contains(&placement.node_id) { + continue; // Skip offline nodes + } + if fetched >= k { + break; + } + + if placement.node_id == local_id { + let shard_id = ShardId { + bucket: manifest.bucket.clone(), + key: manifest.key.clone(), + chunk_index: chunk.chunk_index, + shard_index: placement.shard_index, + }; + let store_idx = placement.drive_id.parse::().unwrap_or(0); + if let Some(store) = self.local_shard_stores.get(store_idx) { + if let Ok((data, _)) = store.read_shard(&shard_id).await { + shards[placement.shard_index as usize] = Some(data); + fetched += 1; + } + } + } + // TODO: fetch from other online remote nodes + } + + if fetched < k { + tracing::warn!( + bucket = manifest.bucket, + key = manifest.key, + chunk = chunk.chunk_index, + "Not enough local shards to heal, skipping" + ); + continue; + } + + // Reconstruct all shards + let reconstructed = match self.erasure_coder.decode_chunk( + &mut shards, + chunk.data_size, + ) { + Ok(_) => true, + Err(e) => { + tracing::error!( + bucket = manifest.bucket, + key = manifest.key, + chunk = chunk.chunk_index, + error = %e, + "Reconstruction failed" + ); + stats.errors += 1; + false + } + }; + + if !reconstructed { + continue; + } + + // Re-encode to get all shards back (including the missing ones) + let full_data_size = chunk.data_size; + let mut data_buf = Vec::with_capacity(full_data_size); + for i in 0..k { + if let Some(ref shard) = shards[i] { + data_buf.extend_from_slice(shard); + } + } + data_buf.truncate(full_data_size); + + let all_shards = match self.erasure_coder.encode_chunk(&data_buf) { + Ok(s) => s, + Err(e) => { + tracing::error!(error = %e, "Re-encoding for heal failed"); + stats.errors += 1; + continue; + } + }; + + // Write the missing shards to the first available local drive + for affected_placement in &affected { + let shard_idx = affected_placement.shard_index as usize; + if shard_idx < all_shards.len() { + let shard_data = &all_shards[shard_idx]; + let checksum = crc32c::crc32c(shard_data); + + let shard_id = ShardId { + bucket: manifest.bucket.clone(), + key: manifest.key.clone(), + chunk_index: chunk.chunk_index, + shard_index: affected_placement.shard_index, + }; + + // Place on first available local drive + if let Some(store) = self.local_shard_stores.first() { + match store.write_shard(&shard_id, shard_data, checksum).await { + Ok(()) => { + stats.shards_healed += 1; + tracing::info!( + bucket = manifest.bucket, + key = manifest.key, + chunk = chunk.chunk_index, + shard = affected_placement.shard_index, + "Shard healed successfully" + ); + } + Err(e) => { + tracing::error!(error = %e, "Failed to write healed shard"); + stats.errors += 1; + } + } + } + } + } + + tokio::task::yield_now().await; + } + } + } + + /// Collect all manifests under a bucket directory. + async fn collect_manifests(&self, dir: &std::path::Path) -> Result> { + let mut manifests = Vec::new(); + self.collect_manifests_recursive(dir, &mut manifests).await?; + Ok(manifests) + } + + fn collect_manifests_recursive<'a>( + &'a self, + dir: &'a std::path::Path, + manifests: &'a mut Vec, + ) -> std::pin::Pin> + Send + 'a>> { + Box::pin(async move { + let mut entries = match fs::read_dir(dir).await { + Ok(e) => e, + Err(_) => return Ok(()), + }; + + while let Some(entry) = entries.next_entry().await? { + let meta = entry.metadata().await?; + let name = entry.file_name().to_string_lossy().to_string(); + + if meta.is_dir() { + self.collect_manifests_recursive(&entry.path(), manifests) + .await?; + } else if name.ends_with(".manifest.json") { + if let Ok(content) = fs::read_to_string(entry.path()).await { + if let Ok(manifest) = serde_json::from_str::(&content) { + manifests.push(manifest); + } + } + } + } + + Ok(()) + }) + } } #[derive(Debug, Default)] diff --git a/rust/src/cluster/membership.rs b/rust/src/cluster/membership.rs index 20392a1..42891a6 100644 --- a/rust/src/cluster/membership.rs +++ b/rust/src/cluster/membership.rs @@ -3,8 +3,12 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; + +use super::drive_manager::{DriveManager, DriveStatus}; use super::protocol::{ - ClusterRequest, ClusterResponse, HeartbeatMessage, JoinRequestMessage, NodeInfo, + ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage, + NodeInfo, }; use super::quic_transport::QuicTransport; use super::state::ClusterState; @@ -15,6 +19,7 @@ pub struct MembershipManager { transport: Arc, heartbeat_interval: Duration, local_node_info: NodeInfo, + drive_manager: Option>>, } impl MembershipManager { @@ -29,9 +34,16 @@ impl MembershipManager { transport, heartbeat_interval: Duration::from_millis(heartbeat_interval_ms), local_node_info, + drive_manager: None, } } + /// Set the drive manager for health reporting in heartbeats. + pub fn with_drive_manager(mut self, dm: Arc>) -> Self { + self.drive_manager = Some(dm); + self + } + /// Join the cluster by contacting seed nodes. /// Sends a JoinRequest to each seed node until one accepts. pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> { @@ -129,6 +141,9 @@ impl MembershipManager { let topology_version = self.state.version().await; let mut responded = Vec::new(); + // Collect drive health states + let drive_states = self.collect_drive_states().await; + for peer in &peers { let addr: SocketAddr = match peer.quic_addr.parse() { Ok(a) => a, @@ -138,7 +153,7 @@ impl MembershipManager { let heartbeat = ClusterRequest::Heartbeat(HeartbeatMessage { node_id: self.local_node_info.node_id.clone(), timestamp: chrono::Utc::now().to_rfc3339(), - drive_states: Vec::new(), // TODO: populate from DriveManager + drive_states: drive_states.clone(), topology_version, }); @@ -181,4 +196,31 @@ impl MembershipManager { let _response = self.transport.send_request(&conn, heartbeat).await?; Ok(()) } + + /// Collect drive health states from the DriveManager, if available. + async fn collect_drive_states(&self) -> Vec { + let dm = match &self.drive_manager { + Some(dm) => dm, + None => return Vec::new(), + }; + + let mut manager = dm.lock().await; + let results = manager.check_all_drives().await; + + results + .into_iter() + .map(|(idx, status)| { + let status_str = match status { + DriveStatus::Online => "online", + DriveStatus::Degraded => "degraded", + DriveStatus::Offline => "offline", + DriveStatus::Healing => "healing", + }; + DriveStateInfo { + drive_index: idx as u32, + status: status_str.to_string(), + } + }) + .collect() + } } diff --git a/rust/src/server.rs b/rust/src/server.rs index bcafaed..6ab4bcf 100644 --- a/rust/src/server.rs +++ b/rust/src/server.rs @@ -25,6 +25,7 @@ use crate::policy::{self, PolicyDecision, PolicyStore}; use crate::error::StorageError; use crate::cluster::coordinator::DistributedStore; use crate::cluster::config::ErasureConfig; +use crate::cluster::drive_manager::DriveManager; use crate::cluster::membership::MembershipManager; use crate::cluster::placement; use crate::cluster::protocol::NodeInfo; @@ -217,13 +218,21 @@ impl StorageServer { }; cluster_state.add_node(local_node_info.clone()).await; - // Join cluster if seed nodes are configured - let membership = Arc::new(MembershipManager::new( - cluster_state.clone(), - transport.clone(), - cluster_config.heartbeat_interval_ms, - local_node_info, + // Initialize drive manager for health monitoring + let drive_manager = Arc::new(tokio::sync::Mutex::new( + DriveManager::new(&cluster_config.drives).await?, )); + + // Join cluster if seed nodes are configured + let membership = Arc::new( + MembershipManager::new( + cluster_state.clone(), + transport.clone(), + cluster_config.heartbeat_interval_ms, + local_node_info, + ) + .with_drive_manager(drive_manager), + ); membership .join_cluster(&cluster_config.seed_nodes) .await?; diff --git a/rust/src/storage.rs b/rust/src/storage.rs index 55c379e..dd92320 100644 --- a/rust/src/storage.rs +++ b/rust/src/storage.rs @@ -811,14 +811,18 @@ impl StorageBackend { pub fn policies_dir(&self) -> std::path::PathBuf { match self { StorageBackend::Standalone(fs) => fs.policies_dir(), - StorageBackend::Clustered(_) => PathBuf::from(".policies"), // TODO: proper policies in cluster mode + StorageBackend::Clustered(ds) => ds.policies_dir(), } } pub async fn initialize(&self) -> Result<()> { match self { StorageBackend::Standalone(fs) => fs.initialize().await, - StorageBackend::Clustered(_) => Ok(()), // Cluster init happens separately + StorageBackend::Clustered(ds) => { + // Ensure policies directory exists + tokio::fs::create_dir_all(ds.policies_dir()).await?; + Ok(()) + } } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 0480ae7..e29fcd6 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstorage', - version: '6.1.0', + version: '6.2.0', description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.' }