Compare commits

...

8 Commits
v6.1.0 ... main

Author SHA1 Message Date
c683b02e8c v6.3.2
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-23 21:21:50 +00:00
b64be03c2f fix(docs): update license ownership and correct README license file reference 2026-03-23 21:21:50 +00:00
494dac1267 v6.3.1
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-21 22:19:51 +00:00
cea3407777 fix(cluster): improve shard reconstruction validation and start background healing service 2026-03-21 22:19:51 +00:00
a009d990d0 v6.3.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-21 22:04:36 +00:00
08d545f5db feat(readme): document distributed cluster mode, erasure coding, and QUIC-based architecture 2026-03-21 22:04:36 +00:00
a0a282c712 v6.2.0
Some checks failed
Default (tags) / security (push) Failing after 0s
Default (tags) / test (push) Failing after 0s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-03-21 22:00:41 +00:00
3eb0045676 feat(cluster): add shard healing, drive health heartbeats, and clustered policy directory support 2026-03-21 22:00:41 +00:00
13 changed files with 1042 additions and 142 deletions

View File

@@ -1,5 +1,35 @@
# Changelog # Changelog
## 2026-03-23 - 6.3.2 - fix(docs)
update license ownership and correct README license file reference
- Adjusts the copyright holder name in the license file
- Fixes the README link to match the lowercase license filename
## 2026-03-21 - 6.3.1 - fix(cluster)
improve shard reconstruction validation and start background healing service
- use the erasure read quorum when reconstructing chunks instead of assuming data shard count
- verify reconstructed shards before writing healed data back to disk
- start the healing service during server initialization with shared local shard stores
- simplify QUIC request handling by decoding the full request buffer including trailing shard data
- clean up unused variables and imports across cluster modules
## 2026-03-21 - 6.3.0 - feat(readme)
document distributed cluster mode, erasure coding, and QUIC-based architecture
- Expand README overview and feature matrix to highlight clustering, multi-drive awareness, and distributed storage capabilities
- Add standalone and cluster mode usage examples plus cluster configuration options
- Document clustering internals including erasure coding, quorum behavior, QUIC transport, self-healing, and on-disk layout
## 2026-03-21 - 6.2.0 - feat(cluster)
add shard healing, drive health heartbeats, and clustered policy directory support
- implements manifest-based healing that scans affected shards on offline nodes, reconstructs data with erasure coding, and rewrites recovered shards to local storage
- includes drive status reporting in membership heartbeats by wiring DriveManager health checks into cluster heartbeat messages
- adds clustered policies directory initialization and exposes policy storage paths from the distributed coordinator
- extends distributed coordinator support for remote shard read and delete operations plus multipart upload session metadata
## 2026-03-21 - 6.1.0 - feat(cluster) ## 2026-03-21 - 6.1.0 - feat(cluster)
add clustered storage backend with QUIC transport, erasure coding, and shard management add clustered storage backend with QUIC transport, erasure coding, and shard management

View File

@@ -1,4 +1,4 @@
Copyright (c) 2021 Lossless GmbH (hello@lossless.com) Copyright (c) 2021 Task Venture Capital GmbH (hello@task.vc)
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal

View File

@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartstorage", "name": "@push.rocks/smartstorage",
"version": "6.1.0", "version": "6.3.2",
"private": false, "private": false,
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.", "description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
"main": "dist_ts/index.js", "main": "dist_ts/index.js",

241
readme.md
View File

@@ -1,6 +1,6 @@
# @push.rocks/smartstorage # @push.rocks/smartstorage
A high-performance, S3-compatible local storage server powered by a **Rust core** with a clean TypeScript API. Drop-in replacement for AWS S3 during development and testing — no cloud, no Docker, no MinIO. Just `npm install` and go. A high-performance, S3-compatible storage server powered by a **Rust core** with a clean TypeScript API. Runs standalone for dev/test — or scales out as a **distributed, erasure-coded cluster** with QUIC-based inter-node communication. No cloud, no Docker. Just `npm install` and go. 🚀
## Issue Reporting and Security ## Issue Reporting and Security
@@ -15,23 +15,34 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
| Large file uploads | Streaming, zero-copy | Yes | OOM risk | | Large file uploads | Streaming, zero-copy | Yes | OOM risk |
| Range requests | Seek-based | Yes | Full read | | Range requests | Seek-based | Yes | Full read |
| Language | Rust + TypeScript | Go | JavaScript | | Language | Rust + TypeScript | Go | JavaScript |
| Multipart uploads | Full support | Yes | No | | Multipart uploads | Full support | Yes | No |
| Auth | AWS SigV4 (full verification) | Full IAM | Basic | | Auth | AWS SigV4 (full verification) | Full IAM | Basic |
| Bucket policies | IAM-style evaluation | Yes | No | | Bucket policies | IAM-style evaluation | Yes | No |
| Clustering | ✅ Erasure-coded, QUIC | Yes | No |
| Multi-drive awareness | ✅ Per-drive health | Yes | No |
### Core Features ### Core Features
- **Rust-powered HTTP server** — hyper 1.x with streaming I/O, zero-copy, backpressure - 🦀 **Rust-powered HTTP server** — hyper 1.x with streaming I/O, zero-copy, backpressure
- **Full S3-compatible API** — works with AWS SDK v3, SmartBucket, any S3 client - 📦 **Full S3-compatible API** — works with AWS SDK v3, SmartBucket, any S3 client
- **Filesystem-backed storage** — buckets map to directories, objects to files - 💾 **Filesystem-backed storage** — buckets map to directories, objects to files
- **Streaming multipart uploads** — large files without memory pressure - 📤 **Streaming multipart uploads** — large files without memory pressure
- **Byte-range requests** — `seek()` directly to the requested byte offset - 📐 **Byte-range requests**`seek()` directly to the requested byte offset
- **AWS SigV4 authentication** — full signature verification with constant-time comparison and 15-min clock skew enforcement - 🔐 **AWS SigV4 authentication** — full signature verification with constant-time comparison
- **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation, wildcard matching, and anonymous access support - 📋 **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation and wildcard matching
- **CORS middleware** — configurable cross-origin support - 🌐 **CORS middleware** — configurable cross-origin support
- **Structured logging** — tracing-based, error through debug levels - 🧹 **Clean slate mode**wipe storage on startup for test isolation
- **Clean slate mode** — wipe storage on startup for test isolation - **Test-first design**start/stop in milliseconds, no port conflicts
- **Test-first design** — start/stop in milliseconds, no port conflicts
### Clustering Features
- 🔗 **Erasure coding** — Reed-Solomon (configurable k data + m parity shards) for storage efficiency and fault tolerance
- 🚄 **QUIC transport** — multiplexed, encrypted inter-node communication via `quinn` with zero head-of-line blocking
- 💽 **Multi-drive awareness** — each node manages multiple independent storage paths with health monitoring
- 🤝 **Cluster membership** — static seed config + runtime join, heartbeat-based failure detection
- ✍️ **Quorum writes** — data is only acknowledged after k+1 shards are persisted
- 📖 **Quorum reads** — reconstruct from any k available shards, local-first fast path
- 🩹 **Self-healing** — background scanner detects and reconstructs missing/corrupt shards
## Installation ## Installation
@@ -43,6 +54,8 @@ pnpm add @push.rocks/smartstorage -D
## Quick Start ## Quick Start
### Standalone Mode (Dev & Test)
```typescript ```typescript
import { SmartStorage } from '@push.rocks/smartstorage'; import { SmartStorage } from '@push.rocks/smartstorage';
@@ -63,6 +76,31 @@ const descriptor = await storage.getStorageDescriptor();
await storage.stop(); await storage.stop();
``` ```
### Cluster Mode (Distributed)
```typescript
import { SmartStorage } from '@push.rocks/smartstorage';
const storage = await SmartStorage.createAndStart({
server: { port: 3000 },
cluster: {
enabled: true,
nodeId: 'node-1',
quicPort: 4000,
seedNodes: ['192.168.1.11:4000', '192.168.1.12:4000'],
erasure: {
dataShards: 4, // k: minimum shards to reconstruct data
parityShards: 2, // m: fault tolerance (can lose up to m shards)
},
drives: {
paths: ['/mnt/disk1', '/mnt/disk2', '/mnt/disk3'],
},
},
});
```
Objects are automatically split into chunks (default 4 MB), erasure-coded into 6 shards (4 data + 2 parity), and distributed across drives/nodes. Any 4 of 6 shards can reconstruct the original data.
## Configuration ## Configuration
All config fields are optional — sensible defaults are applied automatically. All config fields are optional — sensible defaults are applied automatically.
@@ -75,7 +113,7 @@ const config: ISmartStorageConfig = {
port: 3000, // Default: 3000 port: 3000, // Default: 3000
address: '0.0.0.0', // Default: '0.0.0.0' address: '0.0.0.0', // Default: '0.0.0.0'
silent: false, // Default: false silent: false, // Default: false
region: 'us-east-1', // Default: 'us-east-1' — used for SigV4 signing region: 'us-east-1', // Default: 'us-east-1' — used for SigV4 signing
}, },
storage: { storage: {
directory: './my-data', // Default: .nogit/bucketsDir directory: './my-data', // Default: .nogit/bucketsDir
@@ -111,6 +149,22 @@ const config: ISmartStorageConfig = {
expirationDays: 7, expirationDays: 7,
cleanupIntervalMinutes: 60, cleanupIntervalMinutes: 60,
}, },
cluster: { // Optional — omit for standalone mode
enabled: true,
nodeId: 'node-1', // Auto-generated UUID if omitted
quicPort: 4000, // Default: 4000
seedNodes: [], // Addresses of existing cluster members
erasure: {
dataShards: 4, // Default: 4
parityShards: 2, // Default: 2
chunkSizeBytes: 4194304, // Default: 4 MB
},
drives: {
paths: ['/mnt/disk1', '/mnt/disk2'],
},
heartbeatIntervalMs: 5000, // Default: 5000
heartbeatTimeoutMs: 30000, // Default: 30000
},
}; };
const storage = await SmartStorage.createAndStart(config); const storage = await SmartStorage.createAndStart(config);
@@ -207,7 +261,7 @@ const files = await dir.listFiles();
## Multipart Uploads ## Multipart Uploads
For files larger than 5 MB, use multipart uploads. smartstorage handles them with **streaming I/O** — parts are written directly to disk, never buffered in memory. For files larger than 5 MB, use multipart uploads. smartstorage handles them with **streaming I/O** — parts are written directly to disk, never buffered in memory. In cluster mode, each part is independently erasure-coded and distributed.
```typescript ```typescript
import { import {
@@ -255,8 +309,6 @@ When `auth.enabled` is `true`, the auth pipeline works as follows:
### Setting a Bucket Policy ### Setting a Bucket Policy
Use the S3 `PutBucketPolicy` API (or any S3 client that supports it):
```typescript ```typescript
import { PutBucketPolicyCommand } from '@aws-sdk/client-s3'; import { PutBucketPolicyCommand } from '@aws-sdk/client-s3';
@@ -294,6 +346,81 @@ await client.send(new PutBucketPolicyCommand({
Deleting a bucket automatically removes its associated policy. Deleting a bucket automatically removes its associated policy.
## Clustering Deep Dive 🔗
smartstorage can run as a distributed storage cluster where multiple nodes cooperate to store and retrieve data with built-in redundancy.
### How It Works
```
Client ──HTTP PUT──▶ Node A (coordinator)
├─ Split object into 4 MB chunks
├─ Erasure-code each chunk (4 data + 2 parity = 6 shards)
├──QUIC──▶ Node B (shard writes)
├──QUIC──▶ Node C (shard writes)
└─ Local disk (shard writes)
```
1. **Any node can coordinate** — the client connects to any cluster member
2. **Objects are chunked** — large objects split into fixed-size pieces (default 4 MB)
3. **Each chunk is erasure-coded** — Reed-Solomon produces k data + m parity shards
4. **Shards are distributed** — placed across different nodes and drives for fault isolation
5. **Quorum guarantees consistency** — writes need k+1 acks, reads need k shards
### Erasure Coding
With the default `4+2` configuration:
- Storage overhead: **33%** (vs. 200% for 3x replication)
- Fault tolerance: **any 2 drives/nodes can fail** simultaneously
- Read efficiency: only **4 of 6 shards** needed to reconstruct data
| Config | Total Shards | Overhead | Tolerance | Min Nodes |
|--------|-------------|----------|-----------|-----------|
| 4+2 | 6 | 33% | 2 failures | 3 |
| 6+3 | 9 | 50% | 3 failures | 5 |
| 2+1 | 3 | 50% | 1 failure | 2 |
### QUIC Transport
Inter-node communication uses [QUIC](https://en.wikipedia.org/wiki/QUIC) via the `quinn` library:
- 🔒 **Built-in TLS** — self-signed certs auto-generated at cluster init
- 🔀 **Multiplexed streams** — concurrent shard transfers without head-of-line blocking
-**Connection pooling** — persistent connections to peer nodes
- 🌊 **Natural backpressure** — QUIC flow control prevents overloading slow peers
### Cluster Membership
- **Static seed nodes** — initial cluster defined in config
- **Runtime join** — new nodes can join a running cluster
- **Heartbeat monitoring** — every 5s (configurable), with suspect/offline detection
- **Split-brain prevention** — nodes only mark peers offline when they have majority
### Self-Healing
A background scanner periodically (default: every 24h):
1. Checks shard checksums (CRC32C) for bit-rot detection
2. Identifies shards on offline nodes
3. Reconstructs missing shards from remaining data using Reed-Solomon
4. Places healed shards on healthy drives
Healing runs at low priority to avoid impacting foreground I/O.
### Erasure Set Formation
Drives are organized into fixed **erasure sets** at cluster initialization:
```
3 nodes × 4 drives each = 12 drives total
With 6-shard erasure sets → 2 erasure sets
Set 0: Node1-Disk0, Node2-Disk0, Node3-Disk0, Node1-Disk1, Node2-Disk1, Node3-Disk1
Set 1: Node1-Disk2, Node2-Disk2, Node3-Disk2, Node1-Disk3, Node2-Disk3, Node3-Disk3
```
Drives are interleaved across nodes for maximum fault isolation. New nodes form new erasure sets — existing data is never rebalanced.
## Testing Integration ## Testing Integration
```typescript ```typescript
@@ -358,31 +485,37 @@ Get connection details for S3-compatible clients. Returns:
smartstorage uses a **hybrid Rust + TypeScript** architecture: smartstorage uses a **hybrid Rust + TypeScript** architecture:
``` ```
┌─────────────────────────────────┐ ┌──────────────────────────────────────────────
│ Your Code (AWS SDK, etc.) │ │ Your Code (AWS SDK, SmartBucket, etc.)
│ ↕ HTTP (localhost:3000) │ │ ↕ HTTP (localhost:3000)
├─────────────────────────────────┤ ├──────────────────────────────────────────────
│ ruststorage binary (Rust) │ │ ruststorage binary (Rust)
│ ├─ hyper 1.x HTTP server │ │ ├─ hyper 1.x HTTP server
│ ├─ S3 path-style routing │ │ ├─ S3 path-style routing
│ ├─ Streaming storage layer │ ├─ StorageBackend (Standalone or Clustered)
├─ Multipart manager │ ├─ FileStore (single-node mode)
├─ SigV4 auth + policy engine │ └─ DistributedStore (cluster mode)
├─ CORS middleware │ ├─ ErasureCoder (Reed-Solomon)
└─ S3 XML response builder │ ├─ ShardStore (per-drive storage)
├─────────────────────────────────┤ │ │ ├─ QuicTransport (quinn) │
TypeScript (thin IPC wrapper) │ ├─ ClusterState & Membership
├─ SmartStorage class │ └─ HealingService
│ ├─ RustBridge (stdin/stdout) │ ├─ SigV4 auth + policy engine
─ Config & S3 descriptor ─ CORS middleware
└─────────────────────────────────┘ │ └─ S3 XML response builder │
├──────────────────────────────────────────────┤
│ TypeScript (thin IPC wrapper) │
│ ├─ SmartStorage class │
│ ├─ RustBridge (stdin/stdout JSON IPC) │
│ └─ Config & S3 descriptor │
└──────────────────────────────────────────────┘
``` ```
**Why Rust?** The TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests. **Why Rust?** The original TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
**IPC Protocol:** TypeScript spawns the `ruststorage` binary with `--management` and communicates via newline-delimited JSON over stdin/stdout. Commands: `start`, `stop`, `createBucket`. **IPC Protocol:** TypeScript spawns the `ruststorage` binary with `--management` and communicates via newline-delimited JSON over stdin/stdout. Commands: `start`, `stop`, `createBucket`, `clusterStatus`.
### S3-Compatible Operations Supported ### S3-Compatible Operations
| Operation | Method | Path | | Operation | Method | Path |
|-----------|--------|------| |-----------|--------|------|
@@ -407,31 +540,45 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
### On-Disk Format ### On-Disk Format
**Standalone mode:**
``` ```
{storage.directory}/ {storage.directory}/
{bucket}/ {bucket}/
{key}._storage_object # Object data {key}._storage_object # Object data
{key}._storage_object.metadata.json # Metadata (content-type, x-amz-meta-*, etc.) {key}._storage_object.metadata.json # Metadata (content-type, x-amz-meta-*, etc.)
{key}._storage_object.md5 # Cached MD5 hash {key}._storage_object.md5 # Cached MD5 hash
.multipart/ .multipart/
{upload-id}/ {upload-id}/
metadata.json # Upload metadata (bucket, key, parts) metadata.json # Upload metadata
part-1 # Part data files part-1, part-2, ... # Part data files
part-2
...
.policies/ .policies/
{bucket}.policy.json # Bucket policy (IAM JSON format) {bucket}.policy.json # Bucket policy (IAM JSON format)
```
**Cluster mode:**
```
{drive_path}/.smartstorage/
format.json # Drive metadata (cluster ID, erasure set)
data/{bucket}/{key_hash}/{key}/
chunk-{N}/shard-{M}.dat # Erasure-coded shard data
chunk-{N}/shard-{M}.meta # Shard metadata (checksum, size)
{storage.directory}/
.manifests/{bucket}/
{key}.manifest.json # Object manifest (shard placements, checksums)
.buckets/{bucket}/ # Bucket metadata
.policies/{bucket}.policy.json # Bucket policies
``` ```
## Related Packages ## Related Packages
- [`@push.rocks/smartbucket`](https://code.foss.global/push.rocks/smartbucket) — High-level S3-compatible abstraction layer - [`@push.rocks/smartbucket`](https://code.foss.global/push.rocks/smartbucket) — High-level S3-compatible abstraction layer
- [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) — TypeScript <-> Rust IPC bridge - [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) — TypeScript Rust IPC bridge
- [`@git.zone/tsrust`](https://code.foss.global/git.zone/tsrust) — Rust cross-compilation for npm packages - [`@git.zone/tsrust`](https://code.foss.global/git.zone/tsrust) — Rust cross-compilation for npm packages
## License and Legal Information ## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file. This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file. **Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.

View File

@@ -13,7 +13,7 @@ use super::config::ErasureConfig;
use super::erasure::ErasureCoder; use super::erasure::ErasureCoder;
use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement}; use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement};
use super::placement::ErasureSet; use super::placement::ErasureSet;
use super::protocol::ShardWriteRequest; use super::protocol::{ClusterRequest, ShardDeleteRequest, ShardReadRequest, ShardWriteRequest};
use super::quic_transport::QuicTransport; use super::quic_transport::QuicTransport;
use super::shard_store::{ShardId, ShardStore}; use super::shard_store::{ShardId, ShardStore};
use super::state::ClusterState; use super::state::ClusterState;
@@ -22,6 +22,29 @@ use crate::storage::{
ListObjectsResult, MultipartUploadInfo, PutResult, ListObjectsResult, MultipartUploadInfo, PutResult,
}; };
use serde::{Deserialize, Serialize};
/// Multipart upload session metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MultipartSession {
upload_id: String,
bucket: String,
key: String,
initiated: String,
metadata: HashMap<String, String>,
parts: HashMap<u32, String>, // part_number -> etag
}
/// Per-part info stored during multipart upload.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PartInfo {
part_number: u32,
etag: String,
size: u64,
part_key: String,
chunks: Vec<ChunkManifest>,
}
/// Distributed storage coordinator. /// Distributed storage coordinator.
/// ///
/// Handles S3 operations by distributing erasure-coded shards across /// Handles S3 operations by distributing erasure-coded shards across
@@ -36,6 +59,8 @@ pub struct DistributedStore {
manifest_dir: PathBuf, manifest_dir: PathBuf,
/// Root directory for buckets metadata /// Root directory for buckets metadata
buckets_dir: PathBuf, buckets_dir: PathBuf,
/// Root directory for bucket policies
policies_dir: PathBuf,
erasure_config: ErasureConfig, erasure_config: ErasureConfig,
} }
@@ -55,6 +80,8 @@ impl DistributedStore {
.map(|p| Arc::new(ShardStore::new(p.clone()))) .map(|p| Arc::new(ShardStore::new(p.clone())))
.collect(); .collect();
let policies_dir = buckets_dir.join(".policies");
Ok(Self { Ok(Self {
state, state,
transport, transport,
@@ -62,10 +89,16 @@ impl DistributedStore {
local_shard_stores, local_shard_stores,
manifest_dir, manifest_dir,
buckets_dir, buckets_dir,
policies_dir,
erasure_config, erasure_config,
}) })
} }
/// Get the policies directory path.
pub fn policies_dir(&self) -> PathBuf {
self.policies_dir.clone()
}
// ============================ // ============================
// Object operations // Object operations
// ============================ // ============================
@@ -172,7 +205,7 @@ impl DistributedStore {
// Determine which chunks to fetch based on range // Determine which chunks to fetch based on range
let chunk_size = manifest.chunk_size as u64; let chunk_size = manifest.chunk_size as u64;
let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) = let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) =
if let Some((start, end)) = range { if let Some((start, end)) = range {
let first = (start / chunk_size) as usize; let first = (start / chunk_size) as usize;
let last = (end / chunk_size) as usize; let last = (end / chunk_size) as usize;
@@ -187,7 +220,9 @@ impl DistributedStore {
let mut full_data = Vec::new(); let mut full_data = Vec::new();
for chunk_idx in first_chunk..=last_chunk.min(manifest.chunks.len() - 1) { for chunk_idx in first_chunk..=last_chunk.min(manifest.chunks.len() - 1) {
let chunk = &manifest.chunks[chunk_idx]; let chunk = &manifest.chunks[chunk_idx];
let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?; let reconstructed = self
.fetch_and_reconstruct_chunk_for_object(chunk, bucket, key)
.await?;
full_data.extend_from_slice(&reconstructed); full_data.extend_from_slice(&reconstructed);
} }
@@ -248,26 +283,45 @@ impl DistributedStore {
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> { pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
// Load manifest to find all shards // Load manifest to find all shards
if let Ok(manifest) = self.load_manifest(bucket, key).await { if let Ok(manifest) = self.load_manifest(bucket, key).await {
// Delete shards from all drives let local_id = self.state.local_node_id().to_string();
// Delete shards from all drives (local and remote)
for chunk in &manifest.chunks { for chunk in &manifest.chunks {
for placement in &chunk.shard_placements { for placement in &chunk.shard_placements {
let shard_id = ShardId { if placement.node_id == local_id {
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
if placement.node_id == self.state.local_node_id() {
// Local delete // Local delete
let shard_id = ShardId {
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
if let Some(store) = self if let Some(store) = self
.local_shard_stores .local_shard_stores
.get(placement.drive_id.parse::<usize>().unwrap_or(0)) .get(placement.drive_id.parse::<usize>().unwrap_or(0))
{ {
let _ = store.delete_shard(&shard_id).await; let _ = store.delete_shard(&shard_id).await;
} }
} else {
// Remote delete via QUIC (best-effort, don't fail the delete)
if let Err(e) = self
.delete_shard_remote(
&placement.node_id,
bucket,
key,
chunk.chunk_index,
placement.shard_index,
)
.await
{
tracing::warn!(
node = %placement.node_id,
shard = placement.shard_index,
error = %e,
"Failed to delete remote shard (will be cleaned up by healing)"
);
}
} }
// TODO: send delete to remote nodes via QUIC
} }
} }
} }
@@ -300,7 +354,9 @@ impl DistributedStore {
// Read source object fully, then reconstruct // Read source object fully, then reconstruct
let mut full_data = Vec::new(); let mut full_data = Vec::new();
for chunk in &src_manifest.chunks { for chunk in &src_manifest.chunks {
let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?; let reconstructed = self
.fetch_and_reconstruct_chunk_for_object(chunk, src_bucket, src_key)
.await?;
full_data.extend_from_slice(&reconstructed); full_data.extend_from_slice(&reconstructed);
} }
@@ -526,45 +582,305 @@ impl DistributedStore {
} }
// ============================ // ============================
// Multipart (delegated to local temp storage for now) // Multipart uploads
// ============================ // ============================
pub async fn initiate_multipart( pub async fn initiate_multipart(
&self, &self,
_bucket: &str, bucket: &str,
_key: &str, key: &str,
_metadata: HashMap<String, String>, metadata: HashMap<String, String>,
) -> Result<String> { ) -> Result<String> {
// TODO: Implement distributed multipart if !self.bucket_exists(bucket).await {
anyhow::bail!("Multipart uploads not yet supported in cluster mode") return Err(crate::error::StorageError::no_such_bucket().into());
}
let upload_id = uuid::Uuid::new_v4().to_string().replace('-', "");
let upload_dir = self.multipart_dir().join(&upload_id);
fs::create_dir_all(&upload_dir).await?;
// Store multipart session metadata
let session = MultipartSession {
upload_id: upload_id.clone(),
bucket: bucket.to_string(),
key: key.to_string(),
initiated: Utc::now().to_rfc3339(),
metadata,
parts: HashMap::new(),
};
let json = serde_json::to_string_pretty(&session)?;
fs::write(upload_dir.join("session.json"), json).await?;
Ok(upload_id)
} }
pub async fn upload_part( pub async fn upload_part(
&self, &self,
_upload_id: &str, upload_id: &str,
_part_number: u32, part_number: u32,
_body: Incoming, body: Incoming,
) -> Result<(String, u64)> { ) -> Result<(String, u64)> {
anyhow::bail!("Multipart uploads not yet supported in cluster mode") let upload_dir = self.multipart_dir().join(upload_id);
if !upload_dir.is_dir() {
return Err(crate::error::StorageError::no_such_upload().into());
}
// Read session to get bucket/key
let session = self.load_multipart_session(upload_id).await?;
let erasure_set = self
.state
.get_erasure_set_for_object(&session.bucket, &session.key)
.await
.ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?;
// Buffer and erasure-code the part data
let chunk_size = self.erasure_config.chunk_size_bytes;
let mut chunk_buffer = Vec::with_capacity(chunk_size);
let mut chunk_index: u32 = 0;
let mut chunks = Vec::new();
let mut total_size: u64 = 0;
let mut hasher = Md5::new();
// Use upload_id + part_number as a unique key prefix for shard storage
let part_key = format!("{}/_multipart/{}/part-{}", session.key, upload_id, part_number);
let mut body = body;
loop {
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
hasher.update(&data);
total_size += data.len() as u64;
chunk_buffer.extend_from_slice(&data);
while chunk_buffer.len() >= chunk_size {
let chunk_data: Vec<u8> =
chunk_buffer.drain(..chunk_size).collect();
let chunk_manifest = self
.encode_and_distribute_chunk(
&erasure_set,
&session.bucket,
&part_key,
chunk_index,
&chunk_data,
)
.await?;
chunks.push(chunk_manifest);
chunk_index += 1;
}
}
}
Some(Err(e)) => return Err(anyhow::anyhow!("Body read error: {}", e)),
None => break,
}
}
// Process final partial chunk
if !chunk_buffer.is_empty() {
let chunk_manifest = self
.encode_and_distribute_chunk(
&erasure_set,
&session.bucket,
&part_key,
chunk_index,
&chunk_buffer,
)
.await?;
chunks.push(chunk_manifest);
}
let etag = format!("{:x}", hasher.finalize());
// Save per-part manifest
let part_manifest = PartInfo {
part_number,
etag: etag.clone(),
size: total_size,
part_key: part_key.clone(),
chunks,
};
let part_json = serde_json::to_string_pretty(&part_manifest)?;
fs::write(
upload_dir.join(format!("part-{}.json", part_number)),
part_json,
)
.await?;
Ok((etag, total_size))
} }
pub async fn complete_multipart( pub async fn complete_multipart(
&self, &self,
_upload_id: &str, upload_id: &str,
_parts: &[(u32, String)], parts: &[(u32, String)],
) -> Result<CompleteMultipartResult> { ) -> Result<CompleteMultipartResult> {
anyhow::bail!("Multipart uploads not yet supported in cluster mode") let session = self.load_multipart_session(upload_id).await?;
let upload_dir = self.multipart_dir().join(upload_id);
// Read per-part manifests and concatenate chunks sequentially
let mut all_chunks = Vec::new();
let mut total_size: u64 = 0;
let mut full_hasher = Md5::new();
let mut global_chunk_index: u32 = 0;
for (part_number, _etag) in parts {
let part_path = upload_dir.join(format!("part-{}.json", part_number));
if !part_path.exists() {
return Err(anyhow::anyhow!("Part {} not found", part_number).into());
}
let part_json = fs::read_to_string(&part_path).await?;
let part_info: PartInfo = serde_json::from_str(&part_json)?;
// Reconstruct part data to compute overall MD5
for chunk in &part_info.chunks {
let reconstructed = self
.fetch_and_reconstruct_chunk_for_object(
chunk,
&session.bucket,
&part_info.part_key,
)
.await?;
full_hasher.update(&reconstructed);
total_size += reconstructed.len() as u64;
// Re-index chunks to be sequential in the final object
let mut adjusted_chunk = chunk.clone();
adjusted_chunk.chunk_index = global_chunk_index;
all_chunks.push(adjusted_chunk);
global_chunk_index += 1;
}
}
let etag = format!("{:x}", full_hasher.finalize());
// Build final object manifest
let manifest = ObjectManifest {
bucket: session.bucket.clone(),
key: session.key.clone(),
version_id: uuid::Uuid::new_v4().to_string(),
size: total_size,
content_md5: etag.clone(),
content_type: session
.metadata
.get("content-type")
.cloned()
.unwrap_or_else(|| "binary/octet-stream".to_string()),
metadata: session.metadata.clone(),
created_at: Utc::now().to_rfc3339(),
last_modified: Utc::now().to_rfc3339(),
data_shards: self.erasure_config.data_shards,
parity_shards: self.erasure_config.parity_shards,
chunk_size: self.erasure_config.chunk_size_bytes,
chunks: all_chunks,
};
self.store_manifest(&manifest).await?;
// Clean up multipart upload directory
let _ = fs::remove_dir_all(&upload_dir).await;
Ok(CompleteMultipartResult { etag })
} }
pub async fn abort_multipart(&self, _upload_id: &str) -> Result<()> { pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
anyhow::bail!("Multipart uploads not yet supported in cluster mode") let upload_dir = self.multipart_dir().join(upload_id);
if !upload_dir.is_dir() {
return Err(crate::error::StorageError::no_such_upload().into());
}
// Load session to get bucket/key for shard cleanup
if let Ok(session) = self.load_multipart_session(upload_id).await {
// Read part manifests and delete their shards
let mut entries = fs::read_dir(&upload_dir).await?;
while let Some(entry) = entries.next_entry().await? {
let name = entry.file_name().to_string_lossy().to_string();
if name.starts_with("part-") && name.ends_with(".json") {
if let Ok(content) = fs::read_to_string(entry.path()).await {
if let Ok(part_info) = serde_json::from_str::<PartInfo>(&content) {
let local_id = self.state.local_node_id().to_string();
for chunk in &part_info.chunks {
for placement in &chunk.shard_placements {
if placement.node_id == local_id {
let shard_id = ShardId {
bucket: session.bucket.clone(),
key: part_info.part_key.clone(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
if let Some(store) = self.local_shard_stores.get(
placement.drive_id.parse::<usize>().unwrap_or(0),
) {
let _ = store.delete_shard(&shard_id).await;
}
} else {
let _ = self
.delete_shard_remote(
&placement.node_id,
&session.bucket,
&part_info.part_key,
chunk.chunk_index,
placement.shard_index,
)
.await;
}
}
}
}
}
}
}
}
let _ = fs::remove_dir_all(&upload_dir).await;
Ok(())
} }
pub async fn list_multipart_uploads( pub async fn list_multipart_uploads(
&self, &self,
_bucket: &str, bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> { ) -> Result<Vec<MultipartUploadInfo>> {
Ok(Vec::new()) let multipart_dir = self.multipart_dir();
if !multipart_dir.is_dir() {
return Ok(Vec::new());
}
let mut uploads = Vec::new();
let mut entries = fs::read_dir(&multipart_dir).await?;
while let Some(entry) = entries.next_entry().await? {
if !entry.metadata().await?.is_dir() {
continue;
}
let session_path = entry.path().join("session.json");
if let Ok(content) = fs::read_to_string(&session_path).await {
if let Ok(session) = serde_json::from_str::<MultipartSession>(&content) {
if session.bucket == bucket {
let initiated = DateTime::parse_from_rfc3339(&session.initiated)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
uploads.push(MultipartUploadInfo {
upload_id: session.upload_id,
key: session.key,
initiated,
});
}
}
}
}
Ok(uploads)
}
fn multipart_dir(&self) -> PathBuf {
self.manifest_dir.join(".multipart")
}
async fn load_multipart_session(&self, upload_id: &str) -> Result<MultipartSession> {
let session_path = self.multipart_dir().join(upload_id).join("session.json");
let content = fs::read_to_string(&session_path).await?;
Ok(serde_json::from_str(&content)?)
} }
// ============================ // ============================
@@ -720,44 +1036,59 @@ impl DistributedStore {
// Internal: fetch + reconstruct // Internal: fetch + reconstruct
// ============================ // ============================
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> { /// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
let k = self.erasure_config.data_shards; async fn fetch_and_reconstruct_chunk_for_object(
&self,
chunk: &ChunkManifest,
bucket: &str,
key: &str,
) -> Result<Vec<u8>> {
let k = self.erasure_config.read_quorum();
let total = self.erasure_config.total_shards(); let total = self.erasure_config.total_shards();
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total]; let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut succeeded = 0usize; let mut succeeded = 0usize;
// Try to fetch shards (local first, then remote) // Sort placements: local first for fast path
for placement in &chunk.shard_placements { let mut sorted_placements = chunk.shard_placements.clone();
let local_id = self.state.local_node_id().to_string();
sorted_placements.sort_by_key(|p| if p.node_id == local_id { 0 } else { 1 });
for placement in &sorted_placements {
if succeeded >= k {
break; // Have enough shards
}
let shard_id = ShardId { let shard_id = ShardId {
bucket: String::new(), // Not needed for read bucket: bucket.to_string(),
key: String::new(), key: key.to_string(),
chunk_index: chunk.chunk_index, chunk_index: chunk.chunk_index,
shard_index: placement.shard_index, shard_index: placement.shard_index,
}; };
let result = if placement.node_id == self.state.local_node_id() { let result = if placement.node_id == local_id {
// Local read // Local read
let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0); let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
if let Some(store) = self.local_shard_stores.get(store_idx) { if let Some(store) = self.local_shard_stores.get(store_idx) {
// Need to set proper bucket/key on shard_id for local reads
// We get this from the chunk's context, but we don't have it here.
// This will be passed through the manifest's shard placements.
store.read_shard(&shard_id).await.ok() store.read_shard(&shard_id).await.ok()
} else { } else {
None None
} }
} else { } else {
// Remote read via QUIC // Remote read via QUIC
// TODO: implement remote shard read self.read_shard_remote(
None &placement.node_id,
bucket,
key,
chunk.chunk_index,
placement.shard_index,
)
.await
.ok()
}; };
if let Some((data, _checksum)) = result { if let Some((data, _checksum)) = result {
shards[placement.shard_index as usize] = Some(data); shards[placement.shard_index as usize] = Some(data);
succeeded += 1; succeeded += 1;
if succeeded >= k {
break; // Have enough shards
}
} }
} }
@@ -774,6 +1105,66 @@ impl DistributedStore {
.decode_chunk(&mut shards, chunk.data_size) .decode_chunk(&mut shards, chunk.data_size)
} }
async fn read_shard_remote(
&self,
node_id: &str,
bucket: &str,
key: &str,
chunk_index: u32,
shard_index: u32,
) -> Result<(Vec<u8>, u32)> {
let node_info = self
.state
.get_node(node_id)
.await
.ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?;
let addr: SocketAddr = node_info.quic_addr.parse()?;
let conn = self.transport.get_connection(node_id, addr).await?;
let request = ClusterRequest::ShardRead(ShardReadRequest {
request_id: uuid::Uuid::new_v4().to_string(),
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index,
shard_index,
});
match self.transport.send_shard_read(&conn, &request).await? {
Some((data, checksum)) => Ok((data, checksum)),
None => anyhow::bail!("Shard not found on remote node"),
}
}
async fn delete_shard_remote(
&self,
node_id: &str,
bucket: &str,
key: &str,
chunk_index: u32,
shard_index: u32,
) -> Result<()> {
let node_info = self
.state
.get_node(node_id)
.await
.ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?;
let addr: SocketAddr = node_info.quic_addr.parse()?;
let conn = self.transport.get_connection(node_id, addr).await?;
let request = ClusterRequest::ShardDelete(ShardDeleteRequest {
request_id: uuid::Uuid::new_v4().to_string(),
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index,
shard_index,
});
let _response = self.transport.send_request(&conn, &request).await?;
Ok(())
}
// ============================ // ============================
// Manifest storage (local filesystem) // Manifest storage (local filesystem)
// ============================ // ============================

View File

@@ -1,23 +1,40 @@
use anyhow::Result; use anyhow::Result;
use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::fs;
use super::coordinator::DistributedStore; use super::config::ErasureConfig;
use super::erasure::ErasureCoder;
use super::metadata::ObjectManifest;
use super::shard_store::{ShardId, ShardStore};
use super::state::ClusterState; use super::state::ClusterState;
/// Background healing service that scans for under-replicated shards /// Background healing service that scans for under-replicated shards
/// and reconstructs them. /// and reconstructs them.
pub struct HealingService { pub struct HealingService {
state: Arc<ClusterState>, state: Arc<ClusterState>,
erasure_coder: ErasureCoder,
local_shard_stores: Vec<Arc<ShardStore>>,
manifest_dir: PathBuf,
scan_interval: Duration, scan_interval: Duration,
} }
impl HealingService { impl HealingService {
pub fn new(state: Arc<ClusterState>, scan_interval_hours: u64) -> Self { pub fn new(
Self { state: Arc<ClusterState>,
erasure_config: &ErasureConfig,
local_shard_stores: Vec<Arc<ShardStore>>,
manifest_dir: PathBuf,
scan_interval_hours: u64,
) -> Result<Self> {
Ok(Self {
state, state,
erasure_coder: ErasureCoder::new(erasure_config)?,
local_shard_stores,
manifest_dir,
scan_interval: Duration::from_secs(scan_interval_hours * 3600), scan_interval: Duration::from_secs(scan_interval_hours * 3600),
} })
} }
/// Run the healing loop as a background task. /// Run the healing loop as a background task.
@@ -53,7 +70,7 @@ impl HealingService {
} }
} }
/// Scan for offline nodes and identify objects that need healing. /// Scan all manifests for shards on offline nodes, reconstruct and re-place them.
async fn heal_scan(&self) -> Result<HealStats> { async fn heal_scan(&self) -> Result<HealStats> {
let mut stats = HealStats::default(); let mut stats = HealStats::default();
@@ -63,25 +80,272 @@ impl HealingService {
return Ok(stats); return Ok(stats);
} }
tracing::info!( // Check that we have majority before healing (split-brain prevention)
"Found {} offline nodes, scanning for affected shards",
offline_nodes.len()
);
// Check that we have majority before healing
// (prevents healing during split-brain)
if !self.state.has_majority().await { if !self.state.has_majority().await {
tracing::warn!("No majority quorum, skipping heal to prevent split-brain"); tracing::warn!("No majority quorum, skipping heal to prevent split-brain");
return Ok(stats); return Ok(stats);
} }
// TODO: Iterate all manifests, find shards on offline nodes, tracing::info!(
// reconstruct from remaining shards and place on healthy nodes. "Found {} offline nodes, scanning for affected shards",
// This requires access to the DistributedStore and manifest listing offline_nodes.len()
// which will be wired in when the full healing pipeline is implemented. );
// Iterate all bucket directories under manifest_dir
let mut bucket_entries = match fs::read_dir(&self.manifest_dir).await {
Ok(e) => e,
Err(_) => return Ok(stats),
};
while let Some(bucket_entry) = bucket_entries.next_entry().await? {
if !bucket_entry.metadata().await?.is_dir() {
continue;
}
let bucket_name = bucket_entry.file_name().to_string_lossy().to_string();
if bucket_name.starts_with('.') {
continue;
}
// Scan manifests in this bucket
self.heal_bucket(&bucket_name, &offline_nodes, &mut stats)
.await;
// Yield to avoid starving foreground I/O
tokio::task::yield_now().await;
}
Ok(stats) Ok(stats)
} }
async fn heal_bucket(
&self,
bucket: &str,
offline_nodes: &[String],
stats: &mut HealStats,
) {
let bucket_dir = self.manifest_dir.join(bucket);
let manifests = match self.collect_manifests(&bucket_dir).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(bucket = bucket, error = %e, "Failed to list manifests");
stats.errors += 1;
return;
}
};
let local_id = self.state.local_node_id().to_string();
for manifest in &manifests {
for chunk in &manifest.chunks {
// Check if any shard in this chunk is on an offline node
let affected: Vec<_> = chunk
.shard_placements
.iter()
.filter(|p| offline_nodes.contains(&p.node_id))
.collect();
if affected.is_empty() {
continue;
}
stats.shards_checked += chunk.shard_placements.len() as u64;
// Try to reconstruct missing shards from available ones
let k = manifest.data_shards;
let total = manifest.data_shards + manifest.parity_shards;
// Count available shards (those NOT on offline nodes)
let available_count = chunk
.shard_placements
.iter()
.filter(|p| !offline_nodes.contains(&p.node_id))
.count();
if available_count < k {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
available = available_count,
needed = k,
"Cannot heal chunk: not enough available shards"
);
stats.errors += 1;
continue;
}
// Fetch available shards (only local ones for now)
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut fetched = 0usize;
for placement in &chunk.shard_placements {
if offline_nodes.contains(&placement.node_id) {
continue; // Skip offline nodes
}
if fetched >= k {
break;
}
if placement.node_id == local_id {
let shard_id = ShardId {
bucket: manifest.bucket.clone(),
key: manifest.key.clone(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
if let Some(store) = self.local_shard_stores.get(store_idx) {
if let Ok((data, _)) = store.read_shard(&shard_id).await {
shards[placement.shard_index as usize] = Some(data);
fetched += 1;
}
}
}
// TODO: fetch from other online remote nodes
}
if fetched < k {
tracing::warn!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Not enough local shards to heal, skipping"
);
continue;
}
// Reconstruct all shards
let reconstructed = match self.erasure_coder.decode_chunk(
&mut shards,
chunk.data_size,
) {
Ok(_) => true,
Err(e) => {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
error = %e,
"Reconstruction failed"
);
stats.errors += 1;
false
}
};
if !reconstructed {
continue;
}
// Re-encode to get all shards back (including the missing ones)
let full_data_size = chunk.data_size;
let mut data_buf = Vec::with_capacity(full_data_size);
for i in 0..k {
if let Some(ref shard) = shards[i] {
data_buf.extend_from_slice(shard);
}
}
data_buf.truncate(full_data_size);
let all_shards = match self.erasure_coder.encode_chunk(&data_buf) {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "Re-encoding for heal failed");
stats.errors += 1;
continue;
}
};
// Verify reconstructed shards are consistent
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Shard verification failed after reconstruction"
);
stats.errors += 1;
continue;
}
// Write the missing shards to the first available local drive
for affected_placement in &affected {
let shard_idx = affected_placement.shard_index as usize;
if shard_idx < all_shards.len() {
let shard_data = &all_shards[shard_idx];
let checksum = crc32c::crc32c(shard_data);
let shard_id = ShardId {
bucket: manifest.bucket.clone(),
key: manifest.key.clone(),
chunk_index: chunk.chunk_index,
shard_index: affected_placement.shard_index,
};
// Place on first available local drive
if let Some(store) = self.local_shard_stores.first() {
match store.write_shard(&shard_id, shard_data, checksum).await {
Ok(()) => {
stats.shards_healed += 1;
tracing::info!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
shard = affected_placement.shard_index,
"Shard healed successfully"
);
}
Err(e) => {
tracing::error!(error = %e, "Failed to write healed shard");
stats.errors += 1;
}
}
}
}
}
tokio::task::yield_now().await;
}
}
}
/// Collect all manifests under a bucket directory.
async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
let mut manifests = Vec::new();
self.collect_manifests_recursive(dir, &mut manifests).await?;
Ok(manifests)
}
fn collect_manifests_recursive<'a>(
&'a self,
dir: &'a std::path::Path,
manifests: &'a mut Vec<ObjectManifest>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move {
let mut entries = match fs::read_dir(dir).await {
Ok(e) => e,
Err(_) => return Ok(()),
};
while let Some(entry) = entries.next_entry().await? {
let meta = entry.metadata().await?;
let name = entry.file_name().to_string_lossy().to_string();
if meta.is_dir() {
self.collect_manifests_recursive(&entry.path(), manifests)
.await?;
} else if name.ends_with(".manifest.json") {
if let Ok(content) = fs::read_to_string(entry.path()).await {
if let Ok(manifest) = serde_json::from_str::<ObjectManifest>(&content) {
manifests.push(manifest);
}
}
}
}
Ok(())
})
}
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]

View File

@@ -3,8 +3,12 @@ use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::Mutex;
use super::drive_manager::{DriveManager, DriveStatus};
use super::protocol::{ use super::protocol::{
ClusterRequest, ClusterResponse, HeartbeatMessage, JoinRequestMessage, NodeInfo, ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage,
NodeInfo,
}; };
use super::quic_transport::QuicTransport; use super::quic_transport::QuicTransport;
use super::state::ClusterState; use super::state::ClusterState;
@@ -15,6 +19,7 @@ pub struct MembershipManager {
transport: Arc<QuicTransport>, transport: Arc<QuicTransport>,
heartbeat_interval: Duration, heartbeat_interval: Duration,
local_node_info: NodeInfo, local_node_info: NodeInfo,
drive_manager: Option<Arc<Mutex<DriveManager>>>,
} }
impl MembershipManager { impl MembershipManager {
@@ -29,9 +34,16 @@ impl MembershipManager {
transport, transport,
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms), heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
local_node_info, local_node_info,
drive_manager: None,
} }
} }
/// Set the drive manager for health reporting in heartbeats.
pub fn with_drive_manager(mut self, dm: Arc<Mutex<DriveManager>>) -> Self {
self.drive_manager = Some(dm);
self
}
/// Join the cluster by contacting seed nodes. /// Join the cluster by contacting seed nodes.
/// Sends a JoinRequest to each seed node until one accepts. /// Sends a JoinRequest to each seed node until one accepts.
pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> { pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> {
@@ -129,6 +141,9 @@ impl MembershipManager {
let topology_version = self.state.version().await; let topology_version = self.state.version().await;
let mut responded = Vec::new(); let mut responded = Vec::new();
// Collect drive health states
let drive_states = self.collect_drive_states().await;
for peer in &peers { for peer in &peers {
let addr: SocketAddr = match peer.quic_addr.parse() { let addr: SocketAddr = match peer.quic_addr.parse() {
Ok(a) => a, Ok(a) => a,
@@ -138,7 +153,7 @@ impl MembershipManager {
let heartbeat = ClusterRequest::Heartbeat(HeartbeatMessage { let heartbeat = ClusterRequest::Heartbeat(HeartbeatMessage {
node_id: self.local_node_info.node_id.clone(), node_id: self.local_node_info.node_id.clone(),
timestamp: chrono::Utc::now().to_rfc3339(), timestamp: chrono::Utc::now().to_rfc3339(),
drive_states: Vec::new(), // TODO: populate from DriveManager drive_states: drive_states.clone(),
topology_version, topology_version,
}); });
@@ -181,4 +196,31 @@ impl MembershipManager {
let _response = self.transport.send_request(&conn, heartbeat).await?; let _response = self.transport.send_request(&conn, heartbeat).await?;
Ok(()) Ok(())
} }
/// Collect drive health states from the DriveManager, if available.
async fn collect_drive_states(&self) -> Vec<DriveStateInfo> {
let dm = match &self.drive_manager {
Some(dm) => dm,
None => return Vec::new(),
};
let mut manager = dm.lock().await;
let results = manager.check_all_drives().await;
results
.into_iter()
.map(|(idx, status)| {
let status_str = match status {
DriveStatus::Online => "online",
DriveStatus::Degraded => "degraded",
DriveStatus::Offline => "offline",
DriveStatus::Healing => "healing",
};
DriveStateInfo {
drive_index: idx as u32,
status: status_str.to_string(),
}
})
.collect()
}
} }

View File

@@ -1,3 +1,7 @@
// Cluster modules contain forward-looking public API that is incrementally wired.
// Allow dead_code for methods/structs not yet called from the main server path.
#![allow(dead_code)]
pub mod config; pub mod config;
pub mod coordinator; pub mod coordinator;
pub mod drive_manager; pub mod drive_manager;

View File

@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer}; use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::protocol::{ use super::protocol::{
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest, self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
}; };
@@ -225,20 +223,14 @@ impl QuicTransport {
mut recv: quinn::RecvStream, mut recv: quinn::RecvStream,
shard_store: Arc<ShardStore>, shard_store: Arc<ShardStore>,
) -> Result<()> { ) -> Result<()> {
// Read the length-prefixed request header // Read the full request (length-prefixed bincode + optional trailing data)
let mut len_buf = [0u8; 4]; let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
recv.read_exact(&mut len_buf).await?; let (request, header_len) = protocol::decode_request(&raw)?;
let msg_len = u32::from_le_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len];
recv.read_exact(&mut msg_buf).await?;
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
match request { match request {
ClusterRequest::ShardWrite(write_req) => { ClusterRequest::ShardWrite(write_req) => {
// Read shard data from the stream // Shard data follows the header in the raw buffer
let mut shard_data = vec![0u8; write_req.shard_data_length as usize]; let shard_data = &raw[header_len..];
recv.read_exact(&mut shard_data).await?;
let shard_id = ShardId { let shard_id = ShardId {
bucket: write_req.bucket, bucket: write_req.bucket,
@@ -348,8 +340,6 @@ impl QuicTransport {
// will be handled by the membership and coordinator modules. // will be handled by the membership and coordinator modules.
// For now, send a generic ack. // For now, send a generic ack.
_ => { _ => {
let response_data = recv.read_to_end(0).await.unwrap_or_default();
drop(response_data);
let err = protocol::ErrorResponse { let err = protocol::ErrorResponse {
request_id: String::new(), request_id: String::new(),
code: "NotImplemented".to_string(), code: "NotImplemented".to_string(),

View File

@@ -1,6 +1,6 @@
use anyhow::Result; use anyhow::Result;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf}; use std::path::PathBuf;
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;

View File

@@ -24,7 +24,8 @@ use crate::config::SmartStorageConfig;
use crate::policy::{self, PolicyDecision, PolicyStore}; use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::error::StorageError; use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore; use crate::cluster::coordinator::DistributedStore;
use crate::cluster::config::ErasureConfig; use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager; use crate::cluster::membership::MembershipManager;
use crate::cluster::placement; use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo; use crate::cluster::protocol::NodeInfo;
@@ -217,20 +218,34 @@ impl StorageServer {
}; };
cluster_state.add_node(local_node_info.clone()).await; cluster_state.add_node(local_node_info.clone()).await;
// Join cluster if seed nodes are configured // Initialize drive manager for health monitoring
let membership = Arc::new(MembershipManager::new( let drive_manager = Arc::new(tokio::sync::Mutex::new(
cluster_state.clone(), DriveManager::new(&cluster_config.drives).await?,
transport.clone(),
cluster_config.heartbeat_interval_ms,
local_node_info,
)); ));
// Join cluster if seed nodes are configured
let membership = Arc::new(
MembershipManager::new(
cluster_state.clone(),
transport.clone(),
cluster_config.heartbeat_interval_ms,
local_node_info,
)
.with_drive_manager(drive_manager),
);
membership membership
.join_cluster(&cluster_config.seed_nodes) .join_cluster(&cluster_config.seed_nodes)
.await?; .await?;
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
.iter()
.map(|p| Arc::new(ShardStore::new(p.clone())))
.collect();
// Start QUIC accept loop for incoming connections // Start QUIC accept loop for incoming connections
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone())); let shard_store_for_accept = local_shard_stores[0].clone();
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false); let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone(); let transport_clone = transport.clone();
tokio::spawn(async move { tokio::spawn(async move {
transport_clone transport_clone
@@ -240,11 +255,24 @@ impl StorageServer {
// Start heartbeat loop // Start heartbeat loop
let membership_clone = membership.clone(); let membership_clone = membership.clone();
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false); let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move { tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await; membership_clone.heartbeat_loop(hb_shutdown_rx).await;
}); });
// Start healing service
let healing_service = HealingService::new(
cluster_state.clone(),
&erasure_config,
local_shard_stores.clone(),
manifest_dir.clone(),
24, // scan every 24 hours
)?;
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
// Create distributed store // Create distributed store
let distributed_store = DistributedStore::new( let distributed_store = DistributedStore::new(
cluster_state, cluster_state,

View File

@@ -811,14 +811,18 @@ impl StorageBackend {
pub fn policies_dir(&self) -> std::path::PathBuf { pub fn policies_dir(&self) -> std::path::PathBuf {
match self { match self {
StorageBackend::Standalone(fs) => fs.policies_dir(), StorageBackend::Standalone(fs) => fs.policies_dir(),
StorageBackend::Clustered(_) => PathBuf::from(".policies"), // TODO: proper policies in cluster mode StorageBackend::Clustered(ds) => ds.policies_dir(),
} }
} }
pub async fn initialize(&self) -> Result<()> { pub async fn initialize(&self) -> Result<()> {
match self { match self {
StorageBackend::Standalone(fs) => fs.initialize().await, StorageBackend::Standalone(fs) => fs.initialize().await,
StorageBackend::Clustered(_) => Ok(()), // Cluster init happens separately StorageBackend::Clustered(ds) => {
// Ensure policies directory exists
tokio::fs::create_dir_all(ds.policies_dir()).await?;
Ok(())
}
} }
} }

View File

@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartstorage', name: '@push.rocks/smartstorage',
version: '6.1.0', version: '6.3.2',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.' description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
} }