Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a009d990d0 | |||
| 08d545f5db | |||
| a0a282c712 | |||
| 3eb0045676 |
15
changelog.md
15
changelog.md
@@ -1,5 +1,20 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-03-21 - 6.3.0 - feat(readme)
|
||||||
|
document distributed cluster mode, erasure coding, and QUIC-based architecture
|
||||||
|
|
||||||
|
- Expand README overview and feature matrix to highlight clustering, multi-drive awareness, and distributed storage capabilities
|
||||||
|
- Add standalone and cluster mode usage examples plus cluster configuration options
|
||||||
|
- Document clustering internals including erasure coding, quorum behavior, QUIC transport, self-healing, and on-disk layout
|
||||||
|
|
||||||
|
## 2026-03-21 - 6.2.0 - feat(cluster)
|
||||||
|
add shard healing, drive health heartbeats, and clustered policy directory support
|
||||||
|
|
||||||
|
- implements manifest-based healing that scans affected shards on offline nodes, reconstructs data with erasure coding, and rewrites recovered shards to local storage
|
||||||
|
- includes drive status reporting in membership heartbeats by wiring DriveManager health checks into cluster heartbeat messages
|
||||||
|
- adds clustered policies directory initialization and exposes policy storage paths from the distributed coordinator
|
||||||
|
- extends distributed coordinator support for remote shard read and delete operations plus multipart upload session metadata
|
||||||
|
|
||||||
## 2026-03-21 - 6.1.0 - feat(cluster)
|
## 2026-03-21 - 6.1.0 - feat(cluster)
|
||||||
add clustered storage backend with QUIC transport, erasure coding, and shard management
|
add clustered storage backend with QUIC transport, erasure coding, and shard management
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "@push.rocks/smartstorage",
|
"name": "@push.rocks/smartstorage",
|
||||||
"version": "6.1.0",
|
"version": "6.3.0",
|
||||||
"private": false,
|
"private": false,
|
||||||
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
||||||
"main": "dist_ts/index.js",
|
"main": "dist_ts/index.js",
|
||||||
|
|||||||
239
readme.md
239
readme.md
@@ -1,6 +1,6 @@
|
|||||||
# @push.rocks/smartstorage
|
# @push.rocks/smartstorage
|
||||||
|
|
||||||
A high-performance, S3-compatible local storage server powered by a **Rust core** with a clean TypeScript API. Drop-in replacement for AWS S3 during development and testing — no cloud, no Docker, no MinIO. Just `npm install` and go.
|
A high-performance, S3-compatible storage server powered by a **Rust core** with a clean TypeScript API. Runs standalone for dev/test — or scales out as a **distributed, erasure-coded cluster** with QUIC-based inter-node communication. No cloud, no Docker. Just `npm install` and go. 🚀
|
||||||
|
|
||||||
## Issue Reporting and Security
|
## Issue Reporting and Security
|
||||||
|
|
||||||
@@ -15,23 +15,34 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
|
|||||||
| Large file uploads | Streaming, zero-copy | Yes | OOM risk |
|
| Large file uploads | Streaming, zero-copy | Yes | OOM risk |
|
||||||
| Range requests | Seek-based | Yes | Full read |
|
| Range requests | Seek-based | Yes | Full read |
|
||||||
| Language | Rust + TypeScript | Go | JavaScript |
|
| Language | Rust + TypeScript | Go | JavaScript |
|
||||||
| Multipart uploads | Full support | Yes | No |
|
| Multipart uploads | ✅ Full support | Yes | No |
|
||||||
| Auth | AWS SigV4 (full verification) | Full IAM | Basic |
|
| Auth | AWS SigV4 (full verification) | Full IAM | Basic |
|
||||||
| Bucket policies | IAM-style evaluation | Yes | No |
|
| Bucket policies | IAM-style evaluation | Yes | No |
|
||||||
|
| Clustering | ✅ Erasure-coded, QUIC | Yes | No |
|
||||||
|
| Multi-drive awareness | ✅ Per-drive health | Yes | No |
|
||||||
|
|
||||||
### Core Features
|
### Core Features
|
||||||
|
|
||||||
- **Rust-powered HTTP server** — hyper 1.x with streaming I/O, zero-copy, backpressure
|
- 🦀 **Rust-powered HTTP server** — hyper 1.x with streaming I/O, zero-copy, backpressure
|
||||||
- **Full S3-compatible API** — works with AWS SDK v3, SmartBucket, any S3 client
|
- 📦 **Full S3-compatible API** — works with AWS SDK v3, SmartBucket, any S3 client
|
||||||
- **Filesystem-backed storage** — buckets map to directories, objects to files
|
- 💾 **Filesystem-backed storage** — buckets map to directories, objects to files
|
||||||
- **Streaming multipart uploads** — large files without memory pressure
|
- 📤 **Streaming multipart uploads** — large files without memory pressure
|
||||||
- **Byte-range requests** — `seek()` directly to the requested byte offset
|
- 📐 **Byte-range requests** — `seek()` directly to the requested byte offset
|
||||||
- **AWS SigV4 authentication** — full signature verification with constant-time comparison and 15-min clock skew enforcement
|
- 🔐 **AWS SigV4 authentication** — full signature verification with constant-time comparison
|
||||||
- **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation, wildcard matching, and anonymous access support
|
- 📋 **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation and wildcard matching
|
||||||
- **CORS middleware** — configurable cross-origin support
|
- 🌐 **CORS middleware** — configurable cross-origin support
|
||||||
- **Structured logging** — tracing-based, error through debug levels
|
- 🧹 **Clean slate mode** — wipe storage on startup for test isolation
|
||||||
- **Clean slate mode** — wipe storage on startup for test isolation
|
- ⚡ **Test-first design** — start/stop in milliseconds, no port conflicts
|
||||||
- **Test-first design** — start/stop in milliseconds, no port conflicts
|
|
||||||
|
### Clustering Features
|
||||||
|
|
||||||
|
- 🔗 **Erasure coding** — Reed-Solomon (configurable k data + m parity shards) for storage efficiency and fault tolerance
|
||||||
|
- 🚄 **QUIC transport** — multiplexed, encrypted inter-node communication via `quinn` with zero head-of-line blocking
|
||||||
|
- 💽 **Multi-drive awareness** — each node manages multiple independent storage paths with health monitoring
|
||||||
|
- 🤝 **Cluster membership** — static seed config + runtime join, heartbeat-based failure detection
|
||||||
|
- ✍️ **Quorum writes** — data is only acknowledged after k+1 shards are persisted
|
||||||
|
- 📖 **Quorum reads** — reconstruct from any k available shards, local-first fast path
|
||||||
|
- 🩹 **Self-healing** — background scanner detects and reconstructs missing/corrupt shards
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
@@ -43,6 +54,8 @@ pnpm add @push.rocks/smartstorage -D
|
|||||||
|
|
||||||
## Quick Start
|
## Quick Start
|
||||||
|
|
||||||
|
### Standalone Mode (Dev & Test)
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
import { SmartStorage } from '@push.rocks/smartstorage';
|
import { SmartStorage } from '@push.rocks/smartstorage';
|
||||||
|
|
||||||
@@ -63,6 +76,31 @@ const descriptor = await storage.getStorageDescriptor();
|
|||||||
await storage.stop();
|
await storage.stop();
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Cluster Mode (Distributed)
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
import { SmartStorage } from '@push.rocks/smartstorage';
|
||||||
|
|
||||||
|
const storage = await SmartStorage.createAndStart({
|
||||||
|
server: { port: 3000 },
|
||||||
|
cluster: {
|
||||||
|
enabled: true,
|
||||||
|
nodeId: 'node-1',
|
||||||
|
quicPort: 4000,
|
||||||
|
seedNodes: ['192.168.1.11:4000', '192.168.1.12:4000'],
|
||||||
|
erasure: {
|
||||||
|
dataShards: 4, // k: minimum shards to reconstruct data
|
||||||
|
parityShards: 2, // m: fault tolerance (can lose up to m shards)
|
||||||
|
},
|
||||||
|
drives: {
|
||||||
|
paths: ['/mnt/disk1', '/mnt/disk2', '/mnt/disk3'],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
Objects are automatically split into chunks (default 4 MB), erasure-coded into 6 shards (4 data + 2 parity), and distributed across drives/nodes. Any 4 of 6 shards can reconstruct the original data.
|
||||||
|
|
||||||
## Configuration
|
## Configuration
|
||||||
|
|
||||||
All config fields are optional — sensible defaults are applied automatically.
|
All config fields are optional — sensible defaults are applied automatically.
|
||||||
@@ -75,7 +113,7 @@ const config: ISmartStorageConfig = {
|
|||||||
port: 3000, // Default: 3000
|
port: 3000, // Default: 3000
|
||||||
address: '0.0.0.0', // Default: '0.0.0.0'
|
address: '0.0.0.0', // Default: '0.0.0.0'
|
||||||
silent: false, // Default: false
|
silent: false, // Default: false
|
||||||
region: 'us-east-1', // Default: 'us-east-1' — used for SigV4 signing
|
region: 'us-east-1', // Default: 'us-east-1' — used for SigV4 signing
|
||||||
},
|
},
|
||||||
storage: {
|
storage: {
|
||||||
directory: './my-data', // Default: .nogit/bucketsDir
|
directory: './my-data', // Default: .nogit/bucketsDir
|
||||||
@@ -111,6 +149,22 @@ const config: ISmartStorageConfig = {
|
|||||||
expirationDays: 7,
|
expirationDays: 7,
|
||||||
cleanupIntervalMinutes: 60,
|
cleanupIntervalMinutes: 60,
|
||||||
},
|
},
|
||||||
|
cluster: { // Optional — omit for standalone mode
|
||||||
|
enabled: true,
|
||||||
|
nodeId: 'node-1', // Auto-generated UUID if omitted
|
||||||
|
quicPort: 4000, // Default: 4000
|
||||||
|
seedNodes: [], // Addresses of existing cluster members
|
||||||
|
erasure: {
|
||||||
|
dataShards: 4, // Default: 4
|
||||||
|
parityShards: 2, // Default: 2
|
||||||
|
chunkSizeBytes: 4194304, // Default: 4 MB
|
||||||
|
},
|
||||||
|
drives: {
|
||||||
|
paths: ['/mnt/disk1', '/mnt/disk2'],
|
||||||
|
},
|
||||||
|
heartbeatIntervalMs: 5000, // Default: 5000
|
||||||
|
heartbeatTimeoutMs: 30000, // Default: 30000
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
const storage = await SmartStorage.createAndStart(config);
|
const storage = await SmartStorage.createAndStart(config);
|
||||||
@@ -207,7 +261,7 @@ const files = await dir.listFiles();
|
|||||||
|
|
||||||
## Multipart Uploads
|
## Multipart Uploads
|
||||||
|
|
||||||
For files larger than 5 MB, use multipart uploads. smartstorage handles them with **streaming I/O** — parts are written directly to disk, never buffered in memory.
|
For files larger than 5 MB, use multipart uploads. smartstorage handles them with **streaming I/O** — parts are written directly to disk, never buffered in memory. In cluster mode, each part is independently erasure-coded and distributed.
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
import {
|
import {
|
||||||
@@ -255,8 +309,6 @@ When `auth.enabled` is `true`, the auth pipeline works as follows:
|
|||||||
|
|
||||||
### Setting a Bucket Policy
|
### Setting a Bucket Policy
|
||||||
|
|
||||||
Use the S3 `PutBucketPolicy` API (or any S3 client that supports it):
|
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
import { PutBucketPolicyCommand } from '@aws-sdk/client-s3';
|
import { PutBucketPolicyCommand } from '@aws-sdk/client-s3';
|
||||||
|
|
||||||
@@ -294,6 +346,81 @@ await client.send(new PutBucketPolicyCommand({
|
|||||||
|
|
||||||
Deleting a bucket automatically removes its associated policy.
|
Deleting a bucket automatically removes its associated policy.
|
||||||
|
|
||||||
|
## Clustering Deep Dive 🔗
|
||||||
|
|
||||||
|
smartstorage can run as a distributed storage cluster where multiple nodes cooperate to store and retrieve data with built-in redundancy.
|
||||||
|
|
||||||
|
### How It Works
|
||||||
|
|
||||||
|
```
|
||||||
|
Client ──HTTP PUT──▶ Node A (coordinator)
|
||||||
|
│
|
||||||
|
├─ Split object into 4 MB chunks
|
||||||
|
├─ Erasure-code each chunk (4 data + 2 parity = 6 shards)
|
||||||
|
│
|
||||||
|
├──QUIC──▶ Node B (shard writes)
|
||||||
|
├──QUIC──▶ Node C (shard writes)
|
||||||
|
└─ Local disk (shard writes)
|
||||||
|
```
|
||||||
|
|
||||||
|
1. **Any node can coordinate** — the client connects to any cluster member
|
||||||
|
2. **Objects are chunked** — large objects split into fixed-size pieces (default 4 MB)
|
||||||
|
3. **Each chunk is erasure-coded** — Reed-Solomon produces k data + m parity shards
|
||||||
|
4. **Shards are distributed** — placed across different nodes and drives for fault isolation
|
||||||
|
5. **Quorum guarantees consistency** — writes need k+1 acks, reads need k shards
|
||||||
|
|
||||||
|
### Erasure Coding
|
||||||
|
|
||||||
|
With the default `4+2` configuration:
|
||||||
|
- Storage overhead: **33%** (vs. 200% for 3x replication)
|
||||||
|
- Fault tolerance: **any 2 drives/nodes can fail** simultaneously
|
||||||
|
- Read efficiency: only **4 of 6 shards** needed to reconstruct data
|
||||||
|
|
||||||
|
| Config | Total Shards | Overhead | Tolerance | Min Nodes |
|
||||||
|
|--------|-------------|----------|-----------|-----------|
|
||||||
|
| 4+2 | 6 | 33% | 2 failures | 3 |
|
||||||
|
| 6+3 | 9 | 50% | 3 failures | 5 |
|
||||||
|
| 2+1 | 3 | 50% | 1 failure | 2 |
|
||||||
|
|
||||||
|
### QUIC Transport
|
||||||
|
|
||||||
|
Inter-node communication uses [QUIC](https://en.wikipedia.org/wiki/QUIC) via the `quinn` library:
|
||||||
|
- 🔒 **Built-in TLS** — self-signed certs auto-generated at cluster init
|
||||||
|
- 🔀 **Multiplexed streams** — concurrent shard transfers without head-of-line blocking
|
||||||
|
- ⚡ **Connection pooling** — persistent connections to peer nodes
|
||||||
|
- 🌊 **Natural backpressure** — QUIC flow control prevents overloading slow peers
|
||||||
|
|
||||||
|
### Cluster Membership
|
||||||
|
|
||||||
|
- **Static seed nodes** — initial cluster defined in config
|
||||||
|
- **Runtime join** — new nodes can join a running cluster
|
||||||
|
- **Heartbeat monitoring** — every 5s (configurable), with suspect/offline detection
|
||||||
|
- **Split-brain prevention** — nodes only mark peers offline when they have majority
|
||||||
|
|
||||||
|
### Self-Healing
|
||||||
|
|
||||||
|
A background scanner periodically (default: every 24h):
|
||||||
|
1. Checks shard checksums (CRC32C) for bit-rot detection
|
||||||
|
2. Identifies shards on offline nodes
|
||||||
|
3. Reconstructs missing shards from remaining data using Reed-Solomon
|
||||||
|
4. Places healed shards on healthy drives
|
||||||
|
|
||||||
|
Healing runs at low priority to avoid impacting foreground I/O.
|
||||||
|
|
||||||
|
### Erasure Set Formation
|
||||||
|
|
||||||
|
Drives are organized into fixed **erasure sets** at cluster initialization:
|
||||||
|
|
||||||
|
```
|
||||||
|
3 nodes × 4 drives each = 12 drives total
|
||||||
|
With 6-shard erasure sets → 2 erasure sets
|
||||||
|
|
||||||
|
Set 0: Node1-Disk0, Node2-Disk0, Node3-Disk0, Node1-Disk1, Node2-Disk1, Node3-Disk1
|
||||||
|
Set 1: Node1-Disk2, Node2-Disk2, Node3-Disk2, Node1-Disk3, Node2-Disk3, Node3-Disk3
|
||||||
|
```
|
||||||
|
|
||||||
|
Drives are interleaved across nodes for maximum fault isolation. New nodes form new erasure sets — existing data is never rebalanced.
|
||||||
|
|
||||||
## Testing Integration
|
## Testing Integration
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
@@ -358,31 +485,37 @@ Get connection details for S3-compatible clients. Returns:
|
|||||||
smartstorage uses a **hybrid Rust + TypeScript** architecture:
|
smartstorage uses a **hybrid Rust + TypeScript** architecture:
|
||||||
|
|
||||||
```
|
```
|
||||||
┌─────────────────────────────────┐
|
┌──────────────────────────────────────────────┐
|
||||||
│ Your Code (AWS SDK, etc.) │
|
│ Your Code (AWS SDK, SmartBucket, etc.) │
|
||||||
│ ↕ HTTP (localhost:3000) │
|
│ ↕ HTTP (localhost:3000) │
|
||||||
├─────────────────────────────────┤
|
├──────────────────────────────────────────────┤
|
||||||
│ ruststorage binary (Rust) │
|
│ ruststorage binary (Rust) │
|
||||||
│ ├─ hyper 1.x HTTP server │
|
│ ├─ hyper 1.x HTTP server │
|
||||||
│ ├─ S3 path-style routing │
|
│ ├─ S3 path-style routing │
|
||||||
│ ├─ Streaming storage layer │
|
│ ├─ StorageBackend (Standalone or Clustered) │
|
||||||
│ ├─ Multipart manager │
|
│ │ ├─ FileStore (single-node mode) │
|
||||||
│ ├─ SigV4 auth + policy engine │
|
│ │ └─ DistributedStore (cluster mode) │
|
||||||
│ ├─ CORS middleware │
|
│ │ ├─ ErasureCoder (Reed-Solomon) │
|
||||||
│ └─ S3 XML response builder │
|
│ │ ├─ ShardStore (per-drive storage) │
|
||||||
├─────────────────────────────────┤
|
│ │ ├─ QuicTransport (quinn) │
|
||||||
│ TypeScript (thin IPC wrapper) │
|
│ │ ├─ ClusterState & Membership │
|
||||||
│ ├─ SmartStorage class │
|
│ │ └─ HealingService │
|
||||||
│ ├─ RustBridge (stdin/stdout) │
|
│ ├─ SigV4 auth + policy engine │
|
||||||
│ └─ Config & S3 descriptor │
|
│ ├─ CORS middleware │
|
||||||
└─────────────────────────────────┘
|
│ └─ S3 XML response builder │
|
||||||
|
├──────────────────────────────────────────────┤
|
||||||
|
│ TypeScript (thin IPC wrapper) │
|
||||||
|
│ ├─ SmartStorage class │
|
||||||
|
│ ├─ RustBridge (stdin/stdout JSON IPC) │
|
||||||
|
│ └─ Config & S3 descriptor │
|
||||||
|
└──────────────────────────────────────────────┘
|
||||||
```
|
```
|
||||||
|
|
||||||
**Why Rust?** The TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
|
**Why Rust?** The original TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
|
||||||
|
|
||||||
**IPC Protocol:** TypeScript spawns the `ruststorage` binary with `--management` and communicates via newline-delimited JSON over stdin/stdout. Commands: `start`, `stop`, `createBucket`.
|
**IPC Protocol:** TypeScript spawns the `ruststorage` binary with `--management` and communicates via newline-delimited JSON over stdin/stdout. Commands: `start`, `stop`, `createBucket`, `clusterStatus`.
|
||||||
|
|
||||||
### S3-Compatible Operations Supported
|
### S3-Compatible Operations
|
||||||
|
|
||||||
| Operation | Method | Path |
|
| Operation | Method | Path |
|
||||||
|-----------|--------|------|
|
|-----------|--------|------|
|
||||||
@@ -407,26 +540,40 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
|
|||||||
|
|
||||||
### On-Disk Format
|
### On-Disk Format
|
||||||
|
|
||||||
|
**Standalone mode:**
|
||||||
```
|
```
|
||||||
{storage.directory}/
|
{storage.directory}/
|
||||||
{bucket}/
|
{bucket}/
|
||||||
{key}._storage_object # Object data
|
{key}._storage_object # Object data
|
||||||
{key}._storage_object.metadata.json # Metadata (content-type, x-amz-meta-*, etc.)
|
{key}._storage_object.metadata.json # Metadata (content-type, x-amz-meta-*, etc.)
|
||||||
{key}._storage_object.md5 # Cached MD5 hash
|
{key}._storage_object.md5 # Cached MD5 hash
|
||||||
.multipart/
|
.multipart/
|
||||||
{upload-id}/
|
{upload-id}/
|
||||||
metadata.json # Upload metadata (bucket, key, parts)
|
metadata.json # Upload metadata
|
||||||
part-1 # Part data files
|
part-1, part-2, ... # Part data files
|
||||||
part-2
|
|
||||||
...
|
|
||||||
.policies/
|
.policies/
|
||||||
{bucket}.policy.json # Bucket policy (IAM JSON format)
|
{bucket}.policy.json # Bucket policy (IAM JSON format)
|
||||||
|
```
|
||||||
|
|
||||||
|
**Cluster mode:**
|
||||||
|
```
|
||||||
|
{drive_path}/.smartstorage/
|
||||||
|
format.json # Drive metadata (cluster ID, erasure set)
|
||||||
|
data/{bucket}/{key_hash}/{key}/
|
||||||
|
chunk-{N}/shard-{M}.dat # Erasure-coded shard data
|
||||||
|
chunk-{N}/shard-{M}.meta # Shard metadata (checksum, size)
|
||||||
|
|
||||||
|
{storage.directory}/
|
||||||
|
.manifests/{bucket}/
|
||||||
|
{key}.manifest.json # Object manifest (shard placements, checksums)
|
||||||
|
.buckets/{bucket}/ # Bucket metadata
|
||||||
|
.policies/{bucket}.policy.json # Bucket policies
|
||||||
```
|
```
|
||||||
|
|
||||||
## Related Packages
|
## Related Packages
|
||||||
|
|
||||||
- [`@push.rocks/smartbucket`](https://code.foss.global/push.rocks/smartbucket) — High-level S3-compatible abstraction layer
|
- [`@push.rocks/smartbucket`](https://code.foss.global/push.rocks/smartbucket) — High-level S3-compatible abstraction layer
|
||||||
- [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) — TypeScript <-> Rust IPC bridge
|
- [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) — TypeScript ↔ Rust IPC bridge
|
||||||
- [`@git.zone/tsrust`](https://code.foss.global/git.zone/tsrust) — Rust cross-compilation for npm packages
|
- [`@git.zone/tsrust`](https://code.foss.global/git.zone/tsrust) — Rust cross-compilation for npm packages
|
||||||
|
|
||||||
## License and Legal Information
|
## License and Legal Information
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ use super::config::ErasureConfig;
|
|||||||
use super::erasure::ErasureCoder;
|
use super::erasure::ErasureCoder;
|
||||||
use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement};
|
use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement};
|
||||||
use super::placement::ErasureSet;
|
use super::placement::ErasureSet;
|
||||||
use super::protocol::ShardWriteRequest;
|
use super::protocol::{ClusterRequest, ShardDeleteRequest, ShardReadRequest, ShardWriteRequest};
|
||||||
use super::quic_transport::QuicTransport;
|
use super::quic_transport::QuicTransport;
|
||||||
use super::shard_store::{ShardId, ShardStore};
|
use super::shard_store::{ShardId, ShardStore};
|
||||||
use super::state::ClusterState;
|
use super::state::ClusterState;
|
||||||
@@ -22,6 +22,29 @@ use crate::storage::{
|
|||||||
ListObjectsResult, MultipartUploadInfo, PutResult,
|
ListObjectsResult, MultipartUploadInfo, PutResult,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Multipart upload session metadata.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
struct MultipartSession {
|
||||||
|
upload_id: String,
|
||||||
|
bucket: String,
|
||||||
|
key: String,
|
||||||
|
initiated: String,
|
||||||
|
metadata: HashMap<String, String>,
|
||||||
|
parts: HashMap<u32, String>, // part_number -> etag
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Per-part info stored during multipart upload.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
struct PartInfo {
|
||||||
|
part_number: u32,
|
||||||
|
etag: String,
|
||||||
|
size: u64,
|
||||||
|
part_key: String,
|
||||||
|
chunks: Vec<ChunkManifest>,
|
||||||
|
}
|
||||||
|
|
||||||
/// Distributed storage coordinator.
|
/// Distributed storage coordinator.
|
||||||
///
|
///
|
||||||
/// Handles S3 operations by distributing erasure-coded shards across
|
/// Handles S3 operations by distributing erasure-coded shards across
|
||||||
@@ -36,6 +59,8 @@ pub struct DistributedStore {
|
|||||||
manifest_dir: PathBuf,
|
manifest_dir: PathBuf,
|
||||||
/// Root directory for buckets metadata
|
/// Root directory for buckets metadata
|
||||||
buckets_dir: PathBuf,
|
buckets_dir: PathBuf,
|
||||||
|
/// Root directory for bucket policies
|
||||||
|
policies_dir: PathBuf,
|
||||||
erasure_config: ErasureConfig,
|
erasure_config: ErasureConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -55,6 +80,8 @@ impl DistributedStore {
|
|||||||
.map(|p| Arc::new(ShardStore::new(p.clone())))
|
.map(|p| Arc::new(ShardStore::new(p.clone())))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let policies_dir = buckets_dir.join(".policies");
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
state,
|
state,
|
||||||
transport,
|
transport,
|
||||||
@@ -62,10 +89,16 @@ impl DistributedStore {
|
|||||||
local_shard_stores,
|
local_shard_stores,
|
||||||
manifest_dir,
|
manifest_dir,
|
||||||
buckets_dir,
|
buckets_dir,
|
||||||
|
policies_dir,
|
||||||
erasure_config,
|
erasure_config,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get the policies directory path.
|
||||||
|
pub fn policies_dir(&self) -> PathBuf {
|
||||||
|
self.policies_dir.clone()
|
||||||
|
}
|
||||||
|
|
||||||
// ============================
|
// ============================
|
||||||
// Object operations
|
// Object operations
|
||||||
// ============================
|
// ============================
|
||||||
@@ -187,7 +220,9 @@ impl DistributedStore {
|
|||||||
let mut full_data = Vec::new();
|
let mut full_data = Vec::new();
|
||||||
for chunk_idx in first_chunk..=last_chunk.min(manifest.chunks.len() - 1) {
|
for chunk_idx in first_chunk..=last_chunk.min(manifest.chunks.len() - 1) {
|
||||||
let chunk = &manifest.chunks[chunk_idx];
|
let chunk = &manifest.chunks[chunk_idx];
|
||||||
let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?;
|
let reconstructed = self
|
||||||
|
.fetch_and_reconstruct_chunk_for_object(chunk, bucket, key)
|
||||||
|
.await?;
|
||||||
full_data.extend_from_slice(&reconstructed);
|
full_data.extend_from_slice(&reconstructed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,26 +283,45 @@ impl DistributedStore {
|
|||||||
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
|
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
|
||||||
// Load manifest to find all shards
|
// Load manifest to find all shards
|
||||||
if let Ok(manifest) = self.load_manifest(bucket, key).await {
|
if let Ok(manifest) = self.load_manifest(bucket, key).await {
|
||||||
// Delete shards from all drives
|
let local_id = self.state.local_node_id().to_string();
|
||||||
|
|
||||||
|
// Delete shards from all drives (local and remote)
|
||||||
for chunk in &manifest.chunks {
|
for chunk in &manifest.chunks {
|
||||||
for placement in &chunk.shard_placements {
|
for placement in &chunk.shard_placements {
|
||||||
let shard_id = ShardId {
|
if placement.node_id == local_id {
|
||||||
bucket: bucket.to_string(),
|
|
||||||
key: key.to_string(),
|
|
||||||
chunk_index: chunk.chunk_index,
|
|
||||||
shard_index: placement.shard_index,
|
|
||||||
};
|
|
||||||
|
|
||||||
if placement.node_id == self.state.local_node_id() {
|
|
||||||
// Local delete
|
// Local delete
|
||||||
|
let shard_id = ShardId {
|
||||||
|
bucket: bucket.to_string(),
|
||||||
|
key: key.to_string(),
|
||||||
|
chunk_index: chunk.chunk_index,
|
||||||
|
shard_index: placement.shard_index,
|
||||||
|
};
|
||||||
if let Some(store) = self
|
if let Some(store) = self
|
||||||
.local_shard_stores
|
.local_shard_stores
|
||||||
.get(placement.drive_id.parse::<usize>().unwrap_or(0))
|
.get(placement.drive_id.parse::<usize>().unwrap_or(0))
|
||||||
{
|
{
|
||||||
let _ = store.delete_shard(&shard_id).await;
|
let _ = store.delete_shard(&shard_id).await;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Remote delete via QUIC (best-effort, don't fail the delete)
|
||||||
|
if let Err(e) = self
|
||||||
|
.delete_shard_remote(
|
||||||
|
&placement.node_id,
|
||||||
|
bucket,
|
||||||
|
key,
|
||||||
|
chunk.chunk_index,
|
||||||
|
placement.shard_index,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(
|
||||||
|
node = %placement.node_id,
|
||||||
|
shard = placement.shard_index,
|
||||||
|
error = %e,
|
||||||
|
"Failed to delete remote shard (will be cleaned up by healing)"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// TODO: send delete to remote nodes via QUIC
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -300,7 +354,9 @@ impl DistributedStore {
|
|||||||
// Read source object fully, then reconstruct
|
// Read source object fully, then reconstruct
|
||||||
let mut full_data = Vec::new();
|
let mut full_data = Vec::new();
|
||||||
for chunk in &src_manifest.chunks {
|
for chunk in &src_manifest.chunks {
|
||||||
let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?;
|
let reconstructed = self
|
||||||
|
.fetch_and_reconstruct_chunk_for_object(chunk, src_bucket, src_key)
|
||||||
|
.await?;
|
||||||
full_data.extend_from_slice(&reconstructed);
|
full_data.extend_from_slice(&reconstructed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -526,45 +582,305 @@ impl DistributedStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ============================
|
// ============================
|
||||||
// Multipart (delegated to local temp storage for now)
|
// Multipart uploads
|
||||||
// ============================
|
// ============================
|
||||||
|
|
||||||
pub async fn initiate_multipart(
|
pub async fn initiate_multipart(
|
||||||
&self,
|
&self,
|
||||||
_bucket: &str,
|
bucket: &str,
|
||||||
_key: &str,
|
key: &str,
|
||||||
_metadata: HashMap<String, String>,
|
metadata: HashMap<String, String>,
|
||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
// TODO: Implement distributed multipart
|
if !self.bucket_exists(bucket).await {
|
||||||
anyhow::bail!("Multipart uploads not yet supported in cluster mode")
|
return Err(crate::error::StorageError::no_such_bucket().into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let upload_id = uuid::Uuid::new_v4().to_string().replace('-', "");
|
||||||
|
let upload_dir = self.multipart_dir().join(&upload_id);
|
||||||
|
fs::create_dir_all(&upload_dir).await?;
|
||||||
|
|
||||||
|
// Store multipart session metadata
|
||||||
|
let session = MultipartSession {
|
||||||
|
upload_id: upload_id.clone(),
|
||||||
|
bucket: bucket.to_string(),
|
||||||
|
key: key.to_string(),
|
||||||
|
initiated: Utc::now().to_rfc3339(),
|
||||||
|
metadata,
|
||||||
|
parts: HashMap::new(),
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string_pretty(&session)?;
|
||||||
|
fs::write(upload_dir.join("session.json"), json).await?;
|
||||||
|
|
||||||
|
Ok(upload_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn upload_part(
|
pub async fn upload_part(
|
||||||
&self,
|
&self,
|
||||||
_upload_id: &str,
|
upload_id: &str,
|
||||||
_part_number: u32,
|
part_number: u32,
|
||||||
_body: Incoming,
|
body: Incoming,
|
||||||
) -> Result<(String, u64)> {
|
) -> Result<(String, u64)> {
|
||||||
anyhow::bail!("Multipart uploads not yet supported in cluster mode")
|
let upload_dir = self.multipart_dir().join(upload_id);
|
||||||
|
if !upload_dir.is_dir() {
|
||||||
|
return Err(crate::error::StorageError::no_such_upload().into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read session to get bucket/key
|
||||||
|
let session = self.load_multipart_session(upload_id).await?;
|
||||||
|
|
||||||
|
let erasure_set = self
|
||||||
|
.state
|
||||||
|
.get_erasure_set_for_object(&session.bucket, &session.key)
|
||||||
|
.await
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?;
|
||||||
|
|
||||||
|
// Buffer and erasure-code the part data
|
||||||
|
let chunk_size = self.erasure_config.chunk_size_bytes;
|
||||||
|
let mut chunk_buffer = Vec::with_capacity(chunk_size);
|
||||||
|
let mut chunk_index: u32 = 0;
|
||||||
|
let mut chunks = Vec::new();
|
||||||
|
let mut total_size: u64 = 0;
|
||||||
|
let mut hasher = Md5::new();
|
||||||
|
|
||||||
|
// Use upload_id + part_number as a unique key prefix for shard storage
|
||||||
|
let part_key = format!("{}/_multipart/{}/part-{}", session.key, upload_id, part_number);
|
||||||
|
|
||||||
|
let mut body = body;
|
||||||
|
loop {
|
||||||
|
match body.frame().await {
|
||||||
|
Some(Ok(frame)) => {
|
||||||
|
if let Ok(data) = frame.into_data() {
|
||||||
|
hasher.update(&data);
|
||||||
|
total_size += data.len() as u64;
|
||||||
|
chunk_buffer.extend_from_slice(&data);
|
||||||
|
|
||||||
|
while chunk_buffer.len() >= chunk_size {
|
||||||
|
let chunk_data: Vec<u8> =
|
||||||
|
chunk_buffer.drain(..chunk_size).collect();
|
||||||
|
let chunk_manifest = self
|
||||||
|
.encode_and_distribute_chunk(
|
||||||
|
&erasure_set,
|
||||||
|
&session.bucket,
|
||||||
|
&part_key,
|
||||||
|
chunk_index,
|
||||||
|
&chunk_data,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
chunks.push(chunk_manifest);
|
||||||
|
chunk_index += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(Err(e)) => return Err(anyhow::anyhow!("Body read error: {}", e)),
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process final partial chunk
|
||||||
|
if !chunk_buffer.is_empty() {
|
||||||
|
let chunk_manifest = self
|
||||||
|
.encode_and_distribute_chunk(
|
||||||
|
&erasure_set,
|
||||||
|
&session.bucket,
|
||||||
|
&part_key,
|
||||||
|
chunk_index,
|
||||||
|
&chunk_buffer,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
chunks.push(chunk_manifest);
|
||||||
|
}
|
||||||
|
|
||||||
|
let etag = format!("{:x}", hasher.finalize());
|
||||||
|
|
||||||
|
// Save per-part manifest
|
||||||
|
let part_manifest = PartInfo {
|
||||||
|
part_number,
|
||||||
|
etag: etag.clone(),
|
||||||
|
size: total_size,
|
||||||
|
part_key: part_key.clone(),
|
||||||
|
chunks,
|
||||||
|
};
|
||||||
|
let part_json = serde_json::to_string_pretty(&part_manifest)?;
|
||||||
|
fs::write(
|
||||||
|
upload_dir.join(format!("part-{}.json", part_number)),
|
||||||
|
part_json,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok((etag, total_size))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn complete_multipart(
|
pub async fn complete_multipart(
|
||||||
&self,
|
&self,
|
||||||
_upload_id: &str,
|
upload_id: &str,
|
||||||
_parts: &[(u32, String)],
|
parts: &[(u32, String)],
|
||||||
) -> Result<CompleteMultipartResult> {
|
) -> Result<CompleteMultipartResult> {
|
||||||
anyhow::bail!("Multipart uploads not yet supported in cluster mode")
|
let session = self.load_multipart_session(upload_id).await?;
|
||||||
|
let upload_dir = self.multipart_dir().join(upload_id);
|
||||||
|
|
||||||
|
// Read per-part manifests and concatenate chunks sequentially
|
||||||
|
let mut all_chunks = Vec::new();
|
||||||
|
let mut total_size: u64 = 0;
|
||||||
|
let mut full_hasher = Md5::new();
|
||||||
|
let mut global_chunk_index: u32 = 0;
|
||||||
|
|
||||||
|
for (part_number, _etag) in parts {
|
||||||
|
let part_path = upload_dir.join(format!("part-{}.json", part_number));
|
||||||
|
if !part_path.exists() {
|
||||||
|
return Err(anyhow::anyhow!("Part {} not found", part_number).into());
|
||||||
|
}
|
||||||
|
|
||||||
|
let part_json = fs::read_to_string(&part_path).await?;
|
||||||
|
let part_info: PartInfo = serde_json::from_str(&part_json)?;
|
||||||
|
|
||||||
|
// Reconstruct part data to compute overall MD5
|
||||||
|
for chunk in &part_info.chunks {
|
||||||
|
let reconstructed = self
|
||||||
|
.fetch_and_reconstruct_chunk_for_object(
|
||||||
|
chunk,
|
||||||
|
&session.bucket,
|
||||||
|
&part_info.part_key,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
full_hasher.update(&reconstructed);
|
||||||
|
total_size += reconstructed.len() as u64;
|
||||||
|
|
||||||
|
// Re-index chunks to be sequential in the final object
|
||||||
|
let mut adjusted_chunk = chunk.clone();
|
||||||
|
adjusted_chunk.chunk_index = global_chunk_index;
|
||||||
|
all_chunks.push(adjusted_chunk);
|
||||||
|
global_chunk_index += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let etag = format!("{:x}", full_hasher.finalize());
|
||||||
|
|
||||||
|
// Build final object manifest
|
||||||
|
let manifest = ObjectManifest {
|
||||||
|
bucket: session.bucket.clone(),
|
||||||
|
key: session.key.clone(),
|
||||||
|
version_id: uuid::Uuid::new_v4().to_string(),
|
||||||
|
size: total_size,
|
||||||
|
content_md5: etag.clone(),
|
||||||
|
content_type: session
|
||||||
|
.metadata
|
||||||
|
.get("content-type")
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_else(|| "binary/octet-stream".to_string()),
|
||||||
|
metadata: session.metadata.clone(),
|
||||||
|
created_at: Utc::now().to_rfc3339(),
|
||||||
|
last_modified: Utc::now().to_rfc3339(),
|
||||||
|
data_shards: self.erasure_config.data_shards,
|
||||||
|
parity_shards: self.erasure_config.parity_shards,
|
||||||
|
chunk_size: self.erasure_config.chunk_size_bytes,
|
||||||
|
chunks: all_chunks,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.store_manifest(&manifest).await?;
|
||||||
|
|
||||||
|
// Clean up multipart upload directory
|
||||||
|
let _ = fs::remove_dir_all(&upload_dir).await;
|
||||||
|
|
||||||
|
Ok(CompleteMultipartResult { etag })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn abort_multipart(&self, _upload_id: &str) -> Result<()> {
|
pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
|
||||||
anyhow::bail!("Multipart uploads not yet supported in cluster mode")
|
let upload_dir = self.multipart_dir().join(upload_id);
|
||||||
|
if !upload_dir.is_dir() {
|
||||||
|
return Err(crate::error::StorageError::no_such_upload().into());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load session to get bucket/key for shard cleanup
|
||||||
|
if let Ok(session) = self.load_multipart_session(upload_id).await {
|
||||||
|
// Read part manifests and delete their shards
|
||||||
|
let mut entries = fs::read_dir(&upload_dir).await?;
|
||||||
|
while let Some(entry) = entries.next_entry().await? {
|
||||||
|
let name = entry.file_name().to_string_lossy().to_string();
|
||||||
|
if name.starts_with("part-") && name.ends_with(".json") {
|
||||||
|
if let Ok(content) = fs::read_to_string(entry.path()).await {
|
||||||
|
if let Ok(part_info) = serde_json::from_str::<PartInfo>(&content) {
|
||||||
|
let local_id = self.state.local_node_id().to_string();
|
||||||
|
for chunk in &part_info.chunks {
|
||||||
|
for placement in &chunk.shard_placements {
|
||||||
|
if placement.node_id == local_id {
|
||||||
|
let shard_id = ShardId {
|
||||||
|
bucket: session.bucket.clone(),
|
||||||
|
key: part_info.part_key.clone(),
|
||||||
|
chunk_index: chunk.chunk_index,
|
||||||
|
shard_index: placement.shard_index,
|
||||||
|
};
|
||||||
|
if let Some(store) = self.local_shard_stores.get(
|
||||||
|
placement.drive_id.parse::<usize>().unwrap_or(0),
|
||||||
|
) {
|
||||||
|
let _ = store.delete_shard(&shard_id).await;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let _ = self
|
||||||
|
.delete_shard_remote(
|
||||||
|
&placement.node_id,
|
||||||
|
&session.bucket,
|
||||||
|
&part_info.part_key,
|
||||||
|
chunk.chunk_index,
|
||||||
|
placement.shard_index,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = fs::remove_dir_all(&upload_dir).await;
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_multipart_uploads(
|
pub async fn list_multipart_uploads(
|
||||||
&self,
|
&self,
|
||||||
_bucket: &str,
|
bucket: &str,
|
||||||
) -> Result<Vec<MultipartUploadInfo>> {
|
) -> Result<Vec<MultipartUploadInfo>> {
|
||||||
Ok(Vec::new())
|
let multipart_dir = self.multipart_dir();
|
||||||
|
if !multipart_dir.is_dir() {
|
||||||
|
return Ok(Vec::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut uploads = Vec::new();
|
||||||
|
let mut entries = fs::read_dir(&multipart_dir).await?;
|
||||||
|
|
||||||
|
while let Some(entry) = entries.next_entry().await? {
|
||||||
|
if !entry.metadata().await?.is_dir() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let session_path = entry.path().join("session.json");
|
||||||
|
if let Ok(content) = fs::read_to_string(&session_path).await {
|
||||||
|
if let Ok(session) = serde_json::from_str::<MultipartSession>(&content) {
|
||||||
|
if session.bucket == bucket {
|
||||||
|
let initiated = DateTime::parse_from_rfc3339(&session.initiated)
|
||||||
|
.map(|dt| dt.with_timezone(&Utc))
|
||||||
|
.unwrap_or_else(|_| Utc::now());
|
||||||
|
uploads.push(MultipartUploadInfo {
|
||||||
|
upload_id: session.upload_id,
|
||||||
|
key: session.key,
|
||||||
|
initiated,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(uploads)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn multipart_dir(&self) -> PathBuf {
|
||||||
|
self.manifest_dir.join(".multipart")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_multipart_session(&self, upload_id: &str) -> Result<MultipartSession> {
|
||||||
|
let session_path = self.multipart_dir().join(upload_id).join("session.json");
|
||||||
|
let content = fs::read_to_string(&session_path).await?;
|
||||||
|
Ok(serde_json::from_str(&content)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================
|
// ============================
|
||||||
@@ -721,43 +1037,62 @@ impl DistributedStore {
|
|||||||
// ============================
|
// ============================
|
||||||
|
|
||||||
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
|
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
|
||||||
|
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
|
||||||
|
async fn fetch_and_reconstruct_chunk_for_object(
|
||||||
|
&self,
|
||||||
|
chunk: &ChunkManifest,
|
||||||
|
bucket: &str,
|
||||||
|
key: &str,
|
||||||
|
) -> Result<Vec<u8>> {
|
||||||
let k = self.erasure_config.data_shards;
|
let k = self.erasure_config.data_shards;
|
||||||
let total = self.erasure_config.total_shards();
|
let total = self.erasure_config.total_shards();
|
||||||
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
|
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
|
||||||
let mut succeeded = 0usize;
|
let mut succeeded = 0usize;
|
||||||
|
|
||||||
// Try to fetch shards (local first, then remote)
|
// Sort placements: local first for fast path
|
||||||
for placement in &chunk.shard_placements {
|
let mut sorted_placements = chunk.shard_placements.clone();
|
||||||
|
let local_id = self.state.local_node_id().to_string();
|
||||||
|
sorted_placements.sort_by_key(|p| if p.node_id == local_id { 0 } else { 1 });
|
||||||
|
|
||||||
|
for placement in &sorted_placements {
|
||||||
|
if succeeded >= k {
|
||||||
|
break; // Have enough shards
|
||||||
|
}
|
||||||
|
|
||||||
let shard_id = ShardId {
|
let shard_id = ShardId {
|
||||||
bucket: String::new(), // Not needed for read
|
bucket: bucket.to_string(),
|
||||||
key: String::new(),
|
key: key.to_string(),
|
||||||
chunk_index: chunk.chunk_index,
|
chunk_index: chunk.chunk_index,
|
||||||
shard_index: placement.shard_index,
|
shard_index: placement.shard_index,
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = if placement.node_id == self.state.local_node_id() {
|
let result = if placement.node_id == local_id {
|
||||||
// Local read
|
// Local read
|
||||||
let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
|
let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
|
||||||
if let Some(store) = self.local_shard_stores.get(store_idx) {
|
if let Some(store) = self.local_shard_stores.get(store_idx) {
|
||||||
// Need to set proper bucket/key on shard_id for local reads
|
|
||||||
// We get this from the chunk's context, but we don't have it here.
|
|
||||||
// This will be passed through the manifest's shard placements.
|
|
||||||
store.read_shard(&shard_id).await.ok()
|
store.read_shard(&shard_id).await.ok()
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Remote read via QUIC
|
// Remote read via QUIC
|
||||||
// TODO: implement remote shard read
|
self.read_shard_remote(
|
||||||
None
|
&placement.node_id,
|
||||||
|
bucket,
|
||||||
|
key,
|
||||||
|
chunk.chunk_index,
|
||||||
|
placement.shard_index,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
};
|
};
|
||||||
|
|
||||||
if let Some((data, _checksum)) = result {
|
if let Some((data, _checksum)) = result {
|
||||||
shards[placement.shard_index as usize] = Some(data);
|
shards[placement.shard_index as usize] = Some(data);
|
||||||
succeeded += 1;
|
succeeded += 1;
|
||||||
if succeeded >= k {
|
|
||||||
break; // Have enough shards
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -774,6 +1109,66 @@ impl DistributedStore {
|
|||||||
.decode_chunk(&mut shards, chunk.data_size)
|
.decode_chunk(&mut shards, chunk.data_size)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn read_shard_remote(
|
||||||
|
&self,
|
||||||
|
node_id: &str,
|
||||||
|
bucket: &str,
|
||||||
|
key: &str,
|
||||||
|
chunk_index: u32,
|
||||||
|
shard_index: u32,
|
||||||
|
) -> Result<(Vec<u8>, u32)> {
|
||||||
|
let node_info = self
|
||||||
|
.state
|
||||||
|
.get_node(node_id)
|
||||||
|
.await
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?;
|
||||||
|
|
||||||
|
let addr: SocketAddr = node_info.quic_addr.parse()?;
|
||||||
|
let conn = self.transport.get_connection(node_id, addr).await?;
|
||||||
|
|
||||||
|
let request = ClusterRequest::ShardRead(ShardReadRequest {
|
||||||
|
request_id: uuid::Uuid::new_v4().to_string(),
|
||||||
|
bucket: bucket.to_string(),
|
||||||
|
key: key.to_string(),
|
||||||
|
chunk_index,
|
||||||
|
shard_index,
|
||||||
|
});
|
||||||
|
|
||||||
|
match self.transport.send_shard_read(&conn, &request).await? {
|
||||||
|
Some((data, checksum)) => Ok((data, checksum)),
|
||||||
|
None => anyhow::bail!("Shard not found on remote node"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn delete_shard_remote(
|
||||||
|
&self,
|
||||||
|
node_id: &str,
|
||||||
|
bucket: &str,
|
||||||
|
key: &str,
|
||||||
|
chunk_index: u32,
|
||||||
|
shard_index: u32,
|
||||||
|
) -> Result<()> {
|
||||||
|
let node_info = self
|
||||||
|
.state
|
||||||
|
.get_node(node_id)
|
||||||
|
.await
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?;
|
||||||
|
|
||||||
|
let addr: SocketAddr = node_info.quic_addr.parse()?;
|
||||||
|
let conn = self.transport.get_connection(node_id, addr).await?;
|
||||||
|
|
||||||
|
let request = ClusterRequest::ShardDelete(ShardDeleteRequest {
|
||||||
|
request_id: uuid::Uuid::new_v4().to_string(),
|
||||||
|
bucket: bucket.to_string(),
|
||||||
|
key: key.to_string(),
|
||||||
|
chunk_index,
|
||||||
|
shard_index,
|
||||||
|
});
|
||||||
|
|
||||||
|
let _response = self.transport.send_request(&conn, &request).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// ============================
|
// ============================
|
||||||
// Manifest storage (local filesystem)
|
// Manifest storage (local filesystem)
|
||||||
// ============================
|
// ============================
|
||||||
|
|||||||
@@ -1,23 +1,40 @@
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
use tokio::fs;
|
||||||
|
|
||||||
use super::coordinator::DistributedStore;
|
use super::config::ErasureConfig;
|
||||||
|
use super::erasure::ErasureCoder;
|
||||||
|
use super::metadata::ObjectManifest;
|
||||||
|
use super::shard_store::{ShardId, ShardStore};
|
||||||
use super::state::ClusterState;
|
use super::state::ClusterState;
|
||||||
|
|
||||||
/// Background healing service that scans for under-replicated shards
|
/// Background healing service that scans for under-replicated shards
|
||||||
/// and reconstructs them.
|
/// and reconstructs them.
|
||||||
pub struct HealingService {
|
pub struct HealingService {
|
||||||
state: Arc<ClusterState>,
|
state: Arc<ClusterState>,
|
||||||
|
erasure_coder: ErasureCoder,
|
||||||
|
local_shard_stores: Vec<Arc<ShardStore>>,
|
||||||
|
manifest_dir: PathBuf,
|
||||||
scan_interval: Duration,
|
scan_interval: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HealingService {
|
impl HealingService {
|
||||||
pub fn new(state: Arc<ClusterState>, scan_interval_hours: u64) -> Self {
|
pub fn new(
|
||||||
Self {
|
state: Arc<ClusterState>,
|
||||||
|
erasure_config: &ErasureConfig,
|
||||||
|
local_shard_stores: Vec<Arc<ShardStore>>,
|
||||||
|
manifest_dir: PathBuf,
|
||||||
|
scan_interval_hours: u64,
|
||||||
|
) -> Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
state,
|
state,
|
||||||
|
erasure_coder: ErasureCoder::new(erasure_config)?,
|
||||||
|
local_shard_stores,
|
||||||
|
manifest_dir,
|
||||||
scan_interval: Duration::from_secs(scan_interval_hours * 3600),
|
scan_interval: Duration::from_secs(scan_interval_hours * 3600),
|
||||||
}
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Run the healing loop as a background task.
|
/// Run the healing loop as a background task.
|
||||||
@@ -53,7 +70,7 @@ impl HealingService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Scan for offline nodes and identify objects that need healing.
|
/// Scan all manifests for shards on offline nodes, reconstruct and re-place them.
|
||||||
async fn heal_scan(&self) -> Result<HealStats> {
|
async fn heal_scan(&self) -> Result<HealStats> {
|
||||||
let mut stats = HealStats::default();
|
let mut stats = HealStats::default();
|
||||||
|
|
||||||
@@ -63,25 +80,260 @@ impl HealingService {
|
|||||||
return Ok(stats);
|
return Ok(stats);
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(
|
// Check that we have majority before healing (split-brain prevention)
|
||||||
"Found {} offline nodes, scanning for affected shards",
|
|
||||||
offline_nodes.len()
|
|
||||||
);
|
|
||||||
|
|
||||||
// Check that we have majority before healing
|
|
||||||
// (prevents healing during split-brain)
|
|
||||||
if !self.state.has_majority().await {
|
if !self.state.has_majority().await {
|
||||||
tracing::warn!("No majority quorum, skipping heal to prevent split-brain");
|
tracing::warn!("No majority quorum, skipping heal to prevent split-brain");
|
||||||
return Ok(stats);
|
return Ok(stats);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Iterate all manifests, find shards on offline nodes,
|
tracing::info!(
|
||||||
// reconstruct from remaining shards and place on healthy nodes.
|
"Found {} offline nodes, scanning for affected shards",
|
||||||
// This requires access to the DistributedStore and manifest listing
|
offline_nodes.len()
|
||||||
// which will be wired in when the full healing pipeline is implemented.
|
);
|
||||||
|
|
||||||
|
// Iterate all bucket directories under manifest_dir
|
||||||
|
let mut bucket_entries = match fs::read_dir(&self.manifest_dir).await {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(_) => return Ok(stats),
|
||||||
|
};
|
||||||
|
|
||||||
|
while let Some(bucket_entry) = bucket_entries.next_entry().await? {
|
||||||
|
if !bucket_entry.metadata().await?.is_dir() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let bucket_name = bucket_entry.file_name().to_string_lossy().to_string();
|
||||||
|
if bucket_name.starts_with('.') {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scan manifests in this bucket
|
||||||
|
self.heal_bucket(&bucket_name, &offline_nodes, &mut stats)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Yield to avoid starving foreground I/O
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(stats)
|
Ok(stats)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn heal_bucket(
|
||||||
|
&self,
|
||||||
|
bucket: &str,
|
||||||
|
offline_nodes: &[String],
|
||||||
|
stats: &mut HealStats,
|
||||||
|
) {
|
||||||
|
let bucket_dir = self.manifest_dir.join(bucket);
|
||||||
|
let manifests = match self.collect_manifests(&bucket_dir).await {
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(bucket = bucket, error = %e, "Failed to list manifests");
|
||||||
|
stats.errors += 1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let local_id = self.state.local_node_id().to_string();
|
||||||
|
|
||||||
|
for manifest in &manifests {
|
||||||
|
for chunk in &manifest.chunks {
|
||||||
|
// Check if any shard in this chunk is on an offline node
|
||||||
|
let affected: Vec<_> = chunk
|
||||||
|
.shard_placements
|
||||||
|
.iter()
|
||||||
|
.filter(|p| offline_nodes.contains(&p.node_id))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if affected.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
stats.shards_checked += chunk.shard_placements.len() as u64;
|
||||||
|
|
||||||
|
// Try to reconstruct missing shards from available ones
|
||||||
|
let k = manifest.data_shards;
|
||||||
|
let total = manifest.data_shards + manifest.parity_shards;
|
||||||
|
|
||||||
|
// Count available shards (those NOT on offline nodes)
|
||||||
|
let available_count = chunk
|
||||||
|
.shard_placements
|
||||||
|
.iter()
|
||||||
|
.filter(|p| !offline_nodes.contains(&p.node_id))
|
||||||
|
.count();
|
||||||
|
|
||||||
|
if available_count < k {
|
||||||
|
tracing::error!(
|
||||||
|
bucket = manifest.bucket,
|
||||||
|
key = manifest.key,
|
||||||
|
chunk = chunk.chunk_index,
|
||||||
|
available = available_count,
|
||||||
|
needed = k,
|
||||||
|
"Cannot heal chunk: not enough available shards"
|
||||||
|
);
|
||||||
|
stats.errors += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fetch available shards (only local ones for now)
|
||||||
|
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
|
||||||
|
let mut fetched = 0usize;
|
||||||
|
|
||||||
|
for placement in &chunk.shard_placements {
|
||||||
|
if offline_nodes.contains(&placement.node_id) {
|
||||||
|
continue; // Skip offline nodes
|
||||||
|
}
|
||||||
|
if fetched >= k {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if placement.node_id == local_id {
|
||||||
|
let shard_id = ShardId {
|
||||||
|
bucket: manifest.bucket.clone(),
|
||||||
|
key: manifest.key.clone(),
|
||||||
|
chunk_index: chunk.chunk_index,
|
||||||
|
shard_index: placement.shard_index,
|
||||||
|
};
|
||||||
|
let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
|
||||||
|
if let Some(store) = self.local_shard_stores.get(store_idx) {
|
||||||
|
if let Ok((data, _)) = store.read_shard(&shard_id).await {
|
||||||
|
shards[placement.shard_index as usize] = Some(data);
|
||||||
|
fetched += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// TODO: fetch from other online remote nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
if fetched < k {
|
||||||
|
tracing::warn!(
|
||||||
|
bucket = manifest.bucket,
|
||||||
|
key = manifest.key,
|
||||||
|
chunk = chunk.chunk_index,
|
||||||
|
"Not enough local shards to heal, skipping"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reconstruct all shards
|
||||||
|
let reconstructed = match self.erasure_coder.decode_chunk(
|
||||||
|
&mut shards,
|
||||||
|
chunk.data_size,
|
||||||
|
) {
|
||||||
|
Ok(_) => true,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
bucket = manifest.bucket,
|
||||||
|
key = manifest.key,
|
||||||
|
chunk = chunk.chunk_index,
|
||||||
|
error = %e,
|
||||||
|
"Reconstruction failed"
|
||||||
|
);
|
||||||
|
stats.errors += 1;
|
||||||
|
false
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !reconstructed {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-encode to get all shards back (including the missing ones)
|
||||||
|
let full_data_size = chunk.data_size;
|
||||||
|
let mut data_buf = Vec::with_capacity(full_data_size);
|
||||||
|
for i in 0..k {
|
||||||
|
if let Some(ref shard) = shards[i] {
|
||||||
|
data_buf.extend_from_slice(shard);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
data_buf.truncate(full_data_size);
|
||||||
|
|
||||||
|
let all_shards = match self.erasure_coder.encode_chunk(&data_buf) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, "Re-encoding for heal failed");
|
||||||
|
stats.errors += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Write the missing shards to the first available local drive
|
||||||
|
for affected_placement in &affected {
|
||||||
|
let shard_idx = affected_placement.shard_index as usize;
|
||||||
|
if shard_idx < all_shards.len() {
|
||||||
|
let shard_data = &all_shards[shard_idx];
|
||||||
|
let checksum = crc32c::crc32c(shard_data);
|
||||||
|
|
||||||
|
let shard_id = ShardId {
|
||||||
|
bucket: manifest.bucket.clone(),
|
||||||
|
key: manifest.key.clone(),
|
||||||
|
chunk_index: chunk.chunk_index,
|
||||||
|
shard_index: affected_placement.shard_index,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Place on first available local drive
|
||||||
|
if let Some(store) = self.local_shard_stores.first() {
|
||||||
|
match store.write_shard(&shard_id, shard_data, checksum).await {
|
||||||
|
Ok(()) => {
|
||||||
|
stats.shards_healed += 1;
|
||||||
|
tracing::info!(
|
||||||
|
bucket = manifest.bucket,
|
||||||
|
key = manifest.key,
|
||||||
|
chunk = chunk.chunk_index,
|
||||||
|
shard = affected_placement.shard_index,
|
||||||
|
"Shard healed successfully"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, "Failed to write healed shard");
|
||||||
|
stats.errors += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::task::yield_now().await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Collect all manifests under a bucket directory.
|
||||||
|
async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
|
||||||
|
let mut manifests = Vec::new();
|
||||||
|
self.collect_manifests_recursive(dir, &mut manifests).await?;
|
||||||
|
Ok(manifests)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn collect_manifests_recursive<'a>(
|
||||||
|
&'a self,
|
||||||
|
dir: &'a std::path::Path,
|
||||||
|
manifests: &'a mut Vec<ObjectManifest>,
|
||||||
|
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
|
||||||
|
Box::pin(async move {
|
||||||
|
let mut entries = match fs::read_dir(dir).await {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(_) => return Ok(()),
|
||||||
|
};
|
||||||
|
|
||||||
|
while let Some(entry) = entries.next_entry().await? {
|
||||||
|
let meta = entry.metadata().await?;
|
||||||
|
let name = entry.file_name().to_string_lossy().to_string();
|
||||||
|
|
||||||
|
if meta.is_dir() {
|
||||||
|
self.collect_manifests_recursive(&entry.path(), manifests)
|
||||||
|
.await?;
|
||||||
|
} else if name.ends_with(".manifest.json") {
|
||||||
|
if let Ok(content) = fs::read_to_string(entry.path()).await {
|
||||||
|
if let Ok(manifest) = serde_json::from_str::<ObjectManifest>(&content) {
|
||||||
|
manifests.push(manifest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
|
|||||||
@@ -3,8 +3,12 @@ use std::net::SocketAddr;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
use super::drive_manager::{DriveManager, DriveStatus};
|
||||||
use super::protocol::{
|
use super::protocol::{
|
||||||
ClusterRequest, ClusterResponse, HeartbeatMessage, JoinRequestMessage, NodeInfo,
|
ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage,
|
||||||
|
NodeInfo,
|
||||||
};
|
};
|
||||||
use super::quic_transport::QuicTransport;
|
use super::quic_transport::QuicTransport;
|
||||||
use super::state::ClusterState;
|
use super::state::ClusterState;
|
||||||
@@ -15,6 +19,7 @@ pub struct MembershipManager {
|
|||||||
transport: Arc<QuicTransport>,
|
transport: Arc<QuicTransport>,
|
||||||
heartbeat_interval: Duration,
|
heartbeat_interval: Duration,
|
||||||
local_node_info: NodeInfo,
|
local_node_info: NodeInfo,
|
||||||
|
drive_manager: Option<Arc<Mutex<DriveManager>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MembershipManager {
|
impl MembershipManager {
|
||||||
@@ -29,9 +34,16 @@ impl MembershipManager {
|
|||||||
transport,
|
transport,
|
||||||
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
|
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
|
||||||
local_node_info,
|
local_node_info,
|
||||||
|
drive_manager: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the drive manager for health reporting in heartbeats.
|
||||||
|
pub fn with_drive_manager(mut self, dm: Arc<Mutex<DriveManager>>) -> Self {
|
||||||
|
self.drive_manager = Some(dm);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Join the cluster by contacting seed nodes.
|
/// Join the cluster by contacting seed nodes.
|
||||||
/// Sends a JoinRequest to each seed node until one accepts.
|
/// Sends a JoinRequest to each seed node until one accepts.
|
||||||
pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> {
|
pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> {
|
||||||
@@ -129,6 +141,9 @@ impl MembershipManager {
|
|||||||
let topology_version = self.state.version().await;
|
let topology_version = self.state.version().await;
|
||||||
let mut responded = Vec::new();
|
let mut responded = Vec::new();
|
||||||
|
|
||||||
|
// Collect drive health states
|
||||||
|
let drive_states = self.collect_drive_states().await;
|
||||||
|
|
||||||
for peer in &peers {
|
for peer in &peers {
|
||||||
let addr: SocketAddr = match peer.quic_addr.parse() {
|
let addr: SocketAddr = match peer.quic_addr.parse() {
|
||||||
Ok(a) => a,
|
Ok(a) => a,
|
||||||
@@ -138,7 +153,7 @@ impl MembershipManager {
|
|||||||
let heartbeat = ClusterRequest::Heartbeat(HeartbeatMessage {
|
let heartbeat = ClusterRequest::Heartbeat(HeartbeatMessage {
|
||||||
node_id: self.local_node_info.node_id.clone(),
|
node_id: self.local_node_info.node_id.clone(),
|
||||||
timestamp: chrono::Utc::now().to_rfc3339(),
|
timestamp: chrono::Utc::now().to_rfc3339(),
|
||||||
drive_states: Vec::new(), // TODO: populate from DriveManager
|
drive_states: drive_states.clone(),
|
||||||
topology_version,
|
topology_version,
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -181,4 +196,31 @@ impl MembershipManager {
|
|||||||
let _response = self.transport.send_request(&conn, heartbeat).await?;
|
let _response = self.transport.send_request(&conn, heartbeat).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Collect drive health states from the DriveManager, if available.
|
||||||
|
async fn collect_drive_states(&self) -> Vec<DriveStateInfo> {
|
||||||
|
let dm = match &self.drive_manager {
|
||||||
|
Some(dm) => dm,
|
||||||
|
None => return Vec::new(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut manager = dm.lock().await;
|
||||||
|
let results = manager.check_all_drives().await;
|
||||||
|
|
||||||
|
results
|
||||||
|
.into_iter()
|
||||||
|
.map(|(idx, status)| {
|
||||||
|
let status_str = match status {
|
||||||
|
DriveStatus::Online => "online",
|
||||||
|
DriveStatus::Degraded => "degraded",
|
||||||
|
DriveStatus::Offline => "offline",
|
||||||
|
DriveStatus::Healing => "healing",
|
||||||
|
};
|
||||||
|
DriveStateInfo {
|
||||||
|
drive_index: idx as u32,
|
||||||
|
status: status_str.to_string(),
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ use crate::policy::{self, PolicyDecision, PolicyStore};
|
|||||||
use crate::error::StorageError;
|
use crate::error::StorageError;
|
||||||
use crate::cluster::coordinator::DistributedStore;
|
use crate::cluster::coordinator::DistributedStore;
|
||||||
use crate::cluster::config::ErasureConfig;
|
use crate::cluster::config::ErasureConfig;
|
||||||
|
use crate::cluster::drive_manager::DriveManager;
|
||||||
use crate::cluster::membership::MembershipManager;
|
use crate::cluster::membership::MembershipManager;
|
||||||
use crate::cluster::placement;
|
use crate::cluster::placement;
|
||||||
use crate::cluster::protocol::NodeInfo;
|
use crate::cluster::protocol::NodeInfo;
|
||||||
@@ -217,13 +218,21 @@ impl StorageServer {
|
|||||||
};
|
};
|
||||||
cluster_state.add_node(local_node_info.clone()).await;
|
cluster_state.add_node(local_node_info.clone()).await;
|
||||||
|
|
||||||
// Join cluster if seed nodes are configured
|
// Initialize drive manager for health monitoring
|
||||||
let membership = Arc::new(MembershipManager::new(
|
let drive_manager = Arc::new(tokio::sync::Mutex::new(
|
||||||
cluster_state.clone(),
|
DriveManager::new(&cluster_config.drives).await?,
|
||||||
transport.clone(),
|
|
||||||
cluster_config.heartbeat_interval_ms,
|
|
||||||
local_node_info,
|
|
||||||
));
|
));
|
||||||
|
|
||||||
|
// Join cluster if seed nodes are configured
|
||||||
|
let membership = Arc::new(
|
||||||
|
MembershipManager::new(
|
||||||
|
cluster_state.clone(),
|
||||||
|
transport.clone(),
|
||||||
|
cluster_config.heartbeat_interval_ms,
|
||||||
|
local_node_info,
|
||||||
|
)
|
||||||
|
.with_drive_manager(drive_manager),
|
||||||
|
);
|
||||||
membership
|
membership
|
||||||
.join_cluster(&cluster_config.seed_nodes)
|
.join_cluster(&cluster_config.seed_nodes)
|
||||||
.await?;
|
.await?;
|
||||||
|
|||||||
@@ -811,14 +811,18 @@ impl StorageBackend {
|
|||||||
pub fn policies_dir(&self) -> std::path::PathBuf {
|
pub fn policies_dir(&self) -> std::path::PathBuf {
|
||||||
match self {
|
match self {
|
||||||
StorageBackend::Standalone(fs) => fs.policies_dir(),
|
StorageBackend::Standalone(fs) => fs.policies_dir(),
|
||||||
StorageBackend::Clustered(_) => PathBuf::from(".policies"), // TODO: proper policies in cluster mode
|
StorageBackend::Clustered(ds) => ds.policies_dir(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn initialize(&self) -> Result<()> {
|
pub async fn initialize(&self) -> Result<()> {
|
||||||
match self {
|
match self {
|
||||||
StorageBackend::Standalone(fs) => fs.initialize().await,
|
StorageBackend::Standalone(fs) => fs.initialize().await,
|
||||||
StorageBackend::Clustered(_) => Ok(()), // Cluster init happens separately
|
StorageBackend::Clustered(ds) => {
|
||||||
|
// Ensure policies directory exists
|
||||||
|
tokio::fs::create_dir_all(ds.policies_dir()).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: '@push.rocks/smartstorage',
|
name: '@push.rocks/smartstorage',
|
||||||
version: '6.1.0',
|
version: '6.3.0',
|
||||||
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user