Files
smartstorage/rust/src/cluster/drive_manager.rs
T

243 lines
6.9 KiB
Rust
Raw Normal View History

use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tokio::fs;
use super::config::DriveConfig;
// ============================
// Drive format (on-disk metadata)
// ============================
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DriveFormat {
pub cluster_id: String,
pub erasure_set_id: u32,
pub drive_index_in_set: u32,
pub format_version: u32,
}
// ============================
// Drive state tracking
// ============================
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DriveStatus {
Online,
Degraded,
Offline,
Healing,
}
#[derive(Debug, Clone)]
pub struct DriveStats {
pub total_bytes: u64,
pub used_bytes: u64,
pub avg_write_latency_us: u64,
pub avg_read_latency_us: u64,
pub error_count: u64,
pub last_error: Option<String>,
pub last_check: DateTime<Utc>,
}
impl Default for DriveStats {
fn default() -> Self {
Self {
total_bytes: 0,
used_bytes: 0,
avg_write_latency_us: 0,
avg_read_latency_us: 0,
error_count: 0,
last_error: None,
last_check: Utc::now(),
}
}
}
#[derive(Debug)]
pub struct DriveState {
pub path: PathBuf,
pub format: Option<DriveFormat>,
pub status: DriveStatus,
pub stats: DriveStats,
}
// ============================
// Drive manager
// ============================
pub struct DriveManager {
drives: Vec<DriveState>,
}
impl DriveManager {
/// Initialize drive manager with configured drive paths.
pub async fn new(config: &DriveConfig) -> Result<Self> {
let mut drives = Vec::with_capacity(config.paths.len());
for path_str in &config.paths {
let path = PathBuf::from(path_str);
let storage_dir = path.join(".smartstorage");
// Ensure the drive directory exists
fs::create_dir_all(&storage_dir).await?;
// Try to read existing format
let format = Self::read_format(&storage_dir).await;
let status = if path.exists() {
DriveStatus::Online
} else {
DriveStatus::Offline
};
drives.push(DriveState {
path,
format,
status,
stats: DriveStats::default(),
});
}
Ok(Self { drives })
}
/// Format drives for a new cluster. Stamps each drive with cluster and erasure set info.
pub async fn format_drives(
&mut self,
cluster_id: &str,
erasure_set_assignments: &[(u32, u32)], // (erasure_set_id, drive_index_in_set)
) -> Result<()> {
if erasure_set_assignments.len() != self.drives.len() {
anyhow::bail!(
"Erasure set assignments count ({}) doesn't match drive count ({})",
erasure_set_assignments.len(),
self.drives.len()
);
}
for (drive, (set_id, drive_idx)) in
self.drives.iter_mut().zip(erasure_set_assignments.iter())
{
let format = DriveFormat {
cluster_id: cluster_id.to_string(),
erasure_set_id: *set_id,
drive_index_in_set: *drive_idx,
format_version: 1,
};
let storage_dir = drive.path.join(".smartstorage");
fs::create_dir_all(&storage_dir).await?;
let format_path = storage_dir.join("format.json");
let json = serde_json::to_string_pretty(&format)?;
fs::write(&format_path, json).await?;
drive.format = Some(format);
}
Ok(())
}
/// Get the number of drives managed.
pub fn drive_count(&self) -> usize {
self.drives.len()
}
/// Get a drive's state by index.
pub fn drive(&self, index: usize) -> Option<&DriveState> {
self.drives.get(index)
}
/// Get all drives.
pub fn drives(&self) -> &[DriveState] {
&self.drives
}
/// Get drives that are online.
pub fn online_drives(&self) -> Vec<usize> {
self.drives
.iter()
.enumerate()
.filter(|(_, d)| d.status == DriveStatus::Online)
.map(|(i, _)| i)
.collect()
}
/// Check health of a specific drive by writing and reading a probe file.
pub async fn check_drive_health(&mut self, index: usize) -> Result<DriveStatus> {
let drive = self
.drives
.get_mut(index)
.ok_or_else(|| anyhow::anyhow!("Drive index {} out of range", index))?;
let probe_path = drive.path.join(".smartstorage").join(".health_probe");
let start = std::time::Instant::now();
// Write probe
match fs::write(&probe_path, b"health_check").await {
Ok(()) => {}
Err(e) => {
drive.stats.error_count += 1;
drive.stats.last_error = Some(e.to_string());
drive.status = DriveStatus::Offline;
drive.stats.last_check = Utc::now();
return Ok(DriveStatus::Offline);
}
}
// Read probe
match fs::read(&probe_path).await {
Ok(_) => {}
Err(e) => {
drive.stats.error_count += 1;
drive.stats.last_error = Some(e.to_string());
drive.status = DriveStatus::Offline;
drive.stats.last_check = Utc::now();
return Ok(DriveStatus::Offline);
}
}
// Clean up probe
let _ = fs::remove_file(&probe_path).await;
let latency = start.elapsed();
drive.stats.avg_write_latency_us = latency.as_micros() as u64;
drive.stats.last_check = Utc::now();
// Mark degraded if latency is too high (>5 seconds)
if latency.as_secs() > 5 {
drive.status = DriveStatus::Degraded;
} else {
drive.status = DriveStatus::Online;
}
Ok(drive.status.clone())
}
/// Run health checks on all drives.
pub async fn check_all_drives(&mut self) -> Vec<(usize, DriveStatus)> {
let mut results = Vec::new();
let count = self.drives.len();
for i in 0..count {
match self.check_drive_health(i).await {
Ok(status) => results.push((i, status)),
Err(e) => {
tracing::error!(drive = i, error = %e, "Drive health check failed");
results.push((i, DriveStatus::Offline));
}
}
}
results
}
// Internal helpers
async fn read_format(storage_dir: &Path) -> Option<DriveFormat> {
let format_path = storage_dir.join("format.json");
let content = fs::read_to_string(&format_path).await.ok()?;
serde_json::from_str(&content).ok()
}
}