use anyhow::Result;
use chrono::{DateTime, Utc};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::sync::RwLock;

use super::config::ErasureConfig;
use super::erasure::ErasureCoder;
use super::metadata::ObjectManifest;
use super::shard_store::{ShardId, ShardStore};
use super::state::ClusterState;

/// Background healing service that scans for under-replicated shards
/// and reconstructs them.
pub struct HealingService {
    state: Arc<ClusterState>,
    erasure_coder: ErasureCoder,
    local_shard_stores: Vec<Arc<ShardStore>>,
    manifest_dir: PathBuf,
    scan_interval: Duration,
    runtime_state: Arc<RwLock<HealingRuntimeState>>,
}

impl HealingService {
    pub fn new(
        state: Arc<ClusterState>,
        erasure_config: &ErasureConfig,
        local_shard_stores: Vec<Arc<ShardStore>>,
        manifest_dir: PathBuf,
        scan_interval_hours: u64,
        runtime_state: Arc<RwLock<HealingRuntimeState>>,
    ) -> Result<Self> {
        // Clamp to at least one hour: tokio::time::interval panics on a zero period.
        let scan_interval = Duration::from_secs(scan_interval_hours.max(1) * 3600);
        if let Ok(mut state_guard) = runtime_state.try_write() {
            state_guard.scan_interval_ms = scan_interval.as_millis() as u64;
        }
        Ok(Self {
            state,
            erasure_coder: ErasureCoder::new(erasure_config)?,
            local_shard_stores,
            manifest_dir,
            scan_interval,
            runtime_state,
        })
    }

    pub fn runtime_state(&self) -> Arc<RwLock<HealingRuntimeState>> {
        self.runtime_state.clone()
    }

    /// Run the healing loop as a background task.
    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<()>) {
        let mut interval = tokio::time::interval(self.scan_interval);
        // Skip the first immediate tick so the scan starts one interval from now
        interval.tick().await;
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let started_at = Utc::now();
                    self.mark_healing_started(started_at).await;
                    tracing::info!("Starting healing scan");
                    match self.heal_scan().await {
                        Ok(stats) => {
                            self.mark_healing_finished(started_at, Some(stats.clone()), None).await;
                            tracing::info!(
                                checked = stats.shards_checked,
                                healed = stats.shards_healed,
                                errors = stats.errors,
                                "Healing scan completed"
                            );
                        }
                        Err(e) => {
                            self.mark_healing_finished(started_at, None, Some(e.to_string())).await;
                            tracing::error!("Healing scan failed: {}", e);
                        }
                    }
                }
                _ = shutdown.changed() => {
                    tracing::info!("Healing service shutting down");
                    break;
                }
            }
        }
    }

    async fn mark_healing_started(&self, started_at: DateTime<Utc>) {
        let mut runtime_state = self.runtime_state.write().await;
        runtime_state.active = true;
        runtime_state.scan_interval_ms = self.scan_interval.as_millis() as u64;
        runtime_state.last_run_started_at = Some(started_at);
        runtime_state.last_error = None;
    }

    async fn mark_healing_finished(
        &self,
        started_at: DateTime<Utc>,
        stats: Option<HealStats>,
        last_error: Option<String>,
    ) {
        let finished_at = Utc::now();
        let mut runtime_state = self.runtime_state.write().await;
        runtime_state.active = false;
        runtime_state.scan_interval_ms = self.scan_interval.as_millis() as u64;
        runtime_state.last_run_completed_at = Some(finished_at);
        runtime_state.last_duration_ms = Some(
            finished_at
                .signed_duration_since(started_at)
                .num_milliseconds()
                .max(0) as u64,
        );
        if let Some(stats) = stats {
            runtime_state.last_stats = Some(stats);
        }
        runtime_state.last_error = last_error;
    }

    /// Scan all manifests for shards on offline nodes, reconstruct and re-place them.
    async fn heal_scan(&self) -> Result<HealStats> {
        let mut stats = HealStats::default();
        let offline_nodes = self.state.offline_nodes().await;
        if offline_nodes.is_empty() {
            tracing::debug!("No offline nodes, skipping heal scan");
            return Ok(stats);
        }
        // Check that we have majority before healing (split-brain prevention)
        if !self.state.has_majority().await {
            tracing::warn!("No majority quorum, skipping heal to prevent split-brain");
            return Ok(stats);
        }
        tracing::info!(
            "Found {} offline nodes, scanning for affected shards",
            offline_nodes.len()
        );
        // Iterate all bucket directories under manifest_dir
        let mut bucket_entries = match fs::read_dir(&self.manifest_dir).await {
            Ok(e) => e,
            Err(_) => return Ok(stats),
        };
        while let Some(bucket_entry) = bucket_entries.next_entry().await? {
            if !bucket_entry.metadata().await?.is_dir() {
                continue;
            }
            let bucket_name = bucket_entry.file_name().to_string_lossy().to_string();
            if bucket_name.starts_with('.') {
                continue;
            }
            // Scan manifests in this bucket
            self.heal_bucket(&bucket_name, &offline_nodes, &mut stats)
                .await;
            // Yield to avoid starving foreground I/O
            tokio::task::yield_now().await;
        }
        Ok(stats)
    }

    async fn heal_bucket(&self, bucket: &str, offline_nodes: &[String], stats: &mut HealStats) {
        let bucket_dir = self.manifest_dir.join(bucket);
        let manifests = match self.collect_manifests(&bucket_dir).await {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(bucket = bucket, error = %e, "Failed to list manifests");
                stats.errors += 1;
                return;
            }
        };
        let local_id = self.state.local_node_id().to_string();
        for manifest in &manifests {
            for chunk in &manifest.chunks {
                // Check if any shard in this chunk is on an offline node
                let affected: Vec<_> = chunk
                    .shard_placements
                    .iter()
                    .filter(|p| offline_nodes.contains(&p.node_id))
                    .collect();
                if affected.is_empty() {
                    continue;
                }
                stats.shards_checked += chunk.shard_placements.len() as u64;
                // Try to reconstruct missing shards from available ones
                let k = manifest.data_shards;
                let total = manifest.data_shards + manifest.parity_shards;
                // Count available shards (those NOT on offline nodes)
                let available_count = chunk
                    .shard_placements
                    .iter()
                    .filter(|p| !offline_nodes.contains(&p.node_id))
                    .count();
                if available_count < k {
                    tracing::error!(
                        bucket = manifest.bucket,
                        key = manifest.key,
                        chunk = chunk.chunk_index,
                        available = available_count,
                        needed = k,
                        "Cannot heal chunk: not enough available shards"
                    );
                    stats.errors += 1;
                    continue;
                }
                // Fetch available shards (only local ones for now)
                let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
                let mut fetched = 0usize;
                for placement in &chunk.shard_placements {
                    if offline_nodes.contains(&placement.node_id) {
                        continue; // Skip offline nodes
                    }
                    if fetched >= k {
                        break; // k shards are enough to reconstruct
                    }
                    if placement.node_id == local_id {
                        let shard_id = ShardId {
                            bucket: manifest.bucket.clone(),
                            key: manifest.key.clone(),
                            chunk_index: chunk.chunk_index,
                            shard_index: placement.shard_index,
                        };
                        let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
                        if let Some(store) = self.local_shard_stores.get(store_idx) {
                            if let Ok((data, _)) = store.read_shard(&shard_id).await {
                                shards[placement.shard_index as usize] = Some(data);
                                fetched += 1;
                            }
                        }
                    }
                    // TODO: fetch from other online remote nodes
                }
                if fetched < k {
                    tracing::warn!(
                        bucket = manifest.bucket,
                        key = manifest.key,
                        chunk = chunk.chunk_index,
                        "Not enough local shards to heal, skipping"
                    );
                    continue;
                }
                // Reconstruct all shards
                let reconstructed = match self
                    .erasure_coder
                    .decode_chunk(&mut shards, chunk.data_size)
                {
                    Ok(_) => true,
                    Err(e) => {
                        tracing::error!(
                            bucket = manifest.bucket,
                            key = manifest.key,
                            chunk = chunk.chunk_index,
                            error = %e,
                            "Reconstruction failed"
                        );
                        stats.errors += 1;
                        false
                    }
                };
                if !reconstructed {
                    continue;
                }
                // Re-encode to get all shards back (including the missing ones)
                let full_data_size = chunk.data_size;
                let mut data_buf = Vec::with_capacity(full_data_size);
                for i in 0..k {
                    if let Some(ref shard) = shards[i] {
                        data_buf.extend_from_slice(shard);
                    }
                }
                data_buf.truncate(full_data_size);
                let all_shards = match self.erasure_coder.encode_chunk(&data_buf) {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::error!(error = %e, "Re-encoding for heal failed");
                        stats.errors += 1;
                        continue;
                    }
                };
                // Verify reconstructed shards are consistent
                if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
                    tracing::error!(
                        bucket = manifest.bucket,
                        key = manifest.key,
                        chunk = chunk.chunk_index,
                        "Shard verification failed after reconstruction"
                    );
                    stats.errors += 1;
                    continue;
                }
                // Write the missing shards to the first available local drive
                for affected_placement in &affected {
                    let shard_idx = affected_placement.shard_index as usize;
                    if shard_idx < all_shards.len() {
                        let shard_data = &all_shards[shard_idx];
                        let checksum = crc32c::crc32c(shard_data);
                        let shard_id = ShardId {
                            bucket: manifest.bucket.clone(),
                            key: manifest.key.clone(),
                            chunk_index: chunk.chunk_index,
                            shard_index: affected_placement.shard_index,
                        };
                        // Place on first available local drive
                        if let Some(store) = self.local_shard_stores.first() {
                            match store.write_shard(&shard_id, shard_data, checksum).await {
                                Ok(()) => {
                                    stats.shards_healed += 1;
                                    tracing::info!(
                                        bucket = manifest.bucket,
                                        key = manifest.key,
                                        chunk = chunk.chunk_index,
                                        shard = affected_placement.shard_index,
                                        "Shard healed successfully"
                                    );
                                }
                                Err(e) => {
                                    tracing::error!(error = %e, "Failed to write healed shard");
                                    stats.errors += 1;
                                }
                            }
                        }
                    }
                }
                tokio::task::yield_now().await;
            }
        }
    }

    /// Collect all manifests under a bucket directory.
    async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
        let mut manifests = Vec::new();
        self.collect_manifests_recursive(dir, &mut manifests)
            .await?;
        Ok(manifests)
    }

    // Boxed future so the async function can recurse into subdirectories.
    fn collect_manifests_recursive<'a>(
        &'a self,
        dir: &'a std::path::Path,
        manifests: &'a mut Vec<ObjectManifest>,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
        Box::pin(async move {
            let mut entries = match fs::read_dir(dir).await {
                Ok(e) => e,
                Err(_) => return Ok(()),
            };
            while let Some(entry) = entries.next_entry().await? {
                let meta = entry.metadata().await?;
                let name = entry.file_name().to_string_lossy().to_string();
                if meta.is_dir() {
                    self.collect_manifests_recursive(&entry.path(), manifests)
                        .await?;
                } else if name.ends_with(".manifest.json") {
                    if let Ok(content) = fs::read_to_string(entry.path()).await {
                        if let Ok(manifest) = serde_json::from_str::<ObjectManifest>(&content) {
                            manifests.push(manifest);
                        }
                    }
                }
            }
            Ok(())
        })
    }
}

#[derive(Debug, Clone, Default)]
pub struct HealStats {
    pub shards_checked: u64,
    pub shards_healed: u64,
    pub errors: u64,
}

#[derive(Debug, Clone, Default)]
pub struct HealingRuntimeState {
    pub active: bool,
    pub scan_interval_ms: u64,
    pub last_run_started_at: Option<DateTime<Utc>>,
    pub last_run_completed_at: Option<DateTime<Utc>>,
    pub last_duration_ms: Option<u64>,
    pub last_stats: Option<HealStats>,
    pub last_error: Option<String>,
}
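
// --- Usage sketch (illustrative only) ---
// A minimal sketch of wiring the service into an application, showing the
// spawn/shutdown pattern via a `tokio::sync::watch` channel. The bindings
// `cluster_state`, `erasure_config`, `shard_stores`, and `manifest_dir` are
// hypothetical placeholders assumed to be provided by the surrounding
// application; only `HealingService` and `HealingRuntimeState` come from
// this module.
//
// let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
// let service = HealingService::new(
//     cluster_state,       // Arc<ClusterState>
//     &erasure_config,     // &ErasureConfig
//     shard_stores,        // Vec<Arc<ShardStore>>
//     manifest_dir,        // PathBuf
//     6,                   // scan every 6 hours
//     Arc::new(RwLock::new(HealingRuntimeState::default())),
// )?;
// let handle = tokio::spawn(async move { service.run(shutdown_rx).await });
// // ... later, on application shutdown:
// let _ = shutdown_tx.send(());
// handle.await?;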