fix(cluster): improve shard reconstruction validation and start background healing service

This commit is contained in:
2026-03-21 22:19:51 +00:00
parent a009d990d0
commit cea3407777
8 changed files with 57 additions and 27 deletions

View File

@@ -1,5 +1,14 @@
# Changelog
## 2026-03-21 - 6.3.1 - fix(cluster)
improve shard reconstruction validation and start background healing service
- use the erasure read quorum when reconstructing chunks instead of assuming data shard count
- verify reconstructed shards before writing healed data back to disk
- start the healing service during server initialization with shared local shard stores
- simplify QUIC request handling by decoding the full request buffer including trailing shard data
- clean up unused variables and imports across cluster modules
## 2026-03-21 - 6.3.0 - feat(readme)
document distributed cluster mode, erasure coding, and QUIC-based architecture

View File

@@ -205,7 +205,7 @@ impl DistributedStore {
// Determine which chunks to fetch based on range
let chunk_size = manifest.chunk_size as u64;
let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) =
let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) =
if let Some((start, end)) = range {
let first = (start / chunk_size) as usize;
let last = (end / chunk_size) as usize;
@@ -1036,10 +1036,6 @@ impl DistributedStore {
// Internal: fetch + reconstruct
// ============================
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
}
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
async fn fetch_and_reconstruct_chunk_for_object(
&self,
@@ -1047,7 +1043,7 @@ impl DistributedStore {
bucket: &str,
key: &str,
) -> Result<Vec<u8>> {
let k = self.erasure_config.data_shards;
let k = self.erasure_config.read_quorum();
let total = self.erasure_config.total_shards();
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut succeeded = 0usize;

View File

@@ -256,6 +256,18 @@ impl HealingService {
}
};
// Verify reconstructed shards are consistent
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Shard verification failed after reconstruction"
);
stats.errors += 1;
continue;
}
// Write the missing shards to the first available local drive
for affected_placement in &affected {
let shard_idx = affected_placement.shard_index as usize;

View File

@@ -1,3 +1,7 @@
// Cluster modules contain forward-looking public API that is incrementally wired.
// Allow dead_code for methods/structs not yet called from the main server path.
#![allow(dead_code)]
pub mod config;
pub mod coordinator;
pub mod drive_manager;

View File

@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::protocol::{
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
};
@@ -225,20 +223,14 @@ impl QuicTransport {
mut recv: quinn::RecvStream,
shard_store: Arc<ShardStore>,
) -> Result<()> {
// Read the length-prefixed request header
let mut len_buf = [0u8; 4];
recv.read_exact(&mut len_buf).await?;
let msg_len = u32::from_le_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len];
recv.read_exact(&mut msg_buf).await?;
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
// Read the full request (length-prefixed bincode + optional trailing data)
let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
let (request, header_len) = protocol::decode_request(&raw)?;
match request {
ClusterRequest::ShardWrite(write_req) => {
// Read shard data from the stream
let mut shard_data = vec![0u8; write_req.shard_data_length as usize];
recv.read_exact(&mut shard_data).await?;
// Shard data follows the header in the raw buffer
let shard_data = &raw[header_len..];
let shard_id = ShardId {
bucket: write_req.bucket,
@@ -348,8 +340,6 @@ impl QuicTransport {
// will be handled by the membership and coordinator modules.
// For now, send a generic ack.
_ => {
let response_data = recv.read_to_end(0).await.unwrap_or_default();
drop(response_data);
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "NotImplemented".to_string(),

View File

@@ -1,6 +1,6 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use tokio::fs;
use tokio::io::AsyncWriteExt;

View File

@@ -24,8 +24,8 @@ use crate::config::SmartStorageConfig;
use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore;
use crate::cluster::config::ErasureConfig;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager;
use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo;
@@ -237,9 +237,15 @@ impl StorageServer {
.join_cluster(&cluster_config.seed_nodes)
.await?;
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
.iter()
.map(|p| Arc::new(ShardStore::new(p.clone())))
.collect();
// Start QUIC accept loop for incoming connections
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone()));
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let shard_store_for_accept = local_shard_stores[0].clone();
let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone();
tokio::spawn(async move {
transport_clone
@@ -249,11 +255,24 @@ impl StorageServer {
// Start heartbeat loop
let membership_clone = membership.clone();
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
});
// Start healing service
let healing_service = HealingService::new(
cluster_state.clone(),
&erasure_config,
local_shard_stores.clone(),
manifest_dir.clone(),
24, // scan every 24 hours
)?;
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
// Create distributed store
let distributed_store = DistributedStore::new(
cluster_state,

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstorage',
version: '6.3.0',
version: '6.3.1',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
}