Compare commits

...

4 Commits

Author SHA1 Message Date
c683b02e8c v6.3.2
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-23 21:21:50 +00:00
b64be03c2f fix(docs): update license ownership and correct README license file reference 2026-03-23 21:21:50 +00:00
494dac1267 v6.3.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-21 22:19:51 +00:00
cea3407777 fix(cluster): improve shard reconstruction validation and start background healing service 2026-03-21 22:19:51 +00:00
11 changed files with 66 additions and 30 deletions

View File

@@ -1,5 +1,20 @@
# Changelog
## 2026-03-23 - 6.3.2 - fix(docs)
update license ownership and correct README license file reference
- Adjusts the copyright holder name in the license file
- Fixes the README link to match the lowercase license filename
## 2026-03-21 - 6.3.1 - fix(cluster)
improve shard reconstruction validation and start background healing service
- use the erasure read quorum when reconstructing chunks instead of assuming data shard count
- verify reconstructed shards before writing healed data back to disk
- start the healing service during server initialization with shared local shard stores
- simplify QUIC request handling by decoding the full request buffer including trailing shard data
- clean up unused variables and imports across cluster modules
## 2026-03-21 - 6.3.0 - feat(readme)
document distributed cluster mode, erasure coding, and QUIC-based architecture

View File

@@ -1,4 +1,4 @@
Copyright (c) 2021 Lossless GmbH (hello@lossless.com)
Copyright (c) 2021 Task Venture Capital GmbH (hello@task.vc)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartstorage",
"version": "6.3.0",
"version": "6.3.2",
"private": false,
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
"main": "dist_ts/index.js",

View File

@@ -578,7 +578,7 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

View File

@@ -205,7 +205,7 @@ impl DistributedStore {
// Determine which chunks to fetch based on range
let chunk_size = manifest.chunk_size as u64;
let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) =
let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) =
if let Some((start, end)) = range {
let first = (start / chunk_size) as usize;
let last = (end / chunk_size) as usize;
@@ -1036,10 +1036,6 @@ impl DistributedStore {
// Internal: fetch + reconstruct
// ============================
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
}
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
async fn fetch_and_reconstruct_chunk_for_object(
&self,
@@ -1047,7 +1043,7 @@ impl DistributedStore {
bucket: &str,
key: &str,
) -> Result<Vec<u8>> {
let k = self.erasure_config.data_shards;
let k = self.erasure_config.read_quorum();
let total = self.erasure_config.total_shards();
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut succeeded = 0usize;

View File

@@ -256,6 +256,18 @@ impl HealingService {
}
};
// Verify reconstructed shards are consistent
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Shard verification failed after reconstruction"
);
stats.errors += 1;
continue;
}
// Write the missing shards to the first available local drive
for affected_placement in &affected {
let shard_idx = affected_placement.shard_index as usize;

View File

@@ -1,3 +1,7 @@
// Cluster modules expose a forward-looking public API that is being wired in incrementally.
// Allow dead_code for methods/structs not yet called from the main server path.
#![allow(dead_code)]
pub mod config;
pub mod coordinator;
pub mod drive_manager;

View File

@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::protocol::{
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
};
@@ -225,20 +223,14 @@ impl QuicTransport {
mut recv: quinn::RecvStream,
shard_store: Arc<ShardStore>,
) -> Result<()> {
// Read the length-prefixed request header
let mut len_buf = [0u8; 4];
recv.read_exact(&mut len_buf).await?;
let msg_len = u32::from_le_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len];
recv.read_exact(&mut msg_buf).await?;
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
// Read the full request (length-prefixed bincode + optional trailing data)
let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
let (request, header_len) = protocol::decode_request(&raw)?;
match request {
ClusterRequest::ShardWrite(write_req) => {
// Read shard data from the stream
let mut shard_data = vec![0u8; write_req.shard_data_length as usize];
recv.read_exact(&mut shard_data).await?;
// Shard data follows the header in the raw buffer
let shard_data = &raw[header_len..];
let shard_id = ShardId {
bucket: write_req.bucket,
@@ -348,8 +340,6 @@ impl QuicTransport {
// will be handled by the membership and coordinator modules.
// For now, respond with a generic NotImplemented error.
_ => {
let response_data = recv.read_to_end(0).await.unwrap_or_default();
drop(response_data);
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "NotImplemented".to_string(),

View File

@@ -1,6 +1,6 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use tokio::fs;
use tokio::io::AsyncWriteExt;

View File

@@ -24,8 +24,8 @@ use crate::config::SmartStorageConfig;
use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore;
use crate::cluster::config::ErasureConfig;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager;
use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo;
@@ -237,9 +237,15 @@ impl StorageServer {
.join_cluster(&cluster_config.seed_nodes)
.await?;
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
.iter()
.map(|p| Arc::new(ShardStore::new(p.clone())))
.collect();
// Start QUIC accept loop for incoming connections
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone()));
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let shard_store_for_accept = local_shard_stores[0].clone();
let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone();
tokio::spawn(async move {
transport_clone
@@ -249,11 +255,24 @@ impl StorageServer {
// Start heartbeat loop
let membership_clone = membership.clone();
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
});
// Start healing service
let healing_service = HealingService::new(
cluster_state.clone(),
&erasure_config,
local_shard_stores.clone(),
manifest_dir.clone(),
24, // scan every 24 hours
)?;
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
// Create distributed store
let distributed_store = DistributedStore::new(
cluster_state,

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstorage',
version: '6.3.0',
version: '6.3.2',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
}