Compare commits

..

4 Commits
v6.3.0 ... main

Author SHA1 Message Date
c683b02e8c v6.3.2
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-23 21:21:50 +00:00
b64be03c2f fix(docs): update license ownership and correct README license file reference 2026-03-23 21:21:50 +00:00
494dac1267 v6.3.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-21 22:19:51 +00:00
cea3407777 fix(cluster): improve shard reconstruction validation and start background healing service 2026-03-21 22:19:51 +00:00
11 changed files with 66 additions and 30 deletions

View File

@@ -1,5 +1,20 @@
# Changelog # Changelog
## 2026-03-23 - 6.3.2 - fix(docs)
update license ownership and correct README license file reference
- Adjusts the copyright holder name in the license file
- Fixes the README link to match the lowercase license filename
## 2026-03-21 - 6.3.1 - fix(cluster)
improve shard reconstruction validation and start background healing service
- use the erasure read quorum when reconstructing chunks instead of assuming data shard count
- verify reconstructed shards before writing healed data back to disk
- start the healing service during server initialization with shared local shard stores
- simplify QUIC request handling by decoding the full request buffer including trailing shard data
- clean up unused variables and imports across cluster modules
## 2026-03-21 - 6.3.0 - feat(readme) ## 2026-03-21 - 6.3.0 - feat(readme)
document distributed cluster mode, erasure coding, and QUIC-based architecture document distributed cluster mode, erasure coding, and QUIC-based architecture

View File

@@ -1,4 +1,4 @@
Copyright (c) 2021 Lossless GmbH (hello@lossless.com) Copyright (c) 2021 Task Venture Capital GmbH (hello@task.vc)
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartstorage", "name": "@push.rocks/smartstorage",
"version": "6.3.0", "version": "6.3.2",
"private": false, "private": false,
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.", "description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

View File

@@ -578,7 +578,7 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
## License and Legal Information ## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file. This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file. **Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

View File

@@ -205,7 +205,7 @@ impl DistributedStore {
// Determine which chunks to fetch based on range // Determine which chunks to fetch based on range
let chunk_size = manifest.chunk_size as u64; let chunk_size = manifest.chunk_size as u64;
let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) = let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) =
if let Some((start, end)) = range { if let Some((start, end)) = range {
let first = (start / chunk_size) as usize; let first = (start / chunk_size) as usize;
let last = (end / chunk_size) as usize; let last = (end / chunk_size) as usize;
@@ -1036,10 +1036,6 @@ impl DistributedStore {
// Internal: fetch + reconstruct // Internal: fetch + reconstruct
// ============================ // ============================
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
}
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups. /// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
async fn fetch_and_reconstruct_chunk_for_object( async fn fetch_and_reconstruct_chunk_for_object(
&self, &self,
@@ -1047,7 +1043,7 @@ impl DistributedStore {
bucket: &str, bucket: &str,
key: &str, key: &str,
) -> Result<Vec<u8>> { ) -> Result<Vec<u8>> {
let k = self.erasure_config.data_shards; let k = self.erasure_config.read_quorum();
let total = self.erasure_config.total_shards(); let total = self.erasure_config.total_shards();
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total]; let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut succeeded = 0usize; let mut succeeded = 0usize;

View File

@@ -256,6 +256,18 @@ impl HealingService {
} }
}; };
// Verify reconstructed shards are consistent
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Shard verification failed after reconstruction"
);
stats.errors += 1;
continue;
}
// Write the missing shards to the first available local drive // Write the missing shards to the first available local drive
for affected_placement in &affected { for affected_placement in &affected {
let shard_idx = affected_placement.shard_index as usize; let shard_idx = affected_placement.shard_index as usize;

View File

@@ -1,3 +1,7 @@
// Cluster modules contain forward-looking public API that is incrementally wired.
// Allow dead_code for methods/structs not yet called from the main server path.
#![allow(dead_code)]
pub mod config; pub mod config;
pub mod coordinator; pub mod coordinator;
pub mod drive_manager; pub mod drive_manager;

View File

@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::protocol::{ use super::protocol::{
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest, self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
}; };
@@ -225,20 +223,14 @@ impl QuicTransport {
mut recv: quinn::RecvStream, mut recv: quinn::RecvStream,
shard_store: Arc<ShardStore>, shard_store: Arc<ShardStore>,
) -> Result<()> { ) -> Result<()> {
// Read the length-prefixed request header // Read the full request (length-prefixed bincode + optional trailing data)
let mut len_buf = [0u8; 4]; let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
recv.read_exact(&mut len_buf).await?; let (request, header_len) = protocol::decode_request(&raw)?;
let msg_len = u32::from_le_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len];
recv.read_exact(&mut msg_buf).await?;
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
match request { match request {
ClusterRequest::ShardWrite(write_req) => { ClusterRequest::ShardWrite(write_req) => {
// Read shard data from the stream // Shard data follows the header in the raw buffer
let mut shard_data = vec![0u8; write_req.shard_data_length as usize]; let shard_data = &raw[header_len..];
recv.read_exact(&mut shard_data).await?;
let shard_id = ShardId { let shard_id = ShardId {
bucket: write_req.bucket, bucket: write_req.bucket,
@@ -348,8 +340,6 @@ impl QuicTransport {
// will be handled by the membership and coordinator modules. // will be handled by the membership and coordinator modules.
// For now, send a generic ack. // For now, send a generic ack.
_ => { _ => {
let response_data = recv.read_to_end(0).await.unwrap_or_default();
drop(response_data);
let err = protocol::ErrorResponse { let err = protocol::ErrorResponse {
request_id: String::new(), request_id: String::new(),
code: "NotImplemented".to_string(), code: "NotImplemented".to_string(),

View File

@@ -1,6 +1,6 @@
use anyhow::Result; use anyhow::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf}; use std::path::PathBuf;
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;

View File

@@ -24,8 +24,8 @@ use crate::config::SmartStorageConfig;
use crate::policy::{self, PolicyDecision, PolicyStore}; use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::error::StorageError; use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore; use crate::cluster::coordinator::DistributedStore;
use crate::cluster::config::ErasureConfig;
use crate::cluster::drive_manager::DriveManager; use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager; use crate::cluster::membership::MembershipManager;
use crate::cluster::placement; use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo; use crate::cluster::protocol::NodeInfo;
@@ -237,9 +237,15 @@ impl StorageServer {
.join_cluster(&cluster_config.seed_nodes) .join_cluster(&cluster_config.seed_nodes)
.await?; .await?;
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
.iter()
.map(|p| Arc::new(ShardStore::new(p.clone())))
.collect();
// Start QUIC accept loop for incoming connections // Start QUIC accept loop for incoming connections
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone())); let shard_store_for_accept = local_shard_stores[0].clone();
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false); let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone(); let transport_clone = transport.clone();
tokio::spawn(async move { tokio::spawn(async move {
transport_clone transport_clone
@@ -249,11 +255,24 @@ impl StorageServer {
// Start heartbeat loop // Start heartbeat loop
let membership_clone = membership.clone(); let membership_clone = membership.clone();
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false); let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move { tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await; membership_clone.heartbeat_loop(hb_shutdown_rx).await;
}); });
// Start healing service
let healing_service = HealingService::new(
cluster_state.clone(),
&erasure_config,
local_shard_stores.clone(),
manifest_dir.clone(),
24, // scan every 24 hours
)?;
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
// Create distributed store // Create distributed store
let distributed_store = DistributedStore::new( let distributed_store = DistributedStore::new(
cluster_state, cluster_state,

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstorage', name: '@push.rocks/smartstorage',
version: '6.3.0', version: '6.3.2',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.' description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
} }