From cea34077773cf3ea8b3f2dce7576eae0dd4278ef Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 21 Mar 2026 22:19:51 +0000 Subject: [PATCH] fix(cluster): improve shard reconstruction validation and start background healing service --- changelog.md | 9 +++++++++ rust/src/cluster/coordinator.rs | 8 ++------ rust/src/cluster/healing.rs | 12 ++++++++++++ rust/src/cluster/mod.rs | 4 ++++ rust/src/cluster/quic_transport.rs | 20 +++++--------------- rust/src/cluster/shard_store.rs | 2 +- rust/src/server.rs | 27 +++++++++++++++++++++++---- ts/00_commitinfo_data.ts | 2 +- 8 files changed, 57 insertions(+), 27 deletions(-) diff --git a/changelog.md b/changelog.md index 47fb150..55985f7 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-03-21 - 6.3.1 - fix(cluster) +improve shard reconstruction validation and start background healing service + +- use the erasure read quorum when reconstructing chunks instead of assuming data shard count +- verify reconstructed shards before writing healed data back to disk +- start the healing service during server initialization with shared local shard stores +- simplify QUIC request handling by decoding the full request buffer including trailing shard data +- clean up unused variables and imports across cluster modules + ## 2026-03-21 - 6.3.0 - feat(readme) document distributed cluster mode, erasure coding, and QUIC-based architecture diff --git a/rust/src/cluster/coordinator.rs b/rust/src/cluster/coordinator.rs index d5a0bc9..b426eeb 100644 --- a/rust/src/cluster/coordinator.rs +++ b/rust/src/cluster/coordinator.rs @@ -205,7 +205,7 @@ impl DistributedStore { // Determine which chunks to fetch based on range let chunk_size = manifest.chunk_size as u64; - let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) = + let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) = if let Some((start, end)) = range { let first = (start / chunk_size) as usize; let last = (end / chunk_size) as usize; @@ -1036,10 +1036,6 @@ impl DistributedStore { // Internal: fetch + reconstruct // ============================ - async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result> { - self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await - } - /// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups. async fn fetch_and_reconstruct_chunk_for_object( &self, @@ -1047,7 +1043,7 @@ impl DistributedStore { bucket: &str, key: &str, ) -> Result> { - let k = self.erasure_config.data_shards; + let k = self.erasure_config.read_quorum(); let total = self.erasure_config.total_shards(); let mut shards: Vec>> = vec![None; total]; let mut succeeded = 0usize; diff --git a/rust/src/cluster/healing.rs b/rust/src/cluster/healing.rs index 11a833e..51b1b35 100644 --- a/rust/src/cluster/healing.rs +++ b/rust/src/cluster/healing.rs @@ -256,6 +256,18 @@ impl HealingService { } }; + // Verify reconstructed shards are consistent + if !self.erasure_coder.verify(&all_shards).unwrap_or(false) { + tracing::error!( + bucket = manifest.bucket, + key = manifest.key, + chunk = chunk.chunk_index, + "Shard verification failed after reconstruction" + ); + stats.errors += 1; + continue; + } + // Write the missing shards to the first available local drive for affected_placement in &affected { let shard_idx = affected_placement.shard_index as usize; diff --git a/rust/src/cluster/mod.rs b/rust/src/cluster/mod.rs index 36a5b5a..87a280c 100644 --- a/rust/src/cluster/mod.rs +++ b/rust/src/cluster/mod.rs @@ -1,3 +1,7 @@ +// Cluster modules contain forward-looking public API that is incrementally wired. +// Allow dead_code for methods/structs not yet called from the main server path. +#![allow(dead_code)] + pub mod config; pub mod coordinator; pub mod drive_manager; diff --git a/rust/src/cluster/quic_transport.rs b/rust/src/cluster/quic_transport.rs index d2e89f1..16485e1 100644 --- a/rust/src/cluster/quic_transport.rs +++ b/rust/src/cluster/quic_transport.rs @@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use std::net::SocketAddr; use std::sync::Arc; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - use super::protocol::{ self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest, }; @@ -225,20 +223,14 @@ impl QuicTransport { mut recv: quinn::RecvStream, shard_store: Arc, ) -> Result<()> { - // Read the length-prefixed request header - let mut len_buf = [0u8; 4]; - recv.read_exact(&mut len_buf).await?; - let msg_len = u32::from_le_bytes(len_buf) as usize; - - let mut msg_buf = vec![0u8; msg_len]; - recv.read_exact(&mut msg_buf).await?; - let request: ClusterRequest = bincode::deserialize(&msg_buf)?; + // Read the full request (length-prefixed bincode + optional trailing data) + let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max + let (request, header_len) = protocol::decode_request(&raw)?; match request { ClusterRequest::ShardWrite(write_req) => { - // Read shard data from the stream - let mut shard_data = vec![0u8; write_req.shard_data_length as usize]; - recv.read_exact(&mut shard_data).await?; + // Shard data follows the header in the raw buffer + let shard_data = &raw[header_len..]; let shard_id = ShardId { bucket: write_req.bucket, @@ -348,8 +340,6 @@ impl QuicTransport { // will be handled by the membership and coordinator modules. // For now, send a generic ack. _ => { - let response_data = recv.read_to_end(0).await.unwrap_or_default(); - drop(response_data); let err = protocol::ErrorResponse { request_id: String::new(), code: "NotImplemented".to_string(), diff --git a/rust/src/cluster/shard_store.rs b/rust/src/cluster/shard_store.rs index c6feb55..37aa7c2 100644 --- a/rust/src/cluster/shard_store.rs +++ b/rust/src/cluster/shard_store.rs @@ -1,6 +1,6 @@ use anyhow::Result; use serde::{Deserialize, Serialize}; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use tokio::fs; use tokio::io::AsyncWriteExt; diff --git a/rust/src/server.rs b/rust/src/server.rs index 6ab4bcf..5764e5d 100644 --- a/rust/src/server.rs +++ b/rust/src/server.rs @@ -24,8 +24,8 @@ use crate::config::SmartStorageConfig; use crate::policy::{self, PolicyDecision, PolicyStore}; use crate::error::StorageError; use crate::cluster::coordinator::DistributedStore; -use crate::cluster::config::ErasureConfig; use crate::cluster::drive_manager::DriveManager; +use crate::cluster::healing::HealingService; use crate::cluster::membership::MembershipManager; use crate::cluster::placement; use crate::cluster::protocol::NodeInfo; @@ -237,9 +237,15 @@ impl StorageServer { .join_cluster(&cluster_config.seed_nodes) .await?; + // Build local shard stores (one per drive) for shared use + let local_shard_stores: Vec> = drive_paths + .iter() + .map(|p| Arc::new(ShardStore::new(p.clone()))) + .collect(); + // Start QUIC accept loop for incoming connections - let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone())); - let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false); + let shard_store_for_accept = local_shard_stores[0].clone(); + let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false); let transport_clone = transport.clone(); tokio::spawn(async move { transport_clone @@ -249,11 +255,24 @@ impl StorageServer { // Start heartbeat loop let membership_clone = membership.clone(); - let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false); + let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false); tokio::spawn(async move { membership_clone.heartbeat_loop(hb_shutdown_rx).await; }); + // Start healing service + let healing_service = HealingService::new( + cluster_state.clone(), + &erasure_config, + local_shard_stores.clone(), + manifest_dir.clone(), + 24, // scan every 24 hours + )?; + let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false); + tokio::spawn(async move { + healing_service.run(heal_shutdown_rx).await; + }); + // Create distributed store let distributed_store = DistributedStore::new( cluster_state, diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 406986b..122be41 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstorage', - version: '6.3.0', + version: '6.3.1', description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.' }