Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c683b02e8c | |||
| b64be03c2f | |||
| 494dac1267 | |||
| cea3407777 | |||
| a009d990d0 | |||
| 08d545f5db |
22
changelog.md
22
changelog.md
@@ -1,5 +1,27 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-03-23 - 6.3.2 - fix(docs)
|
||||
update license ownership and correct README license file reference
|
||||
|
||||
- Adjusts the copyright holder name in the license file
|
||||
- Fixes the README link to match the lowercase license filename
|
||||
|
||||
## 2026-03-21 - 6.3.1 - fix(cluster)
|
||||
improve shard reconstruction validation and start background healing service
|
||||
|
||||
- use the erasure read quorum when reconstructing chunks instead of assuming data shard count
|
||||
- verify reconstructed shards before writing healed data back to disk
|
||||
- start the healing service during server initialization with shared local shard stores
|
||||
- simplify QUIC request handling by decoding the full request buffer including trailing shard data
|
||||
- clean up unused variables and imports across cluster modules
|
||||
|
||||
## 2026-03-21 - 6.3.0 - feat(readme)
|
||||
document distributed cluster mode, erasure coding, and QUIC-based architecture
|
||||
|
||||
- Expand README overview and feature matrix to highlight clustering, multi-drive awareness, and distributed storage capabilities
|
||||
- Add standalone and cluster mode usage examples plus cluster configuration options
|
||||
- Document clustering internals including erasure coding, quorum behavior, QUIC transport, self-healing, and on-disk layout
|
||||
|
||||
## 2026-03-21 - 6.2.0 - feat(cluster)
|
||||
add shard healing, drive health heartbeats, and clustered policy directory support
|
||||
|
||||
|
||||
2
license
2
license
@@ -1,4 +1,4 @@
|
||||
Copyright (c) 2021 Lossless GmbH (hello@lossless.com)
|
||||
Copyright (c) 2021 Task Venture Capital GmbH (hello@task.vc)
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@push.rocks/smartstorage",
|
||||
"version": "6.2.0",
|
||||
"version": "6.3.2",
|
||||
"private": false,
|
||||
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
|
||||
"main": "dist_ts/index.js",
|
||||
|
||||
241
readme.md
241
readme.md
@@ -1,6 +1,6 @@
|
||||
# @push.rocks/smartstorage
|
||||
|
||||
A high-performance, S3-compatible local storage server powered by a **Rust core** with a clean TypeScript API. Drop-in replacement for AWS S3 during development and testing — no cloud, no Docker, no MinIO. Just `npm install` and go.
|
||||
A high-performance, S3-compatible storage server powered by a **Rust core** with a clean TypeScript API. Runs standalone for dev/test — or scales out as a **distributed, erasure-coded cluster** with QUIC-based inter-node communication. No cloud, no Docker. Just `npm install` and go. 🚀
|
||||
|
||||
## Issue Reporting and Security
|
||||
|
||||
@@ -15,23 +15,34 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
|
||||
| Large file uploads | Streaming, zero-copy | Yes | OOM risk |
|
||||
| Range requests | Seek-based | Yes | Full read |
|
||||
| Language | Rust + TypeScript | Go | JavaScript |
|
||||
| Multipart uploads | Full support | Yes | No |
|
||||
| Multipart uploads | ✅ Full support | Yes | No |
|
||||
| Auth | AWS SigV4 (full verification) | Full IAM | Basic |
|
||||
| Bucket policies | IAM-style evaluation | Yes | No |
|
||||
| Clustering | ✅ Erasure-coded, QUIC | Yes | No |
|
||||
| Multi-drive awareness | ✅ Per-drive health | Yes | No |
|
||||
|
||||
### Core Features
|
||||
|
||||
- **Rust-powered HTTP server** — hyper 1.x with streaming I/O, zero-copy, backpressure
|
||||
- **Full S3-compatible API** — works with AWS SDK v3, SmartBucket, any S3 client
|
||||
- **Filesystem-backed storage** — buckets map to directories, objects to files
|
||||
- **Streaming multipart uploads** — large files without memory pressure
|
||||
- **Byte-range requests** — `seek()` directly to the requested byte offset
|
||||
- **AWS SigV4 authentication** — full signature verification with constant-time comparison and 15-min clock skew enforcement
|
||||
- **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation, wildcard matching, and anonymous access support
|
||||
- **CORS middleware** — configurable cross-origin support
|
||||
- **Structured logging** — tracing-based, error through debug levels
|
||||
- **Clean slate mode** — wipe storage on startup for test isolation
|
||||
- **Test-first design** — start/stop in milliseconds, no port conflicts
|
||||
- 🦀 **Rust-powered HTTP server** — hyper 1.x with streaming I/O, zero-copy, backpressure
|
||||
- 📦 **Full S3-compatible API** — works with AWS SDK v3, SmartBucket, any S3 client
|
||||
- 💾 **Filesystem-backed storage** — buckets map to directories, objects to files
|
||||
- 📤 **Streaming multipart uploads** — large files without memory pressure
|
||||
- 📐 **Byte-range requests** — `seek()` directly to the requested byte offset
|
||||
- 🔐 **AWS SigV4 authentication** — full signature verification with constant-time comparison
|
||||
- 📋 **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation and wildcard matching
|
||||
- 🌐 **CORS middleware** — configurable cross-origin support
|
||||
- 🧹 **Clean slate mode** — wipe storage on startup for test isolation
|
||||
- ⚡ **Test-first design** — start/stop in milliseconds, no port conflicts
|
||||
|
||||
### Clustering Features
|
||||
|
||||
- 🔗 **Erasure coding** — Reed-Solomon (configurable k data + m parity shards) for storage efficiency and fault tolerance
|
||||
- 🚄 **QUIC transport** — multiplexed, encrypted inter-node communication via `quinn` with zero head-of-line blocking
|
||||
- 💽 **Multi-drive awareness** — each node manages multiple independent storage paths with health monitoring
|
||||
- 🤝 **Cluster membership** — static seed config + runtime join, heartbeat-based failure detection
|
||||
- ✍️ **Quorum writes** — data is only acknowledged after k+1 shards are persisted
|
||||
- 📖 **Quorum reads** — reconstruct from any k available shards, local-first fast path
|
||||
- 🩹 **Self-healing** — background scanner detects and reconstructs missing/corrupt shards
|
||||
|
||||
## Installation
|
||||
|
||||
@@ -43,6 +54,8 @@ pnpm add @push.rocks/smartstorage -D
|
||||
|
||||
## Quick Start
|
||||
|
||||
### Standalone Mode (Dev & Test)
|
||||
|
||||
```typescript
|
||||
import { SmartStorage } from '@push.rocks/smartstorage';
|
||||
|
||||
@@ -63,6 +76,31 @@ const descriptor = await storage.getStorageDescriptor();
|
||||
await storage.stop();
|
||||
```
|
||||
|
||||
### Cluster Mode (Distributed)
|
||||
|
||||
```typescript
|
||||
import { SmartStorage } from '@push.rocks/smartstorage';
|
||||
|
||||
const storage = await SmartStorage.createAndStart({
|
||||
server: { port: 3000 },
|
||||
cluster: {
|
||||
enabled: true,
|
||||
nodeId: 'node-1',
|
||||
quicPort: 4000,
|
||||
seedNodes: ['192.168.1.11:4000', '192.168.1.12:4000'],
|
||||
erasure: {
|
||||
dataShards: 4, // k: minimum shards to reconstruct data
|
||||
parityShards: 2, // m: fault tolerance (can lose up to m shards)
|
||||
},
|
||||
drives: {
|
||||
paths: ['/mnt/disk1', '/mnt/disk2', '/mnt/disk3'],
|
||||
},
|
||||
},
|
||||
});
|
||||
```
|
||||
|
||||
Objects are automatically split into chunks (default 4 MB), erasure-coded into 6 shards (4 data + 2 parity), and distributed across drives/nodes. Any 4 of 6 shards can reconstruct the original data.
|
||||
|
||||
## Configuration
|
||||
|
||||
All config fields are optional — sensible defaults are applied automatically.
|
||||
@@ -75,7 +113,7 @@ const config: ISmartStorageConfig = {
|
||||
port: 3000, // Default: 3000
|
||||
address: '0.0.0.0', // Default: '0.0.0.0'
|
||||
silent: false, // Default: false
|
||||
region: 'us-east-1', // Default: 'us-east-1' — used for SigV4 signing
|
||||
region: 'us-east-1', // Default: 'us-east-1' — used for SigV4 signing
|
||||
},
|
||||
storage: {
|
||||
directory: './my-data', // Default: .nogit/bucketsDir
|
||||
@@ -111,6 +149,22 @@ const config: ISmartStorageConfig = {
|
||||
expirationDays: 7,
|
||||
cleanupIntervalMinutes: 60,
|
||||
},
|
||||
cluster: { // Optional — omit for standalone mode
|
||||
enabled: true,
|
||||
nodeId: 'node-1', // Auto-generated UUID if omitted
|
||||
quicPort: 4000, // Default: 4000
|
||||
seedNodes: [], // Addresses of existing cluster members
|
||||
erasure: {
|
||||
dataShards: 4, // Default: 4
|
||||
parityShards: 2, // Default: 2
|
||||
chunkSizeBytes: 4194304, // Default: 4 MB
|
||||
},
|
||||
drives: {
|
||||
paths: ['/mnt/disk1', '/mnt/disk2'],
|
||||
},
|
||||
heartbeatIntervalMs: 5000, // Default: 5000
|
||||
heartbeatTimeoutMs: 30000, // Default: 30000
|
||||
},
|
||||
};
|
||||
|
||||
const storage = await SmartStorage.createAndStart(config);
|
||||
@@ -207,7 +261,7 @@ const files = await dir.listFiles();
|
||||
|
||||
## Multipart Uploads
|
||||
|
||||
For files larger than 5 MB, use multipart uploads. smartstorage handles them with **streaming I/O** — parts are written directly to disk, never buffered in memory.
|
||||
For files larger than 5 MB, use multipart uploads. smartstorage handles them with **streaming I/O** — parts are written directly to disk, never buffered in memory. In cluster mode, each part is independently erasure-coded and distributed.
|
||||
|
||||
```typescript
|
||||
import {
|
||||
@@ -255,8 +309,6 @@ When `auth.enabled` is `true`, the auth pipeline works as follows:
|
||||
|
||||
### Setting a Bucket Policy
|
||||
|
||||
Use the S3 `PutBucketPolicy` API (or any S3 client that supports it):
|
||||
|
||||
```typescript
|
||||
import { PutBucketPolicyCommand } from '@aws-sdk/client-s3';
|
||||
|
||||
@@ -294,6 +346,81 @@ await client.send(new PutBucketPolicyCommand({
|
||||
|
||||
Deleting a bucket automatically removes its associated policy.
|
||||
|
||||
## Clustering Deep Dive 🔗
|
||||
|
||||
smartstorage can run as a distributed storage cluster where multiple nodes cooperate to store and retrieve data with built-in redundancy.
|
||||
|
||||
### How It Works
|
||||
|
||||
```
|
||||
Client ──HTTP PUT──▶ Node A (coordinator)
|
||||
│
|
||||
├─ Split object into 4 MB chunks
|
||||
├─ Erasure-code each chunk (4 data + 2 parity = 6 shards)
|
||||
│
|
||||
├──QUIC──▶ Node B (shard writes)
|
||||
├──QUIC──▶ Node C (shard writes)
|
||||
└─ Local disk (shard writes)
|
||||
```
|
||||
|
||||
1. **Any node can coordinate** — the client connects to any cluster member
|
||||
2. **Objects are chunked** — large objects split into fixed-size pieces (default 4 MB)
|
||||
3. **Each chunk is erasure-coded** — Reed-Solomon produces k data + m parity shards
|
||||
4. **Shards are distributed** — placed across different nodes and drives for fault isolation
|
||||
5. **Quorum guarantees consistency** — writes need k+1 acks, reads need k shards
|
||||
|
||||
### Erasure Coding
|
||||
|
||||
With the default `4+2` configuration:
|
||||
- Storage overhead: **33%** (vs. 200% for 3x replication)
|
||||
- Fault tolerance: **any 2 drives/nodes can fail** simultaneously
|
||||
- Read efficiency: only **4 of 6 shards** needed to reconstruct data
|
||||
|
||||
| Config | Total Shards | Overhead | Tolerance | Min Nodes |
|
||||
|--------|-------------|----------|-----------|-----------|
|
||||
| 4+2 | 6 | 33% | 2 failures | 3 |
|
||||
| 6+3 | 9 | 50% | 3 failures | 5 |
|
||||
| 2+1 | 3 | 50% | 1 failure | 2 |
|
||||
|
||||
### QUIC Transport
|
||||
|
||||
Inter-node communication uses [QUIC](https://en.wikipedia.org/wiki/QUIC) via the `quinn` library:
|
||||
- 🔒 **Built-in TLS** — self-signed certs auto-generated at cluster init
|
||||
- 🔀 **Multiplexed streams** — concurrent shard transfers without head-of-line blocking
|
||||
- ⚡ **Connection pooling** — persistent connections to peer nodes
|
||||
- 🌊 **Natural backpressure** — QUIC flow control prevents overloading slow peers
|
||||
|
||||
### Cluster Membership
|
||||
|
||||
- **Static seed nodes** — initial cluster defined in config
|
||||
- **Runtime join** — new nodes can join a running cluster
|
||||
- **Heartbeat monitoring** — every 5s (configurable), with suspect/offline detection
|
||||
- **Split-brain prevention** — nodes only mark peers offline when they have majority
|
||||
|
||||
### Self-Healing
|
||||
|
||||
A background scanner periodically (default: every 24h):
|
||||
1. Checks shard checksums (CRC32C) for bit-rot detection
|
||||
2. Identifies shards on offline nodes
|
||||
3. Reconstructs missing shards from remaining data using Reed-Solomon
|
||||
4. Places healed shards on healthy drives
|
||||
|
||||
Healing runs at low priority to avoid impacting foreground I/O.
|
||||
|
||||
### Erasure Set Formation
|
||||
|
||||
Drives are organized into fixed **erasure sets** at cluster initialization:
|
||||
|
||||
```
|
||||
3 nodes × 4 drives each = 12 drives total
|
||||
With 6-shard erasure sets → 2 erasure sets
|
||||
|
||||
Set 0: Node1-Disk0, Node2-Disk0, Node3-Disk0, Node1-Disk1, Node2-Disk1, Node3-Disk1
|
||||
Set 1: Node1-Disk2, Node2-Disk2, Node3-Disk2, Node1-Disk3, Node2-Disk3, Node3-Disk3
|
||||
```
|
||||
|
||||
Drives are interleaved across nodes for maximum fault isolation. New nodes form new erasure sets — existing data is never rebalanced.
|
||||
|
||||
## Testing Integration
|
||||
|
||||
```typescript
|
||||
@@ -358,31 +485,37 @@ Get connection details for S3-compatible clients. Returns:
|
||||
smartstorage uses a **hybrid Rust + TypeScript** architecture:
|
||||
|
||||
```
|
||||
┌─────────────────────────────────┐
|
||||
│ Your Code (AWS SDK, etc.) │
|
||||
│ ↕ HTTP (localhost:3000) │
|
||||
├─────────────────────────────────┤
|
||||
│ ruststorage binary (Rust) │
|
||||
│ ├─ hyper 1.x HTTP server │
|
||||
│ ├─ S3 path-style routing │
|
||||
│ ├─ Streaming storage layer │
|
||||
│ ├─ Multipart manager │
|
||||
│ ├─ SigV4 auth + policy engine │
|
||||
│ ├─ CORS middleware │
|
||||
│ └─ S3 XML response builder │
|
||||
├─────────────────────────────────┤
|
||||
│ TypeScript (thin IPC wrapper) │
|
||||
│ ├─ SmartStorage class │
|
||||
│ ├─ RustBridge (stdin/stdout) │
|
||||
│ └─ Config & S3 descriptor │
|
||||
└─────────────────────────────────┘
|
||||
┌──────────────────────────────────────────────┐
|
||||
│ Your Code (AWS SDK, SmartBucket, etc.) │
|
||||
│ ↕ HTTP (localhost:3000) │
|
||||
├──────────────────────────────────────────────┤
|
||||
│ ruststorage binary (Rust) │
|
||||
│ ├─ hyper 1.x HTTP server │
|
||||
│ ├─ S3 path-style routing │
|
||||
│ ├─ StorageBackend (Standalone or Clustered) │
|
||||
│ │ ├─ FileStore (single-node mode) │
|
||||
│ │ └─ DistributedStore (cluster mode) │
|
||||
│ │ ├─ ErasureCoder (Reed-Solomon) │
|
||||
│ │ ├─ ShardStore (per-drive storage) │
|
||||
│ │ ├─ QuicTransport (quinn) │
|
||||
│ │ ├─ ClusterState & Membership │
|
||||
│ │ └─ HealingService │
|
||||
│ ├─ SigV4 auth + policy engine │
|
||||
│ ├─ CORS middleware │
|
||||
│ └─ S3 XML response builder │
|
||||
├──────────────────────────────────────────────┤
|
||||
│ TypeScript (thin IPC wrapper) │
|
||||
│ ├─ SmartStorage class │
|
||||
│ ├─ RustBridge (stdin/stdout JSON IPC) │
|
||||
│ └─ Config & S3 descriptor │
|
||||
└──────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
**Why Rust?** The TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
|
||||
**Why Rust?** The original TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
|
||||
|
||||
**IPC Protocol:** TypeScript spawns the `ruststorage` binary with `--management` and communicates via newline-delimited JSON over stdin/stdout. Commands: `start`, `stop`, `createBucket`.
|
||||
**IPC Protocol:** TypeScript spawns the `ruststorage` binary with `--management` and communicates via newline-delimited JSON over stdin/stdout. Commands: `start`, `stop`, `createBucket`, `clusterStatus`.
|
||||
|
||||
### S3-Compatible Operations Supported
|
||||
### S3-Compatible Operations
|
||||
|
||||
| Operation | Method | Path |
|
||||
|-----------|--------|------|
|
||||
@@ -407,31 +540,45 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
|
||||
|
||||
### On-Disk Format
|
||||
|
||||
**Standalone mode:**
|
||||
```
|
||||
{storage.directory}/
|
||||
{bucket}/
|
||||
{key}._storage_object # Object data
|
||||
{key}._storage_object # Object data
|
||||
{key}._storage_object.metadata.json # Metadata (content-type, x-amz-meta-*, etc.)
|
||||
{key}._storage_object.md5 # Cached MD5 hash
|
||||
{key}._storage_object.md5 # Cached MD5 hash
|
||||
.multipart/
|
||||
{upload-id}/
|
||||
metadata.json # Upload metadata (bucket, key, parts)
|
||||
part-1 # Part data files
|
||||
part-2
|
||||
...
|
||||
metadata.json # Upload metadata
|
||||
part-1, part-2, ... # Part data files
|
||||
.policies/
|
||||
{bucket}.policy.json # Bucket policy (IAM JSON format)
|
||||
{bucket}.policy.json # Bucket policy (IAM JSON format)
|
||||
```
|
||||
|
||||
**Cluster mode:**
|
||||
```
|
||||
{drive_path}/.smartstorage/
|
||||
format.json # Drive metadata (cluster ID, erasure set)
|
||||
data/{bucket}/{key_hash}/{key}/
|
||||
chunk-{N}/shard-{M}.dat # Erasure-coded shard data
|
||||
chunk-{N}/shard-{M}.meta # Shard metadata (checksum, size)
|
||||
|
||||
{storage.directory}/
|
||||
.manifests/{bucket}/
|
||||
{key}.manifest.json # Object manifest (shard placements, checksums)
|
||||
.buckets/{bucket}/ # Bucket metadata
|
||||
.policies/{bucket}.policy.json # Bucket policies
|
||||
```
|
||||
|
||||
## Related Packages
|
||||
|
||||
- [`@push.rocks/smartbucket`](https://code.foss.global/push.rocks/smartbucket) — High-level S3-compatible abstraction layer
|
||||
- [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) — TypeScript <-> Rust IPC bridge
|
||||
- [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) — TypeScript ↔ Rust IPC bridge
|
||||
- [`@git.zone/tsrust`](https://code.foss.global/git.zone/tsrust) — Rust cross-compilation for npm packages
|
||||
|
||||
## License and Legal Information
|
||||
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
|
||||
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.
|
||||
|
||||
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
|
||||
|
||||
|
||||
@@ -205,7 +205,7 @@ impl DistributedStore {
|
||||
|
||||
// Determine which chunks to fetch based on range
|
||||
let chunk_size = manifest.chunk_size as u64;
|
||||
let (first_chunk, last_chunk, byte_offset_in_first, byte_end_in_last) =
|
||||
let (first_chunk, last_chunk, byte_offset_in_first, _byte_end_in_last) =
|
||||
if let Some((start, end)) = range {
|
||||
let first = (start / chunk_size) as usize;
|
||||
let last = (end / chunk_size) as usize;
|
||||
@@ -1036,10 +1036,6 @@ impl DistributedStore {
|
||||
// Internal: fetch + reconstruct
|
||||
// ============================
|
||||
|
||||
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
|
||||
self.fetch_and_reconstruct_chunk_for_object(chunk, "", "").await
|
||||
}
|
||||
|
||||
/// Fetch shards and reconstruct a chunk. bucket/key needed for shard ID lookups.
|
||||
async fn fetch_and_reconstruct_chunk_for_object(
|
||||
&self,
|
||||
@@ -1047,7 +1043,7 @@ impl DistributedStore {
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
) -> Result<Vec<u8>> {
|
||||
let k = self.erasure_config.data_shards;
|
||||
let k = self.erasure_config.read_quorum();
|
||||
let total = self.erasure_config.total_shards();
|
||||
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
|
||||
let mut succeeded = 0usize;
|
||||
|
||||
@@ -256,6 +256,18 @@ impl HealingService {
|
||||
}
|
||||
};
|
||||
|
||||
// Verify reconstructed shards are consistent
|
||||
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
|
||||
tracing::error!(
|
||||
bucket = manifest.bucket,
|
||||
key = manifest.key,
|
||||
chunk = chunk.chunk_index,
|
||||
"Shard verification failed after reconstruction"
|
||||
);
|
||||
stats.errors += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Write the missing shards to the first available local drive
|
||||
for affected_placement in &affected {
|
||||
let shard_idx = affected_placement.shard_index as usize;
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
// Cluster modules contain forward-looking public API that is incrementally wired.
|
||||
// Allow dead_code for methods/structs not yet called from the main server path.
|
||||
#![allow(dead_code)]
|
||||
|
||||
pub mod config;
|
||||
pub mod coordinator;
|
||||
pub mod drive_manager;
|
||||
|
||||
@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
|
||||
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
use super::protocol::{
|
||||
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
|
||||
};
|
||||
@@ -225,20 +223,14 @@ impl QuicTransport {
|
||||
mut recv: quinn::RecvStream,
|
||||
shard_store: Arc<ShardStore>,
|
||||
) -> Result<()> {
|
||||
// Read the length-prefixed request header
|
||||
let mut len_buf = [0u8; 4];
|
||||
recv.read_exact(&mut len_buf).await?;
|
||||
let msg_len = u32::from_le_bytes(len_buf) as usize;
|
||||
|
||||
let mut msg_buf = vec![0u8; msg_len];
|
||||
recv.read_exact(&mut msg_buf).await?;
|
||||
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
|
||||
// Read the full request (length-prefixed bincode + optional trailing data)
|
||||
let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
|
||||
let (request, header_len) = protocol::decode_request(&raw)?;
|
||||
|
||||
match request {
|
||||
ClusterRequest::ShardWrite(write_req) => {
|
||||
// Read shard data from the stream
|
||||
let mut shard_data = vec![0u8; write_req.shard_data_length as usize];
|
||||
recv.read_exact(&mut shard_data).await?;
|
||||
// Shard data follows the header in the raw buffer
|
||||
let shard_data = &raw[header_len..];
|
||||
|
||||
let shard_id = ShardId {
|
||||
bucket: write_req.bucket,
|
||||
@@ -348,8 +340,6 @@ impl QuicTransport {
|
||||
// will be handled by the membership and coordinator modules.
|
||||
// For now, send a generic ack.
|
||||
_ => {
|
||||
let response_data = recv.read_to_end(0).await.unwrap_or_default();
|
||||
drop(response_data);
|
||||
let err = protocol::ErrorResponse {
|
||||
request_id: String::new(),
|
||||
code: "NotImplemented".to_string(),
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
|
||||
@@ -24,8 +24,8 @@ use crate::config::SmartStorageConfig;
|
||||
use crate::policy::{self, PolicyDecision, PolicyStore};
|
||||
use crate::error::StorageError;
|
||||
use crate::cluster::coordinator::DistributedStore;
|
||||
use crate::cluster::config::ErasureConfig;
|
||||
use crate::cluster::drive_manager::DriveManager;
|
||||
use crate::cluster::healing::HealingService;
|
||||
use crate::cluster::membership::MembershipManager;
|
||||
use crate::cluster::placement;
|
||||
use crate::cluster::protocol::NodeInfo;
|
||||
@@ -237,9 +237,15 @@ impl StorageServer {
|
||||
.join_cluster(&cluster_config.seed_nodes)
|
||||
.await?;
|
||||
|
||||
// Build local shard stores (one per drive) for shared use
|
||||
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
|
||||
.iter()
|
||||
.map(|p| Arc::new(ShardStore::new(p.clone())))
|
||||
.collect();
|
||||
|
||||
// Start QUIC accept loop for incoming connections
|
||||
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone()));
|
||||
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
|
||||
let shard_store_for_accept = local_shard_stores[0].clone();
|
||||
let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
|
||||
let transport_clone = transport.clone();
|
||||
tokio::spawn(async move {
|
||||
transport_clone
|
||||
@@ -249,11 +255,24 @@ impl StorageServer {
|
||||
|
||||
// Start heartbeat loop
|
||||
let membership_clone = membership.clone();
|
||||
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
|
||||
let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
|
||||
tokio::spawn(async move {
|
||||
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
|
||||
});
|
||||
|
||||
// Start healing service
|
||||
let healing_service = HealingService::new(
|
||||
cluster_state.clone(),
|
||||
&erasure_config,
|
||||
local_shard_stores.clone(),
|
||||
manifest_dir.clone(),
|
||||
24, // scan every 24 hours
|
||||
)?;
|
||||
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
|
||||
tokio::spawn(async move {
|
||||
healing_service.run(heal_shutdown_rx).await;
|
||||
});
|
||||
|
||||
// Create distributed store
|
||||
let distributed_store = DistributedStore::new(
|
||||
cluster_state,
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: '@push.rocks/smartstorage',
|
||||
version: '6.2.0',
|
||||
version: '6.3.2',
|
||||
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user