Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 494dac1267 | |||
| cea3407777 |
@@ -1,5 +1,14 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-21 - 6.3.1 - fix(cluster)
|
||||||
|
improve shard reconstruction validation and start background healing service
|
||||||
|
|
||||||
|
- use the erasure read quorum when reconstructing chunks instead of assuming data shard count
|
||||||
|
- verify reconstructed shards before writing healed data back to disk
|
||||||
|
- start the healing service during server initialization with shared local shard stores
|
||||||
|
- simplify QUIC request handling by decoding the full request buffer including trailing shard data
|
||||||
|
- clean up unused variables and imports across cluster modules
|
||||||
|
|
||||||
## 2026-03-21 - 6.3.0 - feat(readme)
|
## 2026-03-21 - 6.3.0 - feat(readme)
|
||||||
document distributed cluster mode, erasure coding, and QUIC-based architecture
|
document distributed cluster mode, erasure coding, and QUIC-based architecture
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartstorage",
|
"name": "@push.rocks/smartstorage",
|
||||||
"version": "6.3.0",
|
"version": "6.3.1",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
@@ -205,7 +205,7 @@ impl DistributedStore {
|
|||||||
|
|
||||||
// Determine which chunks to fetch based on range
|
// Determine which chunks to fetch based on range
|
||||||
let chunk_size = manifest.chunk_size as u64;
|
let chunk_size = manifest.chunk_size as u64;
|
||||||
let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) =
|
let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) =
|
||||||
if let Some((start, end)) = range {
|
if let Some((start, end)) = range {
|
||||||
let first = (start / chunk_size) as usize;
|
let first = (start / chunk_size) as usize;
|
||||||
let last = (end / chunk_size) as usize;
|
let last = (end / chunk_size) as usize;
|
||||||
@@ -1036,10 +1036,6 @@ impl DistributedStore {
|
|||||||
// Internal: fetch + reconstruct
|
// Internal: fetch + reconstruct
|
||||||
// ============================
|
// ============================
|
||||||
|
|
||||||
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
|
|
||||||
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
|
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
|
||||||
async fn fetch_and_reconstruct_chunk_for_object(
|
async fn fetch_and_reconstruct_chunk_for_object(
|
||||||
&self,
|
&self,
|
||||||
@@ -1047,7 +1043,7 @@ impl DistributedStore {
|
|||||||
bucket: &str,
|
bucket: &str,
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Vec<u8>> {
|
) -> Result<Vec<u8>> {
|
||||||
let k = self.erasure_config.data_shards;
|
let k = self.erasure_config.read_quorum();
|
||||||
let total = self.erasure_config.total_shards();
|
let total = self.erasure_config.total_shards();
|
||||||
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
|
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
|
||||||
let mut succeeded = 0usize;
|
let mut succeeded = 0usize;
|
||||||
|
|||||||
@@ -256,6 +256,18 @@ impl HealingService {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Verify reconstructed shards are consistent
|
||||||
|
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
|
||||||
|
tracing::error!(
|
||||||
|
bucket = manifest.bucket,
|
||||||
|
key = manifest.key,
|
||||||
|
chunk = chunk.chunk_index,
|
||||||
|
"Shard verification failed after reconstruction"
|
||||||
|
);
|
||||||
|
stats.errors += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Write the missing shards to the first available local drive
|
// Write the missing shards to the first available local drive
|
||||||
for affected_placement in &affected {
|
for affected_placement in &affected {
|
||||||
let shard_idx = affected_placement.shard_index as usize;
|
let shard_idx = affected_placement.shard_index as usize;
|
||||||
|
|||||||
@@ -1,3 +1,7 @@
|
|||||||
|
// Cluster modules contain forward-looking public API that is incrementally wired.
|
||||||
|
// Allow dead_code for methods/structs not yet called from the main server path.
|
||||||
|
#![allow(dead_code)]
|
||||||
|
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod coordinator;
|
pub mod coordinator;
|
||||||
pub mod drive_manager;
|
pub mod drive_manager;
|
||||||
|
|||||||
@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
|
|||||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
|
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
||||||
|
|
||||||
use super::protocol::{
|
use super::protocol::{
|
||||||
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
|
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
|
||||||
};
|
};
|
||||||
@@ -225,20 +223,14 @@ impl QuicTransport {
|
|||||||
mut recv: quinn::RecvStream,
|
mut recv: quinn::RecvStream,
|
||||||
shard_store: Arc<ShardStore>,
|
shard_store: Arc<ShardStore>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// Read the length-prefixed request header
|
// Read the full request (length-prefixed bincode + optional trailing data)
|
||||||
let mut len_buf = [0u8; 4];
|
let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
|
||||||
recv.read_exact(&mut len_buf).await?;
|
let (request, header_len) = protocol::decode_request(&raw)?;
|
||||||
let msg_len = u32::from_le_bytes(len_buf) as usize;
|
|
||||||
|
|
||||||
let mut msg_buf = vec![0u8; msg_len];
|
|
||||||
recv.read_exact(&mut msg_buf).await?;
|
|
||||||
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
|
|
||||||
|
|
||||||
match request {
|
match request {
|
||||||
ClusterRequest::ShardWrite(write_req) => {
|
ClusterRequest::ShardWrite(write_req) => {
|
||||||
// Read shard data from the stream
|
// Shard data follows the header in the raw buffer
|
||||||
let mut shard_data = vec![0u8; write_req.shard_data_length as usize];
|
let shard_data = &raw[header_len..];
|
||||||
recv.read_exact(&mut shard_data).await?;
|
|
||||||
|
|
||||||
let shard_id = ShardId {
|
let shard_id = ShardId {
|
||||||
bucket: write_req.bucket,
|
bucket: write_req.bucket,
|
||||||
@@ -348,8 +340,6 @@ impl QuicTransport {
|
|||||||
// will be handled by the membership and coordinator modules.
|
// will be handled by the membership and coordinator modules.
|
||||||
// For now, send a generic ack.
|
// For now, send a generic ack.
|
||||||
_ => {
|
_ => {
|
||||||
let response_data = recv.read_to_end(0).await.unwrap_or_default();
|
|
||||||
drop(response_data);
|
|
||||||
let err = protocol::ErrorResponse {
|
let err = protocol::ErrorResponse {
|
||||||
request_id: String::new(),
|
request_id: String::new(),
|
||||||
code: "NotImplemented".to_string(),
|
code: "NotImplemented".to_string(),
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::PathBuf;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::io::AsyncWriteExt;
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
|||||||
@@ -24,8 +24,8 @@ use crate::config::SmartStorageConfig;
|
|||||||
use crate::policy::{self, PolicyDecision, PolicyStore};
|
use crate::policy::{self, PolicyDecision, PolicyStore};
|
||||||
use crate::error::StorageError;
|
use crate::error::StorageError;
|
||||||
use crate::cluster::coordinator::DistributedStore;
|
use crate::cluster::coordinator::DistributedStore;
|
||||||
use crate::cluster::config::ErasureConfig;
|
|
||||||
use crate::cluster::drive_manager::DriveManager;
|
use crate::cluster::drive_manager::DriveManager;
|
||||||
|
use crate::cluster::healing::HealingService;
|
||||||
use crate::cluster::membership::MembershipManager;
|
use crate::cluster::membership::MembershipManager;
|
||||||
use crate::cluster::placement;
|
use crate::cluster::placement;
|
||||||
use crate::cluster::protocol::NodeInfo;
|
use crate::cluster::protocol::NodeInfo;
|
||||||
@@ -237,9 +237,15 @@ impl StorageServer {
|
|||||||
.join_cluster(&cluster_config.seed_nodes)
|
.join_cluster(&cluster_config.seed_nodes)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
// Build local shard stores (one per drive) for shared use
|
||||||
|
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
|
||||||
|
.iter()
|
||||||
|
.map(|p| Arc::new(ShardStore::new(p.clone())))
|
||||||
|
.collect();
|
||||||
|
|
||||||
// Start QUIC accept loop for incoming connections
|
// Start QUIC accept loop for incoming connections
|
||||||
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone()));
|
let shard_store_for_accept = local_shard_stores[0].clone();
|
||||||
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
|
let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
|
||||||
let transport_clone = transport.clone();
|
let transport_clone = transport.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
transport_clone
|
transport_clone
|
||||||
@@ -249,11 +255,24 @@ impl StorageServer {
|
|||||||
|
|
||||||
// Start heartbeat loop
|
// Start heartbeat loop
|
||||||
let membership_clone = membership.clone();
|
let membership_clone = membership.clone();
|
||||||
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
|
let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
|
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Start healing service
|
||||||
|
let healing_service = HealingService::new(
|
||||||
|
cluster_state.clone(),
|
||||||
|
&erasure_config,
|
||||||
|
local_shard_stores.clone(),
|
||||||
|
manifest_dir.clone(),
|
||||||
|
24, // scan every 24 hours
|
||||||
|
)?;
|
||||||
|
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
healing_service.run(heal_shutdown_rx).await;
|
||||||
|
});
|
||||||
|
|
||||||
// Create distributed store
|
// Create distributed store
|
||||||
let distributed_store = DistributedStore::new(
|
let distributed_store = DistributedStore::new(
|
||||||
cluster_state,
|
cluster_state,
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartstorage',
|
name: '@push.rocks/smartstorage',
|
||||||
version: '6.3.0',
|
version: '6.3.1',
|
||||||
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user