Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 494dac1267 | |||
| cea3407777 |
@@ -1,5 +1,14 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-21 - 6.3.1 - fix(cluster)
|
||||
improve shard reconstruction validation and start background healing service
|
||||
|
||||
- use the erasure read quorum when reconstructing chunks instead of assuming data shard count
|
||||
- verify reconstructed shards before writing healed data back to disk
|
||||
- start the healing service during server initialization with shared local shard stores
|
||||
- simplify QUIC request handling by decoding the full request buffer including trailing shard data
|
||||
- clean up unused variables and imports across cluster modules
|
||||
|
||||
## 2026-03-21 - 6.3.0 - feat(readme)
|
||||
document distributed cluster mode, erasure coding, and QUIC-based architecture
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartstorage",
|
||||
"version": "6.3.0",
|
||||
"version": "6.3.1",
|
||||
"private": false,
|
||||
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
@@ -205,7 +205,7 @@ impl DistributedStore {
|
||||
|
||||
// Determine which chunks to fetch based on range
|
||||
let chunk_size = manifest.chunk_size as u64;
|
||||
let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) =
|
||||
let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) =
|
||||
if let Some((start, end)) = range {
|
||||
let first = (start / chunk_size) as usize;
|
||||
let last = (end / chunk_size) as usize;
|
||||
@@ -1036,10 +1036,6 @@ impl DistributedStore {
|
||||
// Internal: fetch + reconstruct
|
||||
// ============================
|
||||
|
||||
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
|
||||
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
|
||||
}
|
||||
|
||||
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
|
||||
async fn fetch_and_reconstruct_chunk_for_object(
|
||||
&self,
|
||||
@@ -1047,7 +1043,7 @@ impl DistributedStore {
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
) -> Result<Vec<u8>> {
|
||||
let k = self.erasure_config.data_shards;
|
||||
let k = self.erasure_config.read_quorum();
|
||||
let total = self.erasure_config.total_shards();
|
||||
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
|
||||
let mut succeeded = 0usize;
|
||||
|
||||
@@ -256,6 +256,18 @@ impl HealingService {
|
||||
}
|
||||
};
|
||||
|
||||
// Verify reconstructed shards are consistent
|
||||
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
|
||||
tracing::error!(
|
||||
bucket = manifest.bucket,
|
||||
key = manifest.key,
|
||||
chunk = chunk.chunk_index,
|
||||
"Shard verification failed after reconstruction"
|
||||
);
|
||||
stats.errors += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Write the missing shards to the first available local drive
|
||||
for affected_placement in &affected {
|
||||
let shard_idx = affected_placement.shard_index as usize;
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
// Cluster modules contain forward-looking public API that is incrementally wired.
|
||||
// Allow dead_code for methods/structs not yet called from the main server path.
|
||||
#![allow(dead_code)]
|
||||
|
||||
pub mod config;
|
||||
pub mod coordinator;
|
||||
pub mod drive_manager;
|
||||
|
||||
@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use super::protocol::{
|
||||
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
|
||||
};
|
||||
@@ -225,20 +223,14 @@ impl QuicTransport {
|
||||
mut recv: quinn::RecvStream,
|
||||
shard_store: Arc<ShardStore>,
|
||||
) -> Result<()> {
|
||||
// Read the length-prefixed request header
|
||||
let mut len_buf = [0u8; 4];
|
||||
recv.read_exact(&mut len_buf).await?;
|
||||
let msg_len = u32::from_le_bytes(len_buf) as usize;
|
||||
|
||||
let mut msg_buf = vec![0u8; msg_len];
|
||||
recv.read_exact(&mut msg_buf).await?;
|
||||
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
|
||||
// Read the full request (length-prefixed bincode + optional trailing data)
|
||||
let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
|
||||
let (request, header_len) = protocol::decode_request(&raw)?;
|
||||
|
||||
match request {
|
||||
ClusterRequest::ShardWrite(write_req) => {
|
||||
// Read shard data from the stream
|
||||
let mut shard_data = vec![0u8; write_req.shard_data_length as usize];
|
||||
recv.read_exact(&mut shard_data).await?;
|
||||
// Shard data follows the header in the raw buffer
|
||||
let shard_data = &raw[header_len..];
|
||||
|
||||
let shard_id = ShardId {
|
||||
bucket: write_req.bucket,
|
||||
@@ -348,8 +340,6 @@ impl QuicTransport {
|
||||
// will be handled by the membership and coordinator modules.
|
||||
// For now, send a generic ack.
|
||||
_ => {
|
||||
let response_data = recv.read_to_end(0).await.unwrap_or_default();
|
||||
drop(response_data);
|
||||
let err = protocol::ErrorResponse {
|
||||
request_id: String::new(),
|
||||
code: "NotImplemented".to_string(),
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
|
||||
@@ -24,8 +24,8 @@ use crate::config::SmartStorageConfig;
|
||||
use crate::policy::{self, PolicyDecision, PolicyStore};
|
||||
use crate::error::StorageError;
|
||||
use crate::cluster::coordinator::DistributedStore;
|
||||
use crate::cluster::config::ErasureConfig;
|
||||
use crate::cluster::drive_manager::DriveManager;
|
||||
use crate::cluster::healing::HealingService;
|
||||
use crate::cluster::membership::MembershipManager;
|
||||
use crate::cluster::placement;
|
||||
use crate::cluster::protocol::NodeInfo;
|
||||
@@ -237,9 +237,15 @@ impl StorageServer {
|
||||
.join_cluster(&cluster_config.seed_nodes)
|
||||
.await?;
|
||||
|
||||
// Build local shard stores (one per drive) for shared use
|
||||
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
|
||||
.iter()
|
||||
.map(|p| Arc::new(ShardStore::new(p.clone())))
|
||||
.collect();
|
||||
|
||||
// Start QUIC accept loop for incoming connections
|
||||
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone()));
|
||||
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
|
||||
let shard_store_for_accept = local_shard_stores[0].clone();
|
||||
let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
|
||||
let transport_clone = transport.clone();
|
||||
tokio::spawn(async move {
|
||||
transport_clone
|
||||
@@ -249,11 +255,24 @@ impl StorageServer {
|
||||
|
||||
// Start heartbeat loop
|
||||
let membership_clone = membership.clone();
|
||||
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
|
||||
let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
|
||||
tokio::spawn(async move {
|
||||
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
|
||||
});
|
||||
|
||||
// Start healing service
|
||||
let healing_service = HealingService::new(
|
||||
cluster_state.clone(),
|
||||
&erasure_config,
|
||||
local_shard_stores.clone(),
|
||||
manifest_dir.clone(),
|
||||
24, // scan every 24 hours
|
||||
)?;
|
||||
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
|
||||
tokio::spawn(async move {
|
||||
healing_service.run(heal_shutdown_rx).await;
|
||||
});
|
||||
|
||||
// Create distributed store
|
||||
let distributed_store = DistributedStore::new(
|
||||
cluster_state,
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstorage',
|
||||
version: '6.3.0',
|
||||
version: '6.3.1',
|
||||
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user