feat(cluster): add shard healing, drive health heartbeats, and clustered policy directory support

This commit is contained in:
2026-03-21 22:00:41 +00:00
parent 639eb5d36c
commit 3eb0045676
7 changed files with 780 additions and 70 deletions

View File

@@ -1,5 +1,13 @@
# Changelog
## 2026-03-21 - 6.2.0 - feat(cluster)
add shard healing, drive health heartbeats, and clustered policy directory support
- implements manifest-based healing that scans affected shards on offline nodes, reconstructs data with erasure coding, and rewrites recovered shards to local storage
- includes drive status reporting in membership heartbeats by wiring DriveManager health checks into cluster heartbeat messages
- adds clustered policies directory initialization and exposes policy storage paths from the distributed coordinator
- extends distributed coordinator support for remote shard read and delete operations plus multipart upload session metadata
## 2026-03-21 - 6.1.0 - feat(cluster)
add clustered storage backend with QUIC transport, erasure coding, and shard management

View File

@@ -13,7 +13,7 @@ use super::config::ErasureConfig;
use super::erasure::ErasureCoder;
use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement};
use super::placement::ErasureSet;
use super::protocol::ShardWriteRequest;
use super::protocol::{ClusterRequest, ShardDeleteRequest, ShardReadRequest, ShardWriteRequest};
use super::quic_transport::QuicTransport;
use super::shard_store::{ShardId, ShardStore};
use super::state::ClusterState;
@@ -22,6 +22,29 @@ use crate::storage::{
ListObjectsResult, MultipartUploadInfo, PutResult,
};
use serde::{Deserialize, Serialize};
/// Multipart upload session metadata.
///
/// Persisted as `session.json` inside the upload's directory under the
/// multipart root and read back via `load_multipart_session`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MultipartSession {
/// Unique identifier for this upload (UUID with hyphens stripped).
upload_id: String,
/// Destination bucket name.
bucket: String,
/// Destination object key.
key: String,
/// RFC 3339 timestamp of when the upload was initiated.
initiated: String,
/// User-supplied object metadata applied when the upload completes.
metadata: HashMap<String, String>,
parts: HashMap<u32, String>, // part_number -> etag
}
/// Per-part info stored during multipart upload.
///
/// Persisted as `part-<n>.json` next to `session.json`; consumed by
/// complete/abort to locate the part's erasure-coded chunks.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PartInfo {
/// Part number supplied by the client.
part_number: u32,
/// MD5 hex digest of the part's bytes.
etag: String,
/// Part size in bytes.
size: u64,
/// Synthetic key under which this part's shards are stored.
part_key: String,
/// Chunk manifests describing where the part's shards were placed.
chunks: Vec<ChunkManifest>,
}
/// Distributed storage coordinator.
///
/// Handles S3 operations by distributing erasure-coded shards across
@@ -36,6 +59,8 @@ pub struct DistributedStore {
manifest_dir: PathBuf,
/// Root directory for buckets metadata
buckets_dir: PathBuf,
/// Root directory for bucket policies
policies_dir: PathBuf,
erasure_config: ErasureConfig,
}
@@ -55,6 +80,8 @@ impl DistributedStore {
.map(|p| Arc::new(ShardStore::new(p.clone())))
.collect();
let policies_dir = buckets_dir.join(".policies");
Ok(Self {
state,
transport,
@@ -62,10 +89,16 @@ impl DistributedStore {
local_shard_stores,
manifest_dir,
buckets_dir,
policies_dir,
erasure_config,
})
}
/// Path of the directory that holds bucket policy documents.
pub fn policies_dir(&self) -> PathBuf {
self.policies_dir.to_path_buf()
}
// ============================
// Object operations
// ============================
@@ -187,7 +220,9 @@ impl DistributedStore {
let mut full_data = Vec::new();
for chunk_idx in first_chunk..=last_chunk.min(manifest.chunks.len() - 1) {
let chunk = &manifest.chunks[chunk_idx];
let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?;
let reconstructed = self
.fetch_and_reconstruct_chunk_for_object(chunk, bucket, key)
.await?;
full_data.extend_from_slice(&reconstructed);
}
@@ -248,26 +283,45 @@ impl DistributedStore {
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
// Load manifest to find all shards
if let Ok(manifest) = self.load_manifest(bucket, key).await {
// Delete shards from all drives
let local_id = self.state.local_node_id().to_string();
// Delete shards from all drives (local and remote)
for chunk in &manifest.chunks {
for placement in &chunk.shard_placements {
let shard_id = ShardId {
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
if placement.node_id == self.state.local_node_id() {
if placement.node_id == local_id {
// Local delete
let shard_id = ShardId {
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
if let Some(store) = self
.local_shard_stores
.get(placement.drive_id.parse::<usize>().unwrap_or(0))
{
let _ = store.delete_shard(&shard_id).await;
}
} else {
// Remote delete via QUIC (best-effort, don't fail the delete)
if let Err(e) = self
.delete_shard_remote(
&placement.node_id,
bucket,
key,
chunk.chunk_index,
placement.shard_index,
)
.await
{
tracing::warn!(
node = %placement.node_id,
shard = placement.shard_index,
error = %e,
"Failed to delete remote shard (will be cleaned up by healing)"
);
}
}
// TODO: send delete to remote nodes via QUIC
}
}
}
@@ -300,7 +354,9 @@ impl DistributedStore {
// Read source object fully, then reconstruct
let mut full_data = Vec::new();
for chunk in &src_manifest.chunks {
let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?;
let reconstructed = self
.fetch_and_reconstruct_chunk_for_object(chunk, src_bucket, src_key)
.await?;
full_data.extend_from_slice(&reconstructed);
}
@@ -526,45 +582,305 @@ impl DistributedStore {
}
// ============================
// Multipart (delegated to local temp storage for now)
// Multipart uploads
// ============================
pub async fn initiate_multipart(
&self,
_bucket: &str,
_key: &str,
_metadata: HashMap<String, String>,
bucket: &str,
key: &str,
metadata: HashMap<String, String>,
) -> Result<String> {
// TODO: Implement distributed multipart
anyhow::bail!("Multipart uploads not yet supported in cluster mode")
if !self.bucket_exists(bucket).await {
return Err(crate::error::StorageError::no_such_bucket().into());
}
let upload_id = uuid::Uuid::new_v4().to_string().replace('-', "");
let upload_dir = self.multipart_dir().join(&upload_id);
fs::create_dir_all(&upload_dir).await?;
// Store multipart session metadata
let session = MultipartSession {
upload_id: upload_id.clone(),
bucket: bucket.to_string(),
key: key.to_string(),
initiated: Utc::now().to_rfc3339(),
metadata,
parts: HashMap::new(),
};
let json = serde_json::to_string_pretty(&session)?;
fs::write(upload_dir.join("session.json"), json).await?;
Ok(upload_id)
}
pub async fn upload_part(
&self,
_upload_id: &str,
_part_number: u32,
_body: Incoming,
upload_id: &str,
part_number: u32,
body: Incoming,
) -> Result<(String, u64)> {
anyhow::bail!("Multipart uploads not yet supported in cluster mode")
let upload_dir = self.multipart_dir().join(upload_id);
if !upload_dir.is_dir() {
return Err(crate::error::StorageError::no_such_upload().into());
}
// Read session to get bucket/key
let session = self.load_multipart_session(upload_id).await?;
let erasure_set = self
.state
.get_erasure_set_for_object(&session.bucket, &session.key)
.await
.ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?;
// Buffer and erasure-code the part data
let chunk_size = self.erasure_config.chunk_size_bytes;
let mut chunk_buffer = Vec::with_capacity(chunk_size);
let mut chunk_index: u32 = 0;
let mut chunks = Vec::new();
let mut total_size: u64 = 0;
let mut hasher = Md5::new();
// Use upload_id + part_number as a unique key prefix for shard storage
let part_key = format!("{}/_multipart/{}/part-{}", session.key, upload_id, part_number);
let mut body = body;
loop {
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
hasher.update(&data);
total_size += data.len() as u64;
chunk_buffer.extend_from_slice(&data);
while chunk_buffer.len() >= chunk_size {
let chunk_data: Vec<u8> =
chunk_buffer.drain(..chunk_size).collect();
let chunk_manifest = self
.encode_and_distribute_chunk(
&erasure_set,
&session.bucket,
&part_key,
chunk_index,
&chunk_data,
)
.await?;
chunks.push(chunk_manifest);
chunk_index += 1;
}
}
}
Some(Err(e)) => return Err(anyhow::anyhow!("Body read error: {}", e)),
None => break,
}
}
// Process final partial chunk
if !chunk_buffer.is_empty() {
let chunk_manifest = self
.encode_and_distribute_chunk(
&erasure_set,
&session.bucket,
&part_key,
chunk_index,
&chunk_buffer,
)
.await?;
chunks.push(chunk_manifest);
}
let etag = format!("{:x}", hasher.finalize());
// Save per-part manifest
let part_manifest = PartInfo {
part_number,
etag: etag.clone(),
size: total_size,
part_key: part_key.clone(),
chunks,
};
let part_json = serde_json::to_string_pretty(&part_manifest)?;
fs::write(
upload_dir.join(format!("part-{}.json", part_number)),
part_json,
)
.await?;
Ok((etag, total_size))
}
pub async fn complete_multipart(
&self,
_upload_id: &str,
_parts: &[(u32, String)],
upload_id: &str,
parts: &[(u32, String)],
) -> Result<CompleteMultipartResult> {
anyhow::bail!("Multipart uploads not yet supported in cluster mode")
let session = self.load_multipart_session(upload_id).await?;
let upload_dir = self.multipart_dir().join(upload_id);
// Read per-part manifests and concatenate chunks sequentially
let mut all_chunks = Vec::new();
let mut total_size: u64 = 0;
let mut full_hasher = Md5::new();
let mut global_chunk_index: u32 = 0;
for (part_number, _etag) in parts {
let part_path = upload_dir.join(format!("part-{}.json", part_number));
if !part_path.exists() {
return Err(anyhow::anyhow!("Part {} not found", part_number).into());
}
let part_json = fs::read_to_string(&part_path).await?;
let part_info: PartInfo = serde_json::from_str(&part_json)?;
// Reconstruct part data to compute overall MD5
for chunk in &part_info.chunks {
let reconstructed = self
.fetch_and_reconstruct_chunk_for_object(
chunk,
&session.bucket,
&part_info.part_key,
)
.await?;
full_hasher.update(&reconstructed);
total_size += reconstructed.len() as u64;
// Re-index chunks to be sequential in the final object
let mut adjusted_chunk = chunk.clone();
adjusted_chunk.chunk_index = global_chunk_index;
all_chunks.push(adjusted_chunk);
global_chunk_index += 1;
}
}
let etag = format!("{:x}", full_hasher.finalize());
// Build final object manifest
let manifest = ObjectManifest {
bucket: session.bucket.clone(),
key: session.key.clone(),
version_id: uuid::Uuid::new_v4().to_string(),
size: total_size,
content_md5: etag.clone(),
content_type: session
.metadata
.get("content-type")
.cloned()
.unwrap_or_else(|| "binary/octet-stream".to_string()),
metadata: session.metadata.clone(),
created_at: Utc::now().to_rfc3339(),
last_modified: Utc::now().to_rfc3339(),
data_shards: self.erasure_config.data_shards,
parity_shards: self.erasure_config.parity_shards,
chunk_size: self.erasure_config.chunk_size_bytes,
chunks: all_chunks,
};
self.store_manifest(&manifest).await?;
// Clean up multipart upload directory
let _ = fs::remove_dir_all(&upload_dir).await;
Ok(CompleteMultipartResult { etag })
}
pub async fn abort_multipart(&self, _upload_id: &str) -> Result<()> {
anyhow::bail!("Multipart uploads not yet supported in cluster mode")
pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
let upload_dir = self.multipart_dir().join(upload_id);
if !upload_dir.is_dir() {
return Err(crate::error::StorageError::no_such_upload().into());
}
// Load session to get bucket/key for shard cleanup
if let Ok(session) = self.load_multipart_session(upload_id).await {
// Read part manifests and delete their shards
let mut entries = fs::read_dir(&upload_dir).await?;
while let Some(entry) = entries.next_entry().await? {
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with("part-") && name.ends_with(".json") {
if let Ok(content) = fs::read_to_string(entry.path()).await {
if let Ok(part_info) = serde_json::from_str::<PartInfo>(&content) {
let local_id = self.state.local_node_id().to_string();
for chunk in &part_info.chunks {
for placement in &chunk.shard_placements {
if placement.node_id == local_id {
let shard_id = ShardId {
bucket: session.bucket.clone(),
key: part_info.part_key.clone(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
if let Some(store) = self.local_shard_stores.get(
placement.drive_id.parse::<usize>().unwrap_or(0),
) {
let _ = store.delete_shard(&shard_id).await;
}
} else {
let _ = self
.delete_shard_remote(
&placement.node_id,
&session.bucket,
&part_info.part_key,
chunk.chunk_index,
placement.shard_index,
)
.await;
}
}
}
}
}
}
}
}
let _ = fs::remove_dir_all(&upload_dir).await;
Ok(())
}
pub async fn list_multipart_uploads(
&self,
_bucket: &str,
bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
Ok(Vec::new())
let multipart_dir = self.multipart_dir();
if !multipart_dir.is_dir() {
return Ok(Vec::new());
}
let mut uploads = Vec::new();
let mut entries = fs::read_dir(&multipart_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if !entry.metadata().await?.is_dir() {
continue;
}
let session_path = entry.path().join("session.json");
if let Ok(content) = fs::read_to_string(&session_path).await {
if let Ok(session) = serde_json::from_str::<MultipartSession>(&content) {
if session.bucket == bucket {
let initiated = DateTime::parse_from_rfc3339(&session.initiated)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
uploads.push(MultipartUploadInfo {
upload_id: session.upload_id,
key: session.key,
initiated,
});
}
}
}
}
Ok(uploads)
}
/// Directory under the manifest root where multipart upload state lives.
fn multipart_dir(&self) -> PathBuf {
let mut dir = self.manifest_dir.clone();
dir.push(".multipart");
dir
}
/// Read and deserialize the `session.json` for the given upload id.
async fn load_multipart_session(&self, upload_id: &str) -> Result<MultipartSession> {
let path = self
.multipart_dir()
.join(upload_id)
.join("session.json");
let raw = fs::read_to_string(&path).await?;
let session: MultipartSession = serde_json::from_str(&raw)?;
Ok(session)
}
// ============================
@@ -721,43 +1037,62 @@ impl DistributedStore {
// ============================
/// Legacy wrapper: reconstruct a chunk without bucket/key context.
///
/// NOTE(review): this forwards empty bucket/key strings, so `ShardId`s
/// built downstream carry empty bucket/key. Since shard lookups now use
/// bucket/key in the shard ID, prefer
/// `fetch_and_reconstruct_chunk_for_object`; confirm this wrapper still
/// has live call sites before relying on it.
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
}
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
async fn fetch_and_reconstruct_chunk_for_object(
&self,
chunk: &ChunkManifest,
bucket: &str,
key: &str,
) -> Result<Vec<u8>> {
let k = self.erasure_config.data_shards;
let total = self.erasure_config.total_shards();
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut succeeded = 0usize;
// Try to fetch shards (local first, then remote)
for placement in &chunk.shard_placements {
// Sort placements: local first for fast path
let mut sorted_placements = chunk.shard_placements.clone();
let local_id = self.state.local_node_id().to_string();
sorted_placements.sort_by_key(|p| if p.node_id == local_id { 0 } else { 1 });
for placement in &sorted_placements {
if succeeded >= k {
break; // Have enough shards
}
let shard_id = ShardId {
bucket: String::new(), // Not needed for read
key: String::new(),
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
let result = if placement.node_id == self.state.local_node_id() {
let result = if placement.node_id == local_id {
// Local read
let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
if let Some(store) = self.local_shard_stores.get(store_idx) {
// Need to set proper bucket/key on shard_id for local reads
// We get this from the chunk's context, but we don't have it here.
// This will be passed through the manifest's shard placements.
store.read_shard(&shard_id).await.ok()
} else {
None
}
} else {
// Remote read via QUIC
// TODO: implement remote shard read
None
self.read_shard_remote(
&placement.node_id,
bucket,
key,
chunk.chunk_index,
placement.shard_index,
)
.await
.ok()
};
if let Some((data, _checksum)) = result {
shards[placement.shard_index as usize] = Some(data);
succeeded += 1;
if succeeded >= k {
break; // Have enough shards
}
}
}
@@ -774,6 +1109,66 @@ impl DistributedStore {
.decode_chunk(&mut shards, chunk.data_size)
}
/// Fetch one shard from a remote node over QUIC.
///
/// Resolves the peer's QUIC address from cluster state, obtains a
/// connection via the transport, and issues a `ShardRead` request.
/// Returns the shard bytes together with their stored checksum.
async fn read_shard_remote(
&self,
node_id: &str,
bucket: &str,
key: &str,
chunk_index: u32,
shard_index: u32,
) -> Result<(Vec<u8>, u32)> {
// Look up the peer in the current topology; unknown node is an error.
let node_info = self
.state
.get_node(node_id)
.await
.ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?;
let addr: SocketAddr = node_info.quic_addr.parse()?;
let conn = self.transport.get_connection(node_id, addr).await?;
let request_id = uuid::Uuid::new_v4().to_string();
let request = ClusterRequest::ShardRead(ShardReadRequest {
request_id,
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index,
shard_index,
});
// A `None` payload means the peer does not hold this shard.
self.transport
.send_shard_read(&conn, &request)
.await?
.ok_or_else(|| anyhow::anyhow!("Shard not found on remote node"))
}
/// Ask a remote node to delete one shard over QUIC.
///
/// The response payload is ignored; callers treat remote deletes as
/// best-effort.
async fn delete_shard_remote(
&self,
node_id: &str,
bucket: &str,
key: &str,
chunk_index: u32,
shard_index: u32,
) -> Result<()> {
// Resolve the target node's transport address from cluster state.
let node_info = match self.state.get_node(node_id).await {
Some(info) => info,
None => return Err(anyhow::anyhow!("Node {} not found", node_id)),
};
let addr: SocketAddr = node_info.quic_addr.parse()?;
let conn = self.transport.get_connection(node_id, addr).await?;
let delete = ShardDeleteRequest {
request_id: uuid::Uuid::new_v4().to_string(),
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index,
shard_index,
};
// Fire the request and discard the response body.
self.transport
.send_request(&conn, &ClusterRequest::ShardDelete(delete))
.await?;
Ok(())
}
// ============================
// Manifest storage (local filesystem)
// ============================

View File

@@ -1,23 +1,40 @@
use anyhow::Result;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use super::coordinator::DistributedStore;
use super::config::ErasureConfig;
use super::erasure::ErasureCoder;
use super::metadata::ObjectManifest;
use super::shard_store::{ShardId, ShardStore};
use super::state::ClusterState;
/// Background healing service that scans for under-replicated shards
/// and reconstructs them.
pub struct HealingService {
state: Arc<ClusterState>,
erasure_coder: ErasureCoder,
local_shard_stores: Vec<Arc<ShardStore>>,
manifest_dir: PathBuf,
scan_interval: Duration,
}
impl HealingService {
pub fn new(state: Arc<ClusterState>, scan_interval_hours: u64) -> Self {
Self {
pub fn new(
state: Arc<ClusterState>,
erasure_config: &ErasureConfig,
local_shard_stores: Vec<Arc<ShardStore>>,
manifest_dir: PathBuf,
scan_interval_hours: u64,
) -> Result<Self> {
Ok(Self {
state,
erasure_coder: ErasureCoder::new(erasure_config)?,
local_shard_stores,
manifest_dir,
scan_interval: Duration::from_secs(scan_interval_hours * 3600),
}
})
}
/// Run the healing loop as a background task.
@@ -53,7 +70,7 @@ impl HealingService {
}
}
/// Scan for offline nodes and identify objects that need healing.
/// Scan all manifests for shards on offline nodes, reconstruct and re-place them.
async fn heal_scan(&self) -> Result<HealStats> {
let mut stats = HealStats::default();
@@ -63,25 +80,260 @@ impl HealingService {
return Ok(stats);
}
tracing::info!(
"Found {} offline nodes, scanning for affected shards",
offline_nodes.len()
);
// Check that we have majority before healing
// (prevents healing during split-brain)
// Check that we have majority before healing (split-brain prevention)
if !self.state.has_majority().await {
tracing::warn!("No majority quorum, skipping heal to prevent split-brain");
return Ok(stats);
}
// TODO: Iterate all manifests, find shards on offline nodes,
// reconstruct from remaining shards and place on healthy nodes.
// This requires access to the DistributedStore and manifest listing
// which will be wired in when the full healing pipeline is implemented.
tracing::info!(
"Found {} offline nodes, scanning for affected shards",
offline_nodes.len()
);
// Iterate all bucket directories under manifest_dir
let mut bucket_entries = match fs::read_dir(&self.manifest_dir).await {
Ok(e) => e,
Err(_) => return Ok(stats),
};
while let Some(bucket_entry) = bucket_entries.next_entry().await? {
if !bucket_entry.metadata().await?.is_dir() {
continue;
}
let bucket_name = bucket_entry.file_name().to_string_lossy().to_string();
if bucket_name.starts_with('.') {
continue;
}
// Scan manifests in this bucket
self.heal_bucket(&bucket_name, &offline_nodes, &mut stats)
.await;
// Yield to avoid starving foreground I/O
tokio::task::yield_now().await;
}
Ok(stats)
}
/// Heal every chunk in `bucket` whose shard placements include offline
/// nodes.
///
/// For each affected chunk: fetch enough surviving shards (local drives
/// only, for now), reconstruct via erasure decoding, re-encode the full
/// shard set, and write the missing shards to local storage. Failures
/// are counted in `stats` and never abort the scan.
///
/// NOTE(review): healed shards are written to the *first* local drive and
/// the object manifest's shard placements are not rewritten, so reads
/// will still consult the stale placement — confirm a manifest-update
/// step exists or is planned.
async fn heal_bucket(
&self,
bucket: &str,
offline_nodes: &[String],
stats: &mut HealStats,
) {
let bucket_dir = self.manifest_dir.join(bucket);
let manifests = match self.collect_manifests(&bucket_dir).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(bucket = bucket, error = %e, "Failed to list manifests");
stats.errors += 1;
return;
}
};
let local_id = self.state.local_node_id().to_string();
for manifest in &manifests {
for chunk in &manifest.chunks {
// Check if any shard in this chunk is on an offline node
let affected: Vec<_> = chunk
.shard_placements
.iter()
.filter(|p| offline_nodes.contains(&p.node_id))
.collect();
if affected.is_empty() {
continue;
}
stats.shards_checked += chunk.shard_placements.len() as u64;
// Try to reconstruct missing shards from available ones
let k = manifest.data_shards;
let total = manifest.data_shards + manifest.parity_shards;
// Count available shards (those NOT on offline nodes)
let available_count = chunk
.shard_placements
.iter()
.filter(|p| !offline_nodes.contains(&p.node_id))
.count();
// Fewer than k survivors means the chunk is unrecoverable.
if available_count < k {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
available = available_count,
needed = k,
"Cannot heal chunk: not enough available shards"
);
stats.errors += 1;
continue;
}
// Fetch available shards (only local ones for now)
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut fetched = 0usize;
for placement in &chunk.shard_placements {
if offline_nodes.contains(&placement.node_id) {
continue; // Skip offline nodes
}
// k shards are sufficient for reconstruction; stop early.
if fetched >= k {
break;
}
if placement.node_id == local_id {
let shard_id = ShardId {
bucket: manifest.bucket.clone(),
key: manifest.key.clone(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
// NOTE(review): a non-numeric drive_id silently falls back
// to drive 0 — confirm drive_id is always an index string.
let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
if let Some(store) = self.local_shard_stores.get(store_idx) {
if let Ok((data, _)) = store.read_shard(&shard_id).await {
shards[placement.shard_index as usize] = Some(data);
fetched += 1;
}
}
}
// TODO: fetch from other online remote nodes
}
if fetched < k {
tracing::warn!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Not enough local shards to heal, skipping"
);
continue;
}
// Reconstruct all shards
let reconstructed = match self.erasure_coder.decode_chunk(
&mut shards,
chunk.data_size,
) {
Ok(_) => true,
Err(e) => {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
error = %e,
"Reconstruction failed"
);
stats.errors += 1;
false
}
};
if !reconstructed {
continue;
}
// Re-encode to get all shards back (including the missing ones)
// NOTE(review): assumes decode_chunk repopulates missing *data*
// shards in `shards` in place — confirm against ErasureCoder.
let full_data_size = chunk.data_size;
let mut data_buf = Vec::with_capacity(full_data_size);
for i in 0..k {
if let Some(ref shard) = shards[i] {
data_buf.extend_from_slice(shard);
}
}
// Drop erasure padding beyond the chunk's true data size.
data_buf.truncate(full_data_size);
let all_shards = match self.erasure_coder.encode_chunk(&data_buf) {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "Re-encoding for heal failed");
stats.errors += 1;
continue;
}
};
// Write the missing shards to the first available local drive
for affected_placement in &affected {
let shard_idx = affected_placement.shard_index as usize;
if shard_idx < all_shards.len() {
let shard_data = &all_shards[shard_idx];
let checksum = crc32c::crc32c(shard_data);
let shard_id = ShardId {
bucket: manifest.bucket.clone(),
key: manifest.key.clone(),
chunk_index: chunk.chunk_index,
shard_index: affected_placement.shard_index,
};
// Place on first available local drive
if let Some(store) = self.local_shard_stores.first() {
match store.write_shard(&shard_id, shard_data, checksum).await {
Ok(()) => {
stats.shards_healed += 1;
tracing::info!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
shard = affected_placement.shard_index,
"Shard healed successfully"
);
}
Err(e) => {
tracing::error!(error = %e, "Failed to write healed shard");
stats.errors += 1;
}
}
}
}
}
// Yield between chunks so healing doesn't starve foreground I/O.
tokio::task::yield_now().await;
}
}
}
/// Recursively gather every parsed object manifest below `dir`.
async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
let mut found = Vec::new();
self.collect_manifests_recursive(dir, &mut found).await?;
Ok(found)
}
/// Depth-first walk that pushes every parsed `*.manifest.json` under
/// `dir` into `manifests`.
///
/// Boxed/pinned because async recursion needs an indirected future type.
fn collect_manifests_recursive<'a>(
&'a self,
dir: &'a std::path::Path,
manifests: &'a mut Vec<ObjectManifest>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move {
// Unreadable directories are silently treated as empty.
let mut entries = match fs::read_dir(dir).await {
Err(_) => return Ok(()),
Ok(e) => e,
};
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if entry.metadata().await?.is_dir() {
// Recurse into subdirectories.
self.collect_manifests_recursive(&path, manifests).await?;
continue;
}
let file_name = entry.file_name().to_string_lossy().to_string();
if !file_name.ends_with(".manifest.json") {
continue;
}
// Unreadable or malformed manifest files are skipped.
if let Ok(text) = fs::read_to_string(&path).await {
if let Ok(parsed) = serde_json::from_str::<ObjectManifest>(&text) {
manifests.push(parsed);
}
}
}
Ok(())
})
}
}
#[derive(Debug, Default)]

View File

@@ -3,8 +3,12 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use super::drive_manager::{DriveManager, DriveStatus};
use super::protocol::{
ClusterRequest, ClusterResponse, HeartbeatMessage, JoinRequestMessage, NodeInfo,
ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage,
NodeInfo,
};
use super::quic_transport::QuicTransport;
use super::state::ClusterState;
@@ -15,6 +19,7 @@ pub struct MembershipManager {
transport: Arc<QuicTransport>,
heartbeat_interval: Duration,
local_node_info: NodeInfo,
drive_manager: Option<Arc<Mutex<DriveManager>>>,
}
impl MembershipManager {
@@ -29,9 +34,16 @@ impl MembershipManager {
transport,
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
local_node_info,
drive_manager: None,
}
}
/// Set the drive manager for health reporting in heartbeats.
pub fn with_drive_manager(mut self, dm: Arc<Mutex<DriveManager>>) -> Self {
self.drive_manager = Some(dm);
self
}
/// Join the cluster by contacting seed nodes.
/// Sends a JoinRequest to each seed node until one accepts.
pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> {
@@ -129,6 +141,9 @@ impl MembershipManager {
let topology_version = self.state.version().await;
let mut responded = Vec::new();
// Collect drive health states
let drive_states = self.collect_drive_states().await;
for peer in &peers {
let addr: SocketAddr = match peer.quic_addr.parse() {
Ok(a) => a,
@@ -138,7 +153,7 @@ impl MembershipManager {
let heartbeat = ClusterRequest::Heartbeat(HeartbeatMessage {
node_id: self.local_node_info.node_id.clone(),
timestamp: chrono::Utc::now().to_rfc3339(),
drive_states: Vec::new(), // TODO: populate from DriveManager
drive_states: drive_states.clone(),
topology_version,
});
@@ -181,4 +196,31 @@ impl MembershipManager {
let _response = self.transport.send_request(&conn, heartbeat).await?;
Ok(())
}
/// Gather current drive health from the attached `DriveManager`.
///
/// Returns an empty list when no drive manager was configured.
async fn collect_drive_states(&self) -> Vec<DriveStateInfo> {
let dm = if let Some(dm) = &self.drive_manager {
dm
} else {
return Vec::new();
};
// Run the health checks while holding the manager lock.
let results = dm.lock().await.check_all_drives().await;
let mut states = Vec::with_capacity(results.len());
for (idx, status) in results {
// Map the enum onto its wire-level string form.
let label = match status {
DriveStatus::Online => "online",
DriveStatus::Degraded => "degraded",
DriveStatus::Offline => "offline",
DriveStatus::Healing => "healing",
};
states.push(DriveStateInfo {
drive_index: idx as u32,
status: label.to_string(),
});
}
states
}
}

View File

@@ -25,6 +25,7 @@ use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore;
use crate::cluster::config::ErasureConfig;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::membership::MembershipManager;
use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo;
@@ -217,13 +218,21 @@ impl StorageServer {
};
cluster_state.add_node(local_node_info.clone()).await;
// Join cluster if seed nodes are configured
let membership = Arc::new(MembershipManager::new(
cluster_state.clone(),
transport.clone(),
cluster_config.heartbeat_interval_ms,
local_node_info,
// Initialize drive manager for health monitoring
let drive_manager = Arc::new(tokio::sync::Mutex::new(
DriveManager::new(&cluster_config.drives).await?,
));
// Join cluster if seed nodes are configured
let membership = Arc::new(
MembershipManager::new(
cluster_state.clone(),
transport.clone(),
cluster_config.heartbeat_interval_ms,
local_node_info,
)
.with_drive_manager(drive_manager),
);
membership
.join_cluster(&cluster_config.seed_nodes)
.await?;

View File

@@ -811,14 +811,18 @@ impl StorageBackend {
pub fn policies_dir(&self) -> std::path::PathBuf {
match self {
StorageBackend::Standalone(fs) => fs.policies_dir(),
StorageBackend::Clustered(_) => PathBuf::from(".policies"), // TODO: proper policies in cluster mode
StorageBackend::Clustered(ds) => ds.policies_dir(),
}
}
pub async fn initialize(&self) -> Result<()> {
match self {
StorageBackend::Standalone(fs) => fs.initialize().await,
StorageBackend::Clustered(_) => Ok(()), // Cluster init happens separately
StorageBackend::Clustered(ds) => {
// Ensure policies directory exists
tokio::fs::create_dir_all(ds.policies_dir()).await?;
Ok(())
}
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstorage',
version: '6.1.0',
version: '6.2.0',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
}