Compare commits

..

2 Commits

Author SHA1 Message Date
494dac1267 v6.3.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-21 22:19:51 +00:00
cea3407777 fix(cluster): improve shard reconstruction validation and start background healing service 2026-03-21 22:19:51 +00:00
9 changed files with 58 additions and 28 deletions

View File

@@ -1,5 +1,14 @@
# Changelog # Changelog
## 2026-03-21 - 6.3.1 - fix(cluster)
improve shard reconstruction validation and start background healing service
- use the erasure read quorum when reconstructing chunks instead of assuming the data shard count
- verify reconstructed shards before writing healed data back to disk
- start the healing service during server initialization with shared local shard stores
- simplify QUIC request handling by decoding the full request buffer, including trailing shard data
- clean up unused variables and imports across cluster modules
## 2026-03-21 - 6.3.0 - feat(readme) ## 2026-03-21 - 6.3.0 - feat(readme)
document distributed cluster mode, erasure coding, and QUIC-based architecture document distributed cluster mode, erasure coding, and QUIC-based architecture

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartstorage", "name": "@push.rocks/smartstorage",
"version": "6.3.0", "version": "6.3.1",
"private": false, "private": false,
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.", "description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

View File

@@ -205,7 +205,7 @@ impl DistributedStore {
// Determine which chunks to fetch based on range // Determine which chunks to fetch based on range
let chunk_size = manifest.chunk_size as u64; let chunk_size = manifest.chunk_size as u64;
let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) = let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) =
if let Some((start, end)) = range { if let Some((start, end)) = range {
let first = (start / chunk_size) as usize; let first = (start / chunk_size) as usize;
let last = (end / chunk_size) as usize; let last = (end / chunk_size) as usize;
@@ -1036,10 +1036,6 @@ impl DistributedStore {
// Internal: fetch + reconstruct // Internal: fetch + reconstruct
// ============================ // ============================
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
}
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups. /// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
async fn fetch_and_reconstruct_chunk_for_object( async fn fetch_and_reconstruct_chunk_for_object(
&self, &self,
@@ -1047,7 +1043,7 @@ impl DistributedStore {
bucket: &str, bucket: &str,
key: &str, key: &str,
) -> Result<Vec<u8>> { ) -> Result<Vec<u8>> {
let k = self.erasure_config.data_shards; let k = self.erasure_config.read_quorum();
let total = self.erasure_config.total_shards(); let total = self.erasure_config.total_shards();
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total]; let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut succeeded = 0usize; let mut succeeded = 0usize;

View File

@@ -256,6 +256,18 @@ impl HealingService {
} }
}; };
// Verify reconstructed shards are consistent
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Shard verification failed after reconstruction"
);
stats.errors += 1;
continue;
}
// Write the missing shards to the first available local drive // Write the missing shards to the first available local drive
for affected_placement in &affected { for affected_placement in &affected {
let shard_idx = affected_placement.shard_index as usize; let shard_idx = affected_placement.shard_index as usize;

View File

@@ -1,3 +1,7 @@
// Cluster modules contain forward-looking public API that is incrementally wired.
// Allow dead_code for methods/structs not yet called from the main server path.
#![allow(dead_code)]
pub mod config; pub mod config;
pub mod coordinator; pub mod coordinator;
pub mod drive_manager; pub mod drive_manager;

View File

@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::protocol::{ use super::protocol::{
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest, self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
}; };
@@ -225,20 +223,14 @@ impl QuicTransport {
mut recv: quinn::RecvStream, mut recv: quinn::RecvStream,
shard_store: Arc<ShardStore>, shard_store: Arc<ShardStore>,
) -> Result<()> { ) -> Result<()> {
// Read the length-prefixed request header // Read the full request (length-prefixed bincode + optional trailing data)
let mut len_buf = [0u8; 4]; let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
recv.read_exact(&mut len_buf).await?; let (request, header_len) = protocol::decode_request(&raw)?;
let msg_len = u32::from_le_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len];
recv.read_exact(&mut msg_buf).await?;
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
match request { match request {
ClusterRequest::ShardWrite(write_req) => { ClusterRequest::ShardWrite(write_req) => {
// Read shard data from the stream // Shard data follows the header in the raw buffer
let mut shard_data = vec![0u8; write_req.shard_data_length as usize]; let shard_data = &raw[header_len..];
recv.read_exact(&mut shard_data).await?;
let shard_id = ShardId { let shard_id = ShardId {
bucket: write_req.bucket, bucket: write_req.bucket,
@@ -348,8 +340,6 @@ impl QuicTransport {
// will be handled by the membership and coordinator modules. // will be handled by the membership and coordinator modules.
// For now, send a generic ack. // For now, send a generic ack.
_ => { _ => {
let response_data = recv.read_to_end(0).await.unwrap_or_default();
drop(response_data);
let err = protocol::ErrorResponse { let err = protocol::ErrorResponse {
request_id: String::new(), request_id: String::new(),
code: "NotImplemented".to_string(), code: "NotImplemented".to_string(),

View File

@@ -1,6 +1,6 @@
use anyhow::Result; use anyhow::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf}; use std::path::PathBuf;
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;

View File

@@ -24,8 +24,8 @@ use crate::config::SmartStorageConfig;
use crate::policy::{self, PolicyDecision, PolicyStore}; use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::error::StorageError; use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore; use crate::cluster::coordinator::DistributedStore;
use crate::cluster::config::ErasureConfig;
use crate::cluster::drive_manager::DriveManager; use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager; use crate::cluster::membership::MembershipManager;
use crate::cluster::placement; use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo; use crate::cluster::protocol::NodeInfo;
@@ -237,9 +237,15 @@ impl StorageServer {
.join_cluster(&cluster_config.seed_nodes) .join_cluster(&cluster_config.seed_nodes)
.await?; .await?;
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
.iter()
.map(|p| Arc::new(ShardStore::new(p.clone())))
.collect();
// Start QUIC accept loop for incoming connections // Start QUIC accept loop for incoming connections
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone())); let shard_store_for_accept = local_shard_stores[0].clone();
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false); let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone(); let transport_clone = transport.clone();
tokio::spawn(async move { tokio::spawn(async move {
transport_clone transport_clone
@@ -249,11 +255,24 @@ impl StorageServer {
// Start heartbeat loop // Start heartbeat loop
let membership_clone = membership.clone(); let membership_clone = membership.clone();
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false); let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move { tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await; membership_clone.heartbeat_loop(hb_shutdown_rx).await;
}); });
// Start healing service
let healing_service = HealingService::new(
cluster_state.clone(),
&erasure_config,
local_shard_stores.clone(),
manifest_dir.clone(),
24, // scan every 24 hours
)?;
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
// Create distributed store // Create distributed store
let distributed_store = DistributedStore::new( let distributed_store = DistributedStore::new(
cluster_state, cluster_state,

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstorage', name: '@push.rocks/smartstorage',
version: '6.3.0', version: '6.3.1',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.' description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
} }