Compare commits

...

5 Commits

Author SHA1 Message Date

jkunz c8d3ed79aa v6.4.0 2026-04-30 06:08:42 +00:00
  Default (tags) / security (push) Failing after 1s
  Default (tags) / test (push) Failing after 1s
  Default (tags) / release (push) Has been skipped
  Default (tags) / metadata (push) Has been skipped

jkunz a31e477359 feat(cluster,server,auth): add operational health endpoints, persist cluster topology, and hide credential secrets from runtime listings 2026-04-30 06:08:42 +00:00

jkunz c2b40ee240 v6.3.3 2026-04-19 12:22:53 +00:00
  Default (tags) / security (push) Failing after 0s
  Default (tags) / test (push) Failing after 0s
  Default (tags) / release (push) Has been skipped
  Default (tags) / metadata (push) Has been skipped

jkunz 0db138bf42 fix(build): rename npmextra config to .smartconfig and refresh build metadata 2026-04-19 12:22:53 +00:00

jkunz 0e9862efca feat: enhance storage stats and cluster health reporting 2026-04-19 11:57:28 +00:00
  - Introduced new data structures for bucket and storage statistics, including BucketSummary, StorageStats, and ClusterHealth.
  - Implemented runtime statistics tracking for buckets, including object count and total size.
  - Added methods to retrieve storage stats and bucket summaries in the FileStore.
  - Enhanced the SmartStorage interface to expose storage stats and cluster health.
  - Implemented tests for runtime stats, cluster health, and credential management.
  - Added support for runtime-managed credentials with atomic replacement.
  - Improved filesystem usage reporting for storage locations.
30 changed files with 4552 additions and 1938 deletions
+1 -1
@@ -1,7 +1,7 @@
{
"json.schemas": [
{
"fileMatch": ["/npmextra.json"],
"fileMatch": ["/.smartconfig.json"],
"schema": {
"type": "object",
"properties": {
+41
@@ -1,5 +1,46 @@
# Changelog
## 2026-04-30 - 6.4.0 - feat(cluster,server,auth)
add operational health endpoints, persist cluster topology, and hide credential secrets from runtime listings
- persist cluster identity and topology snapshots under .smartstorage/cluster to support safer clustered restarts and seed-node joins
- add unauthenticated /-/live, /-/ready, /-/health, and /-/metrics endpoints with basic request and storage metrics
- route clustered shard read/write/delete/head operations by drive index and handle join, heartbeat, and topology sync over QUIC
- change runtime credential listing to return access-key metadata only, excluding secretAccessKey values
- add tests for operational endpoints and multi-node cluster persistence and recovery behavior
## 2026-04-19 - 6.3.3 - fix(build)
rename npmextra config to .smartconfig and refresh build metadata
- renames the published project config file from npmextra.json to .smartconfig.json
- updates build and tooling dependencies to newer patch and minor versions
- adds Node type configuration and TypeScript deprecation handling in tsconfig
- refreshes README documentation to match the current build command and runtime management APIs
## Next - feat(credentials)
add runtime credential management APIs
- Expose `listCredentials()` and `replaceCredentials()` through the Rust bridge and the `SmartStorage` TypeScript API.
- Move request authentication onto a native runtime credential store so credential replacement is atomic and effective for new requests immediately without a restart.
- Validate replacement input cleanly by rejecting empty replacement sets, empty credential fields, and duplicate `accessKeyId` values.
- Add runtime credential rotation tests covering initial auth, revocation of old credentials, multiple active credentials, and invalid replacements.
## Next - feat(cluster-health)
add runtime cluster and drive health introspection
- Expose `getClusterHealth()` through the Rust bridge and the `SmartStorage` TypeScript API.
- Report native cluster mode state including local node id, peer status, local drive probe health, quorum health, erasure settings, and tracked healing runtime state.
- Return a clear `{ enabled: false }` response when clustering is not active instead of synthesizing config-based data.
- Add standalone and single-node cluster tests plus README documentation for the best-effort semantics of peer and repair health values.
## Next - feat(stats)
add runtime bucket summaries and storage stats
- Expose `getStorageStats()` and `listBucketSummaries()` through the Rust bridge and the `SmartStorage` TypeScript API.
- Maintain native runtime stats for bucket counts, object counts, and logical stored bytes, initialized from on-disk state at startup and updated on bucket/object mutations.
- Include cheap filesystem-capacity snapshots for the storage directory or configured cluster drive paths.
- Add AWS SDK integration coverage for object add, delete, and bucket delete stats flows and document the cache consistency semantics in the README.
## 2026-03-23 - 6.3.2 - fix(docs)
update license ownership and correct README license file reference
+10 -10
@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartstorage",
"version": "6.3.2",
"version": "6.4.0",
"private": false,
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
"main": "dist_ts/index.js",
@@ -15,14 +15,14 @@
"buildDocs": "tsdoc"
},
"devDependencies": {
"@aws-sdk/client-s3": "^3.1014.0",
"@git.zone/tsbuild": "^4.3.0",
"@git.zone/tsbundle": "^2.9.1",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tsrust": "^1.3.0",
"@git.zone/tstest": "^3.5.0",
"@push.rocks/smartbucket": "^4.5.1",
"@types/node": "^25.5.0"
"@aws-sdk/client-s3": "^3.1032.0",
"@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsbundle": "^2.10.0",
"@git.zone/tsrun": "^2.0.2",
"@git.zone/tsrust": "^1.3.2",
"@git.zone/tstest": "^3.6.3",
"@push.rocks/smartbucket": "^4.6.0",
"@types/node": "^25.6.0"
},
"browserslist": [
"last 1 chrome versions"
@@ -37,7 +37,7 @@
"dist_ts_web/**/*",
"assets/**/*",
"cli.js",
"npmextra.json",
".smartconfig.json",
"readme.md"
],
"dependencies": {
+1594 -1724
File diff suppressed because it is too large
+27 -3
@@ -11,6 +11,12 @@
- **Bucket policies** (AWS/MinIO-compatible JSON policies, public access support)
- CORS support
- ListBuckets, ListObjects (v1/v2), CopyObject
- Runtime bucket summaries and storage stats via the Rust bridge (no S3 list scans)
- Cluster health introspection via the Rust bridge (node membership, local drive probes, quorum, healing state)
- Runtime credential listing and atomic replacement via the Rust bridge
- Cluster identity and topology snapshots persist under `{storage}/.smartstorage/cluster/`
- S3-side operational endpoints are available at `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics`
- Runtime credential listing returns access-key metadata only; secrets are write-only
## Architecture
@@ -20,6 +26,7 @@
- `management.rs` - IPC loop (newline-delimited JSON over stdin/stdout)
- `server.rs` - hyper 1.x HTTP server, routing, CORS, auth+policy pipeline, all S3-compatible handlers
- `storage.rs` - FileStore: filesystem-backed storage, multipart manager, `.policies/` dir
- `storage.rs` also owns the runtime stats cache and standalone storage scans used by the bridge stats API
- `xml_response.rs` - S3-compatible XML response builders
- `error.rs` - StorageError codes with HTTP status mapping
- `auth.rs` - AWS SigV4 signature verification (HMAC-SHA256, clock skew, constant-time compare)
@@ -37,6 +44,20 @@
| `start` | `{ config: ISmartStorageConfig }` | Init storage + HTTP server |
| `stop` | `{}` | Graceful shutdown |
| `createBucket` | `{ name: string }` | Create bucket directory |
| `getStorageStats` | `{}` | Return cached bucket/global runtime stats + storage location capacity snapshots |
| `listBucketSummaries` | `{}` | Return cached per-bucket runtime summaries |
| `listCredentials` | `{}` | Return the active runtime auth credential set |
| `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set |
| `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode |
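For orientation, here is a minimal sketch of one command exchange over that channel. Only the command names and the newline-delimited JSON framing come from the table above and `management.rs`; the envelope field names (`id`, `command`, `payload`) are illustrative assumptions, not the authoritative wire format.

```typescript
// Illustrative shapes only; the authoritative envelope is defined in management.rs.
interface IManagementRequest {
  id: number;        // correlation id (assumed)
  command:
    | 'start' | 'stop' | 'createBucket'
    | 'getStorageStats' | 'listBucketSummaries'
    | 'listCredentials' | 'replaceCredentials'
    | 'getClusterHealth';
  payload?: unknown; // command-specific arguments, e.g. { name: 'my-bucket' }
}

// Each message is a single JSON document terminated by a newline.
const request: IManagementRequest = { id: 1, command: 'getClusterHealth', payload: {} };
const line = JSON.stringify(request) + '\n';
```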
### Operational HTTP Endpoints
| Endpoint | Purpose |
|----------|---------|
| `GET /-/live` | Process liveness |
| `GET /-/ready` | S3 readiness and cluster quorum readiness |
| `GET /-/health` | JSON storage, cluster, and runtime health |
| `GET /-/metrics` | Prometheus text metrics |
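A quick way to poke these endpoints from a script; the paths are the ones listed above, while the port and the exact JSON field names returned by `/-/health` are assumptions here.

```typescript
// Probe the unauthenticated operational endpoints (the port is an assumption).
const base = 'http://localhost:3337';

const ready = await fetch(`${base}/-/ready`);
console.log('ready:', ready.status);         // readiness is conveyed via the HTTP status

const health = await fetch(`${base}/-/health`);
console.log('health:', await health.json()); // storage, cluster, and runtime health document

const metrics = await fetch(`${base}/-/metrics`);
console.log(await metrics.text());           // Prometheus text format
```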
### Storage Layout
- Objects: `{root}/{bucket}/{key}._storage_object`
@@ -47,9 +68,9 @@
## Build
- `pnpm build` runs `tsrust && tsbuild --web --allowimplicitany`
- `pnpm build` runs `tsrust && tsbuild tsfolders --allowimplicitany`
- `tsrust` compiles Rust to `dist_rust/ruststorage`
- Targets: linux_amd64, linux_arm64 (configured in npmextra.json)
- Targets: linux_amd64, linux_arm64 (configured in .smartconfig.json)
## Dependencies
@@ -60,7 +81,10 @@
## Testing
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility (10 tests, auth disabled, port 3337)
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337)
- `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349)
- `test/test.health-http.node.ts` - unauthenticated operational endpoint coverage (3 tests, port 3353)
- `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348)
- `test/test.auth.node.ts` - Auth + bucket policy integration (20 tests, auth enabled, port 3344)
- `test/test.policy-crud.node.ts` - Policy API CRUD + validation edge cases (17 tests, port 3345)
- `test/test.policy-eval.node.ts` - Policy evaluation: principals, actions, resources, deny-vs-allow (22 tests, port 3346)
+130 -1
@@ -32,6 +32,8 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
- 📋 **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation and wildcard matching
- 🌐 **CORS middleware** — configurable cross-origin support
- 🧹 **Clean slate mode** — wipe storage on startup for test isolation
- 📊 **Runtime storage stats** — cheap bucket summaries and global counts without S3 list scans
- 🔑 **Runtime credential rotation** — list and replace active auth credentials without mutating internals
- **Test-first design** — start/stop in milliseconds, no port conflicts
### Clustering Features
@@ -39,6 +41,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
- 🔗 **Erasure coding** — Reed-Solomon (configurable k data + m parity shards) for storage efficiency and fault tolerance
- 🚄 **QUIC transport** — multiplexed, encrypted inter-node communication via `quinn` with zero head-of-line blocking
- 💽 **Multi-drive awareness** — each node manages multiple independent storage paths with health monitoring
- 🩺 **Cluster health introspection** — query native node, drive, quorum, and healing status for product dashboards
- 🤝 **Cluster membership** — static seed config + runtime join, heartbeat-based failure detection
- ✍️ **Quorum writes** — data is only acknowledged after k+1 shards are persisted
- 📖 **Quorum reads** — reconstruct from any k available shards, local-first fast path
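To make the quorum bullets above concrete, here is a small worked sketch. The helper is illustrative and not an API of this package; it assumes the k data + m parity reading of the bullets, i.e. write quorum k+1 and read quorum k.

```typescript
// Illustrative quorum arithmetic for k data shards + m parity shards.
function quorums(dataShards: number, parityShards: number) {
  return {
    totalShards: dataShards + parityShards,
    writeQuorum: dataShards + 1, // acknowledged only after k+1 shards persist
    readQuorum: dataShards,      // any k shards can reconstruct the object
  };
}

// Example: 4 data + 2 parity shards => 6 total, write quorum 5, read quorum 4,
// so one failed drive or node still allows both reads and writes.
console.log(quorums(4, 2));
```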
@@ -201,6 +204,112 @@ const storage = await SmartStorage.createAndStart({
});
```
## Runtime Credentials
```typescript
const credentials = await storage.listCredentials();
await storage.replaceCredentials([
{
accessKeyId: 'ADMINA',
secretAccessKey: 'super-secret-a',
},
{
accessKeyId: 'ADMINB',
secretAccessKey: 'super-secret-b',
},
]);
```
```typescript
interface IStorageCredential {
accessKeyId: string;
secretAccessKey: string;
}
```
- `listCredentials()` returns the Rust core's current runtime credential set.
- `replaceCredentials()` swaps the full set atomically. On success, new requests use the new set immediately and the old credentials stop authenticating immediately.
- Requests that were already authenticated before the replacement keep running; auth is evaluated when each request starts.
- No restart is required.
- Replacement input must contain at least one credential, each `accessKeyId` and `secretAccessKey` must be non-empty, and `accessKeyId` values must be unique.
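As a quick illustration of the validation rules above: a replacement set with a duplicate `accessKeyId` is rejected before any swap happens, so the previously active credentials stay in effect. This sketch assumes the bridge surfaces the validation error as a rejected promise.

```typescript
try {
  await storage.replaceCredentials([
    { accessKeyId: 'ADMINA', secretAccessKey: 'secret-one' },
    { accessKeyId: 'ADMINA', secretAccessKey: 'secret-two' }, // duplicate accessKeyId
  ]);
} catch (error) {
  // Validation failed; the old credential set is still active.
  console.error('replacement rejected:', error);
}
```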
## Runtime Stats
```typescript
const stats = await storage.getStorageStats();
const bucketSummaries = await storage.listBucketSummaries();
console.log(stats.bucketCount);
console.log(stats.totalObjectCount);
console.log(stats.totalStorageBytes);
console.log(bucketSummaries[0]?.name, bucketSummaries[0]?.objectCount);
```
```typescript
interface IBucketSummary {
name: string;
objectCount: number;
totalSizeBytes: number;
creationDate?: number;
}
interface IStorageLocationSummary {
path: string;
totalBytes?: number;
availableBytes?: number;
usedBytes?: number;
}
interface IStorageStats {
bucketCount: number;
totalObjectCount: number;
totalStorageBytes: number;
buckets: IBucketSummary[];
storageDirectory: string;
storageLocations?: IStorageLocationSummary[];
}
```
- `bucketCount`, `totalObjectCount`, `totalStorageBytes`, and per-bucket totals are logical object stats maintained by the Rust runtime. They count object payload bytes, not sidecar files or erasure-coded shard overhead.
- smartstorage initializes these values from native on-disk state at startup, then keeps them in memory and updates them when bucket/object mutations succeed. Stats reads do not issue S3 `ListObjects` or rescan every object.
- Values are exact for mutations performed through smartstorage after startup. Direct filesystem edits outside smartstorage are not watched; restart the server to resync.
- `storageLocations` is a cheap filesystem-capacity snapshot. Standalone mode reports the storage directory. Cluster mode reports the configured drive paths.
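A small end-to-end sketch of how the counters move, reusing the `storage` handle from the earlier examples. The endpoint, credentials, and bucket name are assumptions, and the bucket is assumed to already exist.

```typescript
import { PutObjectCommand, S3Client } from '@aws-sdk/client-s3';

const s3 = new S3Client({
  endpoint: 'http://localhost:3337', // assumed connection details
  region: 'us-east-1',
  forcePathStyle: true,
  credentials: { accessKeyId: 'ADMIN', secretAccessKey: 'secret' },
});

const before = await storage.getStorageStats();
await s3.send(new PutObjectCommand({ Bucket: 'my-bucket', Key: 'hello.txt', Body: 'hello' }));
const after = await storage.getStorageStats();

// Logical payload bytes only: 5 bytes for 'hello', no sidecar or shard overhead.
console.log(after.totalObjectCount - before.totalObjectCount);   // 1
console.log(after.totalStorageBytes - before.totalStorageBytes); // 5
```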
## Cluster Health
```typescript
const clusterHealth = await storage.getClusterHealth();
if (!clusterHealth.enabled) {
console.log('Cluster mode is disabled');
} else {
console.log(clusterHealth.nodeId, clusterHealth.quorumHealthy);
console.log(clusterHealth.peers);
console.log(clusterHealth.drives);
}
```
```typescript
interface IClusterHealth {
enabled: boolean;
nodeId?: string;
quorumHealthy?: boolean;
majorityHealthy?: boolean;
peers?: IClusterPeerHealth[];
drives?: IClusterDriveHealth[];
erasure?: IClusterErasureHealth;
repairs?: IClusterRepairHealth;
}
```
- `getClusterHealth()` is served by the Rust core. The TypeScript wrapper does not infer values from static config.
- Standalone mode returns `{ enabled: false }`.
- Peer status is the local node's current view of cluster membership and heartbeats, so it is best-effort and may lag real network state.
- Drive health is based on live native probe checks on the configured local drive paths. Capacity values are cheap filesystem snapshots.
- `quorumHealthy` means the local node currently sees majority quorum and enough available placements in every erasure set to satisfy the configured write quorum.
- Repair fields expose the background healer's currently available runtime state. They are best-effort and limited to what the engine tracks today, such as whether a scan is active, the last completed run, and the last error.
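A dashboard-style reading of the snapshot could look like the sketch below. The status strings (`online`, `suspect`, `offline`, `degraded`, `healing`) come from the engine, but `IClusterPeerHealth`/`IClusterDriveHealth` are not reproduced above, so treat field names like `nodeId`, `status`, and `path` as assumptions that follow the camelCase convention of the other interfaces.

```typescript
const health = await storage.getClusterHealth();

if (!health.enabled) {
  console.log('standalone mode: nothing to report');
} else {
  const offlinePeers = (health.peers ?? []).filter((peer) => peer.status !== 'online');
  const degradedDrives = (health.drives ?? []).filter((drive) => drive.status !== 'online');
  console.log({
    quorumHealthy: health.quorumHealthy,
    offlinePeers: offlinePeers.map((peer) => peer.nodeId),
    degradedDrives: degradedDrives.map((drive) => drive.path),
  });
}
```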
## Usage with AWS SDK v3
```typescript
@@ -480,6 +589,26 @@ Get connection details for S3-compatible clients. Returns:
| `accessSecret` | `string` | Secret key from first configured credential |
| `useSsl` | `boolean` | Always `false` (plain HTTP) |
#### `getStorageStats(): Promise<IStorageStats>`
Read cached logical bucket and object totals from the Rust runtime without issuing S3 list calls.
#### `listBucketSummaries(): Promise<IBucketSummary[]>`
Get per-bucket logical object counts and total payload sizes.
#### `listCredentials(): Promise<IStorageCredential[]>`
Return the currently active runtime credential set.
#### `replaceCredentials(credentials: IStorageCredential[]): Promise<void>`
Atomically replace the active runtime credential set without restarting the server.
#### `getClusterHealth(): Promise<IClusterHealth>`
Read the Rust core's current cluster, drive, quorum, and repair health snapshot. Standalone mode returns `{ enabled: false }`.
## Architecture
smartstorage uses a **hybrid Rust + TypeScript** architecture:
@@ -513,7 +642,7 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
**Why Rust?** The original TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
**IPC Protocol:** TypeScript spawns the `ruststorage` binary with `--management` and communicates via newline-delimited JSON over stdin/stdout. Commands: `start`, `stop`, `createBucket`, `clusterStatus`.
**IPC Protocol:** TypeScript communicates with the `ruststorage` binary over newline-delimited JSON via stdin/stdout. The current management commands are `start`, `stop`, `createBucket`, `getStorageStats`, `listBucketSummaries`, `listCredentials`, `replaceCredentials`, and `getClusterHealth`.
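For reference, a transport-level sketch of that exchange from the TypeScript side. The `--management` flag, the `dist_rust/ruststorage` output path, and the newline-delimited JSON framing are described above; the message envelope itself is an assumption here.

```typescript
import { spawn } from 'node:child_process';
import { createInterface } from 'node:readline';

const child = spawn('./dist_rust/ruststorage', ['--management'], {
  stdio: ['pipe', 'pipe', 'inherit'],
});

// Responses arrive as one JSON document per line on stdout.
createInterface({ input: child.stdout! }).on('line', (line) => {
  console.log('response:', JSON.parse(line));
});

// Commands are written as one JSON document per line on stdin
// (field names are illustrative, not the authoritative envelope).
child.stdin!.write(JSON.stringify({ command: 'getStorageStats' }) + '\n');
```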
### S3-Compatible Operations
+1
@@ -1346,6 +1346,7 @@ dependencies = [
"http-body-util",
"hyper",
"hyper-util",
"libc",
"md-5",
"percent-encoding",
"quick-xml",
+1
@@ -41,3 +41,4 @@ dashmap = "6"
hmac = "0.12"
sha2 = "0.10"
hex = "0.4"
libc = "0.2"
+84 -8
@@ -2,9 +2,10 @@ use hmac::{Hmac, Mac};
use hyper::body::Incoming;
use hyper::Request;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use tokio::sync::RwLock;
use crate::config::{Credential, SmartStorageConfig};
use crate::config::{AuthConfig, Credential};
use crate::error::StorageError;
type HmacSha256 = Hmac<Sha256>;
@@ -27,7 +28,7 @@ struct SigV4Header {
/// Verify the request's SigV4 signature. Returns the caller identity on success.
pub fn verify_request(
req: &Request<Incoming>,
config: &SmartStorageConfig,
credentials: &[Credential],
) -> Result<AuthenticatedIdentity, StorageError> {
let auth_header = req
.headers()
@@ -47,7 +48,7 @@ pub fn verify_request(
let parsed = parse_auth_header(auth_header)?;
// Look up credential
let credential = find_credential(&parsed.access_key_id, config)
let credential = find_credential(&parsed.access_key_id, credentials)
.ok_or_else(StorageError::invalid_access_key_id)?;
// Get x-amz-date
@@ -163,14 +164,89 @@ fn parse_auth_header(header: &str) -> Result<SigV4Header, StorageError> {
}
/// Find a credential by access key ID.
fn find_credential<'a>(access_key_id: &str, config: &'a SmartStorageConfig) -> Option<&'a Credential> {
config
.auth
.credentials
fn find_credential<'a>(access_key_id: &str, credentials: &'a [Credential]) -> Option<&'a Credential> {
credentials
.iter()
.find(|c| c.access_key_id == access_key_id)
}
#[derive(Debug)]
pub struct RuntimeCredentialStore {
enabled: bool,
credentials: RwLock<Vec<Credential>>,
}
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CredentialMetadata {
pub access_key_id: String,
}
impl RuntimeCredentialStore {
pub fn new(config: &AuthConfig) -> Self {
Self {
enabled: config.enabled,
credentials: RwLock::new(config.credentials.clone()),
}
}
pub fn enabled(&self) -> bool {
self.enabled
}
pub async fn list_credentials(&self) -> Vec<CredentialMetadata> {
self.credentials
.read()
.await
.iter()
.map(|credential| CredentialMetadata {
access_key_id: credential.access_key_id.clone(),
})
.collect()
}
pub async fn snapshot_credentials(&self) -> Vec<Credential> {
self.credentials.read().await.clone()
}
pub async fn replace_credentials(&self, credentials: Vec<Credential>) -> Result<(), StorageError> {
validate_credentials(&credentials)?;
*self.credentials.write().await = credentials;
Ok(())
}
}
fn validate_credentials(credentials: &[Credential]) -> Result<(), StorageError> {
if credentials.is_empty() {
return Err(StorageError::invalid_request(
"Credential replacement requires at least one credential.",
));
}
let mut seen_access_keys = HashSet::new();
for credential in credentials {
if credential.access_key_id.trim().is_empty() {
return Err(StorageError::invalid_request(
"Credential accessKeyId must not be empty.",
));
}
if credential.secret_access_key.trim().is_empty() {
return Err(StorageError::invalid_request(
"Credential secretAccessKey must not be empty.",
));
}
if !seen_access_keys.insert(credential.access_key_id.as_str()) {
return Err(StorageError::invalid_request(
"Credential accessKeyId values must be unique.",
));
}
}
Ok(())
}
/// Check clock skew (15 minutes max).
fn check_clock_skew(amz_date: &str) -> Result<(), StorageError> {
// Parse ISO 8601 basic format: YYYYMMDDTHHMMSSZ
+370 -24
@@ -8,18 +8,24 @@ use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::sync::{Mutex, RwLock};
use super::config::ErasureConfig;
use super::drive_manager::{DriveManager, DriveState, DriveStatus};
use super::erasure::ErasureCoder;
use super::healing::HealingRuntimeState;
use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement};
use super::placement::ErasureSet;
use super::protocol::{ClusterRequest, ShardDeleteRequest, ShardReadRequest, ShardWriteRequest};
use super::quic_transport::QuicTransport;
use super::shard_store::{ShardId, ShardStore};
use super::state::ClusterState;
use super::state::{ClusterState, NodeStatus};
use crate::storage::{
BucketInfo, CompleteMultipartResult, CopyResult, GetResult, HeadResult, ListObjectEntry,
ListObjectsResult, MultipartUploadInfo, PutResult,
storage_location_summary, BucketInfo, BucketSummary, ClusterDriveHealth,
ClusterErasureHealth, ClusterHealth, ClusterPeerHealth, ClusterRepairHealth,
CompleteMultipartResult, CopyResult, GetResult, HeadResult, ListObjectEntry,
ListObjectsResult, MultipartUploadInfo, PutResult, RuntimeBucketStats,
RuntimeStatsState, StorageLocationSummary, StorageStats,
};
use serde::{Deserialize, Serialize};
@@ -53,6 +59,10 @@ pub struct DistributedStore {
state: Arc<ClusterState>,
transport: Arc<QuicTransport>,
erasure_coder: ErasureCoder,
storage_dir: PathBuf,
drive_paths: Vec<PathBuf>,
drive_manager: Arc<Mutex<DriveManager>>,
healing_runtime: Arc<RwLock<HealingRuntimeState>>,
/// Local shard stores, one per drive. Index = drive index.
local_shard_stores: Vec<Arc<ShardStore>>,
/// Root directory for manifests on this node
@@ -62,6 +72,7 @@ pub struct DistributedStore {
/// Root directory for bucket policies
policies_dir: PathBuf,
erasure_config: ErasureConfig,
runtime_stats: RwLock<RuntimeStatsState>,
}
impl DistributedStore {
@@ -69,7 +80,10 @@ impl DistributedStore {
state: Arc<ClusterState>,
transport: Arc<QuicTransport>,
erasure_config: ErasureConfig,
storage_dir: PathBuf,
drive_paths: Vec<PathBuf>,
drive_manager: Arc<Mutex<DriveManager>>,
healing_runtime: Arc<RwLock<HealingRuntimeState>>,
manifest_dir: PathBuf,
buckets_dir: PathBuf,
) -> Result<Self> {
@@ -86,11 +100,16 @@ impl DistributedStore {
state,
transport,
erasure_coder,
storage_dir,
drive_paths,
drive_manager,
healing_runtime,
local_shard_stores,
manifest_dir,
buckets_dir,
policies_dir,
erasure_config,
runtime_stats: RwLock::new(RuntimeStatsState::default()),
})
}
@@ -99,6 +118,80 @@ impl DistributedStore {
self.policies_dir.clone()
}
pub async fn initialize_runtime_stats(&self) {
let buckets = match self.list_buckets().await {
Ok(buckets) => buckets,
Err(error) => {
tracing::warn!(path = %self.storage_dir.display(), error = %error, "Failed to initialize distributed runtime stats");
return;
}
};
let mut runtime_buckets = HashMap::new();
for bucket in buckets {
let manifest_bucket_dir = self.manifest_dir.join(&bucket.name);
let (object_count, total_size_bytes) = self
.scan_bucket_manifests(&bucket.name, &manifest_bucket_dir)
.await;
runtime_buckets.insert(
bucket.name,
RuntimeBucketStats {
object_count,
total_size_bytes,
creation_date: Some(bucket.creation_date),
},
);
}
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.replace_buckets(runtime_buckets);
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
let runtime_stats = self.runtime_stats.read().await;
Ok(runtime_stats.snapshot(&self.storage_dir, self.storage_locations()))
}
pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
let runtime_stats = self.runtime_stats.read().await;
Ok(runtime_stats.bucket_summaries())
}
pub async fn get_cluster_health(&self) -> Result<ClusterHealth> {
let nodes = self.state.all_nodes().await;
let erasure_sets = self.state.erasure_sets().await;
let majority_healthy = self.state.has_majority().await;
let mut drive_manager = self.drive_manager.lock().await;
drive_manager.check_all_drives().await;
let drive_states = drive_manager.snapshot();
drop(drive_manager);
let peers = self.peer_health(&nodes);
let drives = self.drive_health(&drive_states, &erasure_sets);
let repairs = self.repair_health().await;
let quorum_healthy = majority_healthy && self.quorum_is_healthy(&nodes, &drive_states, &erasure_sets);
Ok(ClusterHealth {
enabled: true,
node_id: Some(self.state.local_node_id().to_string()),
quorum_healthy: Some(quorum_healthy),
majority_healthy: Some(majority_healthy),
peers: Some(peers),
drives: Some(drives),
erasure: Some(ClusterErasureHealth {
data_shards: self.erasure_config.data_shards,
parity_shards: self.erasure_config.parity_shards,
chunk_size_bytes: self.erasure_config.chunk_size_bytes,
total_shards: self.erasure_config.total_shards(),
read_quorum: self.erasure_config.read_quorum(),
write_quorum: self.erasure_config.write_quorum(),
erasure_set_count: erasure_sets.len(),
}),
repairs: Some(repairs),
})
}
// ============================
// Object operations
// ============================
@@ -114,6 +207,8 @@ impl DistributedStore {
return Err(crate::error::StorageError::no_such_bucket().into());
}
let previous_size = self.manifest_size_if_exists(bucket, key).await;
let erasure_set = self
.state
.get_erasure_set_for_object(bucket, key)
@@ -139,8 +234,7 @@ impl DistributedStore {
// Process complete chunks
while chunk_buffer.len() >= chunk_size {
let chunk_data: Vec<u8> =
chunk_buffer.drain(..chunk_size).collect();
let chunk_data: Vec<u8> = chunk_buffer.drain(..chunk_size).collect();
let chunk_manifest = self
.encode_and_distribute_chunk(
&erasure_set,
@@ -191,6 +285,8 @@ impl DistributedStore {
};
self.store_manifest(&manifest).await?;
self.track_object_upsert(bucket, previous_size, total_size)
.await;
Ok(PutResult { md5: md5_hex })
}
@@ -281,6 +377,7 @@ impl DistributedStore {
}
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
let existing_size = self.manifest_size_if_exists(bucket, key).await;
// Load manifest to find all shards
if let Ok(manifest) = self.load_manifest(bucket, key).await {
let local_id = self.state.local_node_id().to_string();
@@ -311,6 +408,7 @@ impl DistributedStore {
key,
chunk.chunk_index,
placement.shard_index,
placement.drive_id.parse::<u32>().unwrap_or(0),
)
.await
{
@@ -328,6 +426,7 @@ impl DistributedStore {
// Delete manifest
self.delete_manifest(bucket, key).await?;
self.track_object_deleted(bucket, existing_size).await;
Ok(())
}
@@ -351,6 +450,8 @@ impl DistributedStore {
src_manifest.metadata.clone()
};
let previous_size = self.manifest_size_if_exists(dest_bucket, dest_key).await;
// Read source object fully, then reconstruct
let mut full_data = Vec::new();
for chunk in &src_manifest.chunks {
@@ -414,6 +515,8 @@ impl DistributedStore {
};
self.store_manifest(&manifest).await?;
self.track_object_upsert(dest_bucket, previous_size, manifest.size)
.await;
Ok(CopyResult {
md5: md5_hex,
@@ -468,11 +571,7 @@ impl DistributedStore {
if !delimiter.is_empty() {
let remaining = &key[prefix.len()..];
if let Some(delim_idx) = remaining.find(delimiter) {
let cp = format!(
"{}{}",
prefix,
&remaining[..delim_idx + delimiter.len()]
);
let cp = format!("{}{}", prefix, &remaining[..delim_idx + delimiter.len()]);
if common_prefix_set.insert(cp.clone()) {
common_prefixes.push(cp);
}
@@ -560,6 +659,7 @@ impl DistributedStore {
// Also create manifest bucket dir
let manifest_bucket = self.manifest_dir.join(bucket);
fs::create_dir_all(&manifest_bucket).await?;
self.track_bucket_created(bucket).await;
Ok(())
}
@@ -578,6 +678,7 @@ impl DistributedStore {
}
let _ = fs::remove_dir_all(&bucket_path).await;
let _ = fs::remove_dir_all(&manifest_bucket).await;
self.track_bucket_deleted(bucket).await;
Ok(())
}
@@ -643,7 +744,10 @@ impl DistributedStore {
let mut hasher = Md5::new();
// Use upload_id + part_number as a unique key prefix for shard storage
let part_key = format!("{}/_multipart/{}/part-{}", session.key, upload_id, part_number);
let part_key = format!(
"{}/_multipart/{}/part-{}",
session.key, upload_id, part_number
);
let mut body = body;
loop {
@@ -655,8 +759,7 @@ impl DistributedStore {
chunk_buffer.extend_from_slice(&data);
while chunk_buffer.len() >= chunk_size {
let chunk_data: Vec<u8> =
chunk_buffer.drain(..chunk_size).collect();
let chunk_data: Vec<u8> = chunk_buffer.drain(..chunk_size).collect();
let chunk_manifest = self
.encode_and_distribute_chunk(
&erasure_set,
@@ -717,6 +820,9 @@ impl DistributedStore {
) -> Result<CompleteMultipartResult> {
let session = self.load_multipart_session(upload_id).await?;
let upload_dir = self.multipart_dir().join(upload_id);
let previous_size = self
.manifest_size_if_exists(&session.bucket, &session.key)
.await;
// Read per-part manifests and concatenate chunks sequentially
let mut all_chunks = Vec::new();
@@ -777,6 +883,8 @@ impl DistributedStore {
};
self.store_manifest(&manifest).await?;
self.track_object_upsert(&session.bucket, previous_size, manifest.size)
.await;
// Clean up multipart upload directory
let _ = fs::remove_dir_all(&upload_dir).await;
@@ -809,9 +917,10 @@ impl DistributedStore {
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
if let Some(store) = self.local_shard_stores.get(
placement.drive_id.parse::<usize>().unwrap_or(0),
) {
if let Some(store) = self
.local_shard_stores
.get(placement.drive_id.parse::<usize>().unwrap_or(0))
{
let _ = store.delete_shard(&shard_id).await;
}
} else {
@@ -822,6 +931,7 @@ impl DistributedStore {
&part_info.part_key,
chunk.chunk_index,
placement.shard_index,
placement.drive_id.parse::<u32>().unwrap_or(0),
)
.await;
}
@@ -837,10 +947,7 @@ impl DistributedStore {
Ok(())
}
pub async fn list_multipart_uploads(
&self,
bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
let multipart_dir = self.multipart_dir();
if !multipart_dir.is_dir() {
return Ok(Vec::new());
@@ -883,6 +990,236 @@ impl DistributedStore {
Ok(serde_json::from_str(&content)?)
}
fn storage_locations(&self) -> Vec<StorageLocationSummary> {
self.drive_paths
.iter()
.map(|path| storage_location_summary(path))
.collect()
}
fn node_status_label(status: &NodeStatus) -> String {
match status {
NodeStatus::Online => "online".to_string(),
NodeStatus::Suspect => "suspect".to_string(),
NodeStatus::Offline => "offline".to_string(),
}
}
fn drive_status_label(status: &DriveStatus) -> String {
match status {
DriveStatus::Online => "online".to_string(),
DriveStatus::Degraded => "degraded".to_string(),
DriveStatus::Offline => "offline".to_string(),
DriveStatus::Healing => "healing".to_string(),
}
}
fn peer_health(&self, nodes: &[super::state::NodeState]) -> Vec<ClusterPeerHealth> {
let local_node_id = self.state.local_node_id();
let mut peers: Vec<ClusterPeerHealth> = nodes
.iter()
.filter(|node| node.info.node_id != local_node_id)
.map(|node| ClusterPeerHealth {
node_id: node.info.node_id.clone(),
status: Self::node_status_label(&node.status),
quic_address: Some(node.info.quic_addr.clone()),
s3_address: Some(node.info.s3_addr.clone()),
drive_count: Some(node.info.drive_count),
last_heartbeat: Some(node.last_heartbeat.timestamp_millis()),
missed_heartbeats: Some(node.missed_heartbeats),
})
.collect();
peers.sort_by(|a, b| a.node_id.cmp(&b.node_id));
peers
}
fn drive_health(&self, drive_states: &[DriveState], erasure_sets: &[ErasureSet]) -> Vec<ClusterDriveHealth> {
let local_node_id = self.state.local_node_id();
let mut drive_to_set = HashMap::new();
for erasure_set in erasure_sets {
for drive in &erasure_set.drives {
if drive.node_id == local_node_id {
drive_to_set.insert(drive.drive_index as usize, erasure_set.set_id);
}
}
}
drive_states
.iter()
.enumerate()
.map(|(index, drive)| ClusterDriveHealth {
index: index as u32,
path: drive.path.to_string_lossy().to_string(),
status: Self::drive_status_label(&drive.status),
total_bytes: (drive.stats.total_bytes > 0).then_some(drive.stats.total_bytes),
used_bytes: (drive.stats.total_bytes > 0).then_some(drive.stats.used_bytes),
available_bytes: (drive.stats.total_bytes > 0)
.then_some(drive.stats.available_bytes),
error_count: Some(drive.stats.error_count),
last_error: drive.stats.last_error.clone(),
last_check: Some(drive.stats.last_check.timestamp_millis()),
erasure_set_id: drive_to_set.get(&index).copied(),
})
.collect()
}
async fn repair_health(&self) -> ClusterRepairHealth {
let runtime_state = self.healing_runtime.read().await;
ClusterRepairHealth {
active: runtime_state.active,
scan_interval_ms: Some(runtime_state.scan_interval_ms),
last_run_started_at: runtime_state
.last_run_started_at
.as_ref()
.map(|timestamp| timestamp.timestamp_millis()),
last_run_completed_at: runtime_state
.last_run_completed_at
.as_ref()
.map(|timestamp| timestamp.timestamp_millis()),
last_duration_ms: runtime_state.last_duration_ms,
shards_checked: runtime_state
.last_stats
.as_ref()
.map(|stats| stats.shards_checked),
shards_healed: runtime_state
.last_stats
.as_ref()
.map(|stats| stats.shards_healed),
failed: runtime_state.last_stats.as_ref().map(|stats| stats.errors),
last_error: runtime_state.last_error.clone(),
}
}
fn quorum_is_healthy(
&self,
nodes: &[super::state::NodeState],
drive_states: &[DriveState],
erasure_sets: &[ErasureSet],
) -> bool {
if erasure_sets.is_empty() {
return false;
}
let local_node_id = self.state.local_node_id();
let node_statuses: HashMap<&str, &NodeStatus> = nodes
.iter()
.map(|node| (node.info.node_id.as_str(), &node.status))
.collect();
erasure_sets.iter().all(|erasure_set| {
let available = erasure_set
.drives
.iter()
.filter(|drive| {
if drive.node_id == local_node_id {
return drive_states
.get(drive.drive_index as usize)
.map(|drive_state| !matches!(drive_state.status, DriveStatus::Offline))
.unwrap_or(false);
}
matches!(node_statuses.get(drive.node_id.as_str()), Some(NodeStatus::Online))
})
.count();
available >= self.erasure_config.write_quorum()
})
}
async fn scan_bucket_manifests(
&self,
bucket: &str,
manifest_bucket_dir: &std::path::Path,
) -> (u64, u64) {
let mut object_count = 0u64;
let mut total_size_bytes = 0u64;
let mut directories = vec![manifest_bucket_dir.to_path_buf()];
while let Some(directory) = directories.pop() {
let mut entries = match fs::read_dir(&directory).await {
Ok(entries) => entries,
Err(_) => continue,
};
while let Ok(Some(entry)) = entries.next_entry().await {
let metadata = match entry.metadata().await {
Ok(metadata) => metadata,
Err(error) => {
tracing::warn!(bucket = bucket, error = %error, "Failed to read manifest entry metadata for runtime stats");
continue;
}
};
if metadata.is_dir() {
directories.push(entry.path());
continue;
}
let name = entry.file_name().to_string_lossy().to_string();
if !name.ends_with(".manifest.json") {
continue;
}
match fs::read_to_string(entry.path()).await {
Ok(content) => match serde_json::from_str::<ObjectManifest>(&content) {
Ok(manifest) => {
object_count += 1;
total_size_bytes += manifest.size;
}
Err(error) => {
tracing::warn!(bucket = bucket, error = %error, "Failed to parse manifest for runtime stats");
}
},
Err(error) => {
tracing::warn!(bucket = bucket, error = %error, "Failed to read manifest for runtime stats");
}
}
}
}
(object_count, total_size_bytes)
}
async fn bucket_creation_date(&self, bucket: &str) -> Option<DateTime<Utc>> {
let metadata = fs::metadata(self.buckets_dir.join(bucket)).await.ok()?;
let created_or_modified = metadata.created().unwrap_or(
metadata
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH),
);
Some(created_or_modified.into())
}
async fn manifest_size_if_exists(&self, bucket: &str, key: &str) -> Option<u64> {
self.load_manifest(bucket, key)
.await
.ok()
.map(|manifest| manifest.size)
}
async fn track_bucket_created(&self, bucket: &str) {
let creation_date = self.bucket_creation_date(bucket).await;
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.ensure_bucket(bucket, creation_date);
}
async fn track_bucket_deleted(&self, bucket: &str) {
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.remove_bucket(bucket);
}
async fn track_object_upsert(&self, bucket: &str, previous_size: Option<u64>, new_size: u64) {
let creation_date = self.bucket_creation_date(bucket).await;
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.ensure_bucket(bucket, creation_date);
runtime_stats.upsert_object(bucket, previous_size, new_size);
}
async fn track_object_deleted(&self, bucket: &str, existing_size: Option<u64>) {
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.remove_object(bucket, existing_size);
}
// ============================
// Internal: erasure encode + distribute
// ============================
@@ -920,12 +1257,13 @@ impl DistributedStore {
let result = if drive.node_id == self.state.local_node_id() {
// Local write
if let Some(store) =
self.local_shard_stores.get(drive.drive_index as usize)
{
if let Some(store) = self.local_shard_stores.get(drive.drive_index as usize) {
store.write_shard(&shard_id, shard_data, checksum).await
} else {
Err(anyhow::anyhow!("Local drive {} not found", drive.drive_index))
Err(anyhow::anyhow!(
"Local drive {} not found",
drive.drive_index
))
}
} else {
// Remote write via QUIC
@@ -935,6 +1273,7 @@ impl DistributedStore {
key,
chunk_index,
shard_idx as u32,
drive.drive_index,
shard_data,
checksum,
)
@@ -994,6 +1333,7 @@ impl DistributedStore {
key: &str,
chunk_index: u32,
shard_index: u32,
drive_index: u32,
data: &[u8],
checksum: u32,
) -> Result<()> {
@@ -1012,6 +1352,7 @@ impl DistributedStore {
key: key.to_string(),
chunk_index,
shard_index,
drive_index,
shard_data_length: data.len() as u64,
checksum,
object_metadata: HashMap::new(),
@@ -1081,6 +1422,7 @@ impl DistributedStore {
key,
chunk.chunk_index,
placement.shard_index,
placement.drive_id.parse::<u32>().unwrap_or(0),
)
.await
.ok()
@@ -1112,6 +1454,7 @@ impl DistributedStore {
key: &str,
chunk_index: u32,
shard_index: u32,
drive_index: u32,
) -> Result<(Vec<u8>, u32)> {
let node_info = self
.state
@@ -1128,6 +1471,7 @@ impl DistributedStore {
key: key.to_string(),
chunk_index,
shard_index,
drive_index,
});
match self.transport.send_shard_read(&conn, &request).await? {
@@ -1143,6 +1487,7 @@ impl DistributedStore {
key: &str,
chunk_index: u32,
shard_index: u32,
drive_index: u32,
) -> Result<()> {
let node_info = self
.state
@@ -1159,6 +1504,7 @@ impl DistributedStore {
key: key.to_string(),
chunk_index,
shard_index,
drive_index,
});
let _response = self.transport.send_request(&conn, &request).await?;
+50 -6
@@ -1,9 +1,9 @@
use super::config::DriveConfig;
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tokio::fs;
use super::config::DriveConfig;
// ============================
// Drive format (on-disk metadata)
@@ -33,6 +33,7 @@ pub enum DriveStatus {
#[derive(Debug, Clone)]
pub struct DriveStats {
pub total_bytes: u64,
pub available_bytes: u64,
pub used_bytes: u64,
pub avg_write_latency_us: u64,
pub avg_read_latency_us: u64,
@@ -45,6 +46,7 @@ impl Default for DriveStats {
fn default() -> Self {
Self {
total_bytes: 0,
available_bytes: 0,
used_bytes: 0,
avg_write_latency_us: 0,
avg_read_latency_us: 0,
@@ -55,7 +57,7 @@ impl Default for DriveStats {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DriveState {
pub path: PathBuf,
pub format: Option<DriveFormat>,
@@ -74,10 +76,15 @@ pub struct DriveManager {
impl DriveManager {
/// Initialize drive manager with configured drive paths.
pub async fn new(config: &DriveConfig) -> Result<Self> {
let mut drives = Vec::with_capacity(config.paths.len());
let paths: Vec<PathBuf> = config.paths.iter().map(PathBuf::from).collect();
Self::from_paths(&paths).await
}
for path_str in &config.paths {
let path = PathBuf::from(path_str);
/// Initialize drive manager from an explicit list of resolved paths.
pub async fn from_paths(paths: &[PathBuf]) -> Result<Self> {
let mut drives = Vec::with_capacity(paths.len());
for path in paths {
let storage_dir = path.join(".smartstorage");
// Ensure the drive directory exists
@@ -92,7 +99,7 @@ impl DriveManager {
};
drives.push(DriveState {
path,
path: path.clone(),
format,
status,
stats: DriveStats::default(),
@@ -154,6 +161,11 @@ impl DriveManager {
&self.drives
}
/// Get a cloneable snapshot of current drive states.
pub fn snapshot(&self) -> Vec<DriveState> {
self.drives.clone()
}
/// Get drives that are online.
pub fn online_drives(&self) -> Vec<usize> {
self.drives
@@ -203,6 +215,11 @@ impl DriveManager {
let _ = fs::remove_file(&probe_path).await;
let latency = start.elapsed();
if let Some((total_bytes, available_bytes, used_bytes)) = filesystem_usage(&drive.path) {
drive.stats.total_bytes = total_bytes;
drive.stats.available_bytes = available_bytes;
drive.stats.used_bytes = used_bytes;
}
drive.stats.avg_write_latency_us = latency.as_micros() as u64;
drive.stats.last_check = Utc::now();
@@ -240,3 +257,30 @@ impl DriveManager {
serde_json::from_str(&content).ok()
}
}
#[cfg(unix)]
fn filesystem_usage(path: &Path) -> Option<(u64, u64, u64)> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
let path_bytes = path.as_os_str().as_bytes();
let c_path = CString::new(path_bytes).ok()?;
let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
if unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) } != 0 {
return None;
}
let block_size = stat.f_frsize as u64;
let total_bytes = stat.f_blocks as u64 * block_size;
let available_bytes = stat.f_bavail as u64 * block_size;
let free_bytes = stat.f_bfree as u64 * block_size;
let used_bytes = total_bytes.saturating_sub(free_bytes);
Some((total_bytes, available_bytes, used_bytes))
}
#[cfg(not(unix))]
fn filesystem_usage(_path: &Path) -> Option<(u64, u64, u64)> {
None
}
+62 -2
@@ -1,8 +1,10 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::sync::RwLock;
use super::config::ErasureConfig;
use super::erasure::ErasureCoder;
@@ -18,6 +20,7 @@ pub struct HealingService {
local_shard_stores: Vec<Arc<ShardStore>>,
manifest_dir: PathBuf,
scan_interval: Duration,
runtime_state: Arc<RwLock<HealingRuntimeState>>,
}
impl HealingService {
@@ -27,16 +30,27 @@ impl HealingService {
local_shard_stores: Vec<Arc<ShardStore>>,
manifest_dir: PathBuf,
scan_interval_hours: u64,
runtime_state: Arc<RwLock<HealingRuntimeState>>,
) -> Result<Self> {
let scan_interval = Duration::from_secs(scan_interval_hours * 3600);
if let Ok(mut state_guard) = runtime_state.try_write() {
state_guard.scan_interval_ms = scan_interval.as_millis() as u64;
}
Ok(Self {
state,
erasure_coder: ErasureCoder::new(erasure_config)?,
local_shard_stores,
manifest_dir,
scan_interval: Duration::from_secs(scan_interval_hours * 3600),
scan_interval,
runtime_state,
})
}
pub fn runtime_state(&self) -> Arc<RwLock<HealingRuntimeState>> {
self.runtime_state.clone()
}
/// Run the healing loop as a background task.
pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
let mut interval = tokio::time::interval(self.scan_interval);
@@ -47,9 +61,12 @@ impl HealingService {
loop {
tokio::select! {
_ = interval.tick() => {
let started_at = Utc::now();
self.mark_healing_started(started_at).await;
tracing::info!("Starting healing scan");
match self.heal_scan().await {
Ok(stats) => {
self.mark_healing_finished(started_at, Some(stats.clone()), None).await;
tracing::info!(
checked = stats.shards_checked,
healed = stats.shards_healed,
@@ -58,6 +75,7 @@ impl HealingService {
);
}
Err(e) => {
self.mark_healing_finished(started_at, None, Some(e.to_string())).await;
tracing::error!("Healing scan failed: {}", e);
}
}
@@ -70,6 +88,37 @@ impl HealingService {
}
}
async fn mark_healing_started(&self, started_at: DateTime<Utc>) {
let mut runtime_state = self.runtime_state.write().await;
runtime_state.active = true;
runtime_state.scan_interval_ms = self.scan_interval.as_millis() as u64;
runtime_state.last_run_started_at = Some(started_at);
runtime_state.last_error = None;
}
async fn mark_healing_finished(
&self,
started_at: DateTime<Utc>,
stats: Option<HealStats>,
last_error: Option<String>,
) {
let finished_at = Utc::now();
let mut runtime_state = self.runtime_state.write().await;
runtime_state.active = false;
runtime_state.scan_interval_ms = self.scan_interval.as_millis() as u64;
runtime_state.last_run_completed_at = Some(finished_at);
runtime_state.last_duration_ms = Some(
finished_at
.signed_duration_since(started_at)
.num_milliseconds()
.max(0) as u64,
);
if let Some(stats) = stats {
runtime_state.last_stats = Some(stats);
}
runtime_state.last_error = last_error;
}
/// Scan all manifests for shards on offline nodes, reconstruct and re-place them.
async fn heal_scan(&self) -> Result<HealStats> {
let mut stats = HealStats::default();
@@ -348,9 +397,20 @@ impl HealingService {
}
}
#[derive(Debug, Default)]
#[derive(Debug, Clone, Default)]
pub struct HealStats {
pub shards_checked: u64,
pub shards_healed: u64,
pub errors: u64,
}
#[derive(Debug, Clone, Default)]
pub struct HealingRuntimeState {
pub active: bool,
pub scan_interval_ms: u64,
pub last_run_started_at: Option<DateTime<Utc>>,
pub last_run_completed_at: Option<DateTime<Utc>>,
pub last_duration_ms: Option<u64>,
pub last_stats: Option<HealStats>,
pub last_error: Option<String>,
}
+31 -14
@@ -18,6 +18,7 @@ pub struct MembershipManager {
state: Arc<ClusterState>,
transport: Arc<QuicTransport>,
heartbeat_interval: Duration,
heartbeat_timeout: Duration,
local_node_info: NodeInfo,
drive_manager: Option<Arc<Mutex<DriveManager>>>,
}
@@ -27,12 +28,14 @@ impl MembershipManager {
state: Arc<ClusterState>,
transport: Arc<QuicTransport>,
heartbeat_interval_ms: u64,
heartbeat_timeout_ms: u64,
local_node_info: NodeInfo,
) -> Self {
Self {
state,
transport,
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
heartbeat_timeout: Duration::from_millis(heartbeat_timeout_ms),
local_node_info,
drive_manager: None,
}
@@ -46,7 +49,7 @@ impl MembershipManager {
/// Join the cluster by contacting seed nodes.
/// Sends a JoinRequest to each seed node until one accepts.
pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> {
pub async fn join_cluster(&self, seed_nodes: &[String], allow_bootstrap_on_failure: bool) -> Result<()> {
if seed_nodes.is_empty() {
tracing::info!("No seed nodes configured, starting as initial cluster node");
self.state.add_node(self.local_node_info.clone()).await;
@@ -75,10 +78,13 @@ impl MembershipManager {
}
}
// If no seed responded, start as a new cluster
tracing::info!("Could not reach any seed nodes, starting as initial cluster node");
self.state.add_node(self.local_node_info.clone()).await;
Ok(())
if allow_bootstrap_on_failure {
tracing::warn!("Could not reach any seed nodes, bootstrapping a new cluster because no persisted topology exists");
self.state.add_node(self.local_node_info.clone()).await;
return Ok(());
}
anyhow::bail!("Could not reach any configured seed nodes; refusing unsafe cluster bootstrap")
}
async fn try_join(&self, addr: SocketAddr) -> Result<()> {
@@ -97,9 +103,14 @@ impl MembershipManager {
ClusterResponse::JoinResponse(join_resp) => {
if join_resp.accepted {
if let Some(topology) = &join_resp.topology {
let topology_contains_self = topology
.nodes
.iter()
.any(|node| node.node_id == self.local_node_info.node_id);
self.state.apply_topology(topology).await;
// Also register self
self.state.add_node(self.local_node_info.clone()).await;
if !topology_contains_self {
self.state.add_node(self.local_node_info.clone()).await;
}
tracing::info!(
"Applied cluster topology (version {}, {} nodes, {} erasure sets)",
topology.version,
@@ -137,7 +148,13 @@ impl MembershipManager {
}
async fn send_heartbeats(&self) {
let peers = self.state.online_peers().await;
let peers = self
.state
.all_nodes()
.await
.into_iter()
.filter(|node| node.info.node_id != self.local_node_info.node_id)
.collect::<Vec<_>>();
let topology_version = self.state.version().await;
let mut responded = Vec::new();
@@ -145,7 +162,7 @@ impl MembershipManager {
let drive_states = self.collect_drive_states().await;
for peer in &peers {
let addr: SocketAddr = match peer.quic_addr.parse() {
let addr: SocketAddr = match peer.info.quic_addr.parse() {
Ok(a) => a,
Err(_) => continue,
};
@@ -158,23 +175,23 @@ impl MembershipManager {
});
match tokio::time::timeout(
Duration::from_secs(5),
self.send_heartbeat_to_peer(&peer.node_id, addr, &heartbeat),
self.heartbeat_timeout,
self.send_heartbeat_to_peer(&peer.info.node_id, addr, &heartbeat),
)
.await
{
Ok(Ok(())) => {
responded.push(peer.node_id.clone());
responded.push(peer.info.node_id.clone());
}
Ok(Err(e)) => {
tracing::debug!(
peer = %peer.node_id,
peer = %peer.info.node_id,
error = %e,
"Heartbeat failed"
);
}
Err(_) => {
tracing::debug!(peer = %peer.node_id, "Heartbeat timed out");
tracing::debug!(peer = %peer.info.node_id, "Heartbeat timed out");
}
}
}
+1
@@ -9,6 +9,7 @@ pub mod erasure;
pub mod healing;
pub mod membership;
pub mod metadata;
pub mod persistence;
pub mod placement;
pub mod protocol;
pub mod quic_transport;
+77
@@ -0,0 +1,77 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tokio::fs;
use super::protocol::ClusterTopology;
const CLUSTER_METADATA_DIR: &str = ".smartstorage/cluster";
const IDENTITY_FILE: &str = "identity.json";
const TOPOLOGY_FILE: &str = "topology.json";
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterIdentity {
pub schema_version: u32,
pub node_id: String,
pub cluster_id: String,
}
impl ClusterIdentity {
pub fn new(node_id: String, cluster_id: String) -> Self {
Self {
schema_version: 1,
node_id,
cluster_id,
}
}
}
pub fn cluster_metadata_dir(storage_directory: &str) -> PathBuf {
PathBuf::from(storage_directory).join(CLUSTER_METADATA_DIR)
}
pub fn identity_path(metadata_dir: &Path) -> PathBuf {
metadata_dir.join(IDENTITY_FILE)
}
pub fn topology_path(metadata_dir: &Path) -> PathBuf {
metadata_dir.join(TOPOLOGY_FILE)
}
pub async fn load_identity(path: &Path) -> Result<Option<ClusterIdentity>> {
match fs::read_to_string(path).await {
Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(error) => Err(error.into()),
}
}
pub async fn persist_identity(path: &Path, identity: &ClusterIdentity) -> Result<()> {
write_json_atomic(path, identity).await
}
pub async fn load_topology(path: &Path) -> Result<Option<ClusterTopology>> {
match fs::read_to_string(path).await {
Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(error) => Err(error.into()),
}
}
pub async fn persist_topology(path: &Path, topology: &ClusterTopology) -> Result<()> {
write_json_atomic(path, topology).await
}
async fn write_json_atomic<T: Serialize>(path: &Path, value: &T) -> Result<()> {
let parent = path
.parent()
.ok_or_else(|| anyhow::anyhow!("Cluster metadata path has no parent"))?;
fs::create_dir_all(parent).await?;
let temp_path = path.with_extension("json.tmp");
let content = serde_json::to_string_pretty(value)?;
fs::write(&temp_path, content).await?;
fs::rename(&temp_path, path).await?;
Ok(())
}
+4
@@ -102,6 +102,7 @@ pub struct ShardWriteRequest {
pub key: String,
pub chunk_index: u32,
pub shard_index: u32,
pub drive_index: u32,
pub shard_data_length: u64,
pub checksum: u32, // crc32c of shard data
pub object_metadata: HashMap<String, String>,
@@ -121,6 +122,7 @@ pub struct ShardReadRequest {
pub key: String,
pub chunk_index: u32,
pub shard_index: u32,
pub drive_index: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -139,6 +141,7 @@ pub struct ShardDeleteRequest {
pub key: String,
pub chunk_index: u32,
pub shard_index: u32,
pub drive_index: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -154,6 +157,7 @@ pub struct ShardHeadRequest {
pub key: String,
pub chunk_index: u32,
pub shard_index: u32,
pub drive_index: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
+222 -22
@@ -8,6 +8,7 @@ use super::protocol::{
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
};
use super::shard_store::{ShardId, ShardStore};
use super::state::{ClusterState, NodeStatus};
/// QUIC transport layer for inter-node communication.
///
@@ -26,11 +27,8 @@ impl QuicTransport {
pub async fn new(bind_addr: SocketAddr, local_node_id: String) -> Result<Self> {
let (server_config, client_config) = Self::generate_tls_configs()?;
let endpoint = Endpoint::server(server_config, bind_addr)?;
// Also configure the endpoint for client connections
let mut endpoint_client = endpoint.clone();
endpoint_client.set_default_client_config(client_config);
let mut endpoint = Endpoint::server(server_config, bind_addr)?;
endpoint.set_default_client_config(client_config);
Ok(Self {
endpoint,
@@ -163,7 +161,8 @@ impl QuicTransport {
/// Accept incoming connections and dispatch to the handler.
pub async fn accept_loop(
self: Arc<Self>,
shard_store: Arc<ShardStore>,
shard_stores: Vec<Arc<ShardStore>>,
cluster_state: Option<Arc<ClusterState>>,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
loop {
@@ -172,11 +171,12 @@ impl QuicTransport {
match incoming {
Some(incoming_conn) => {
let transport = self.clone();
let store = shard_store.clone();
let stores = shard_stores.clone();
let state = cluster_state.clone();
tokio::spawn(async move {
match incoming_conn.await {
Ok(conn) => {
transport.handle_connection(conn, store).await;
transport.handle_connection(conn, stores, state).await;
}
Err(e) => {
tracing::error!("Failed to accept QUIC connection: {}", e);
@@ -194,16 +194,19 @@ impl QuicTransport {
/// Handle a single QUIC connection (may have multiple streams).
async fn handle_connection(
&self,
self: Arc<Self>,
conn: quinn::Connection,
shard_store: Arc<ShardStore>,
shard_stores: Vec<Arc<ShardStore>>,
cluster_state: Option<Arc<ClusterState>>,
) {
loop {
match conn.accept_bi().await {
Ok((send, recv)) => {
let store = shard_store.clone();
let stores = shard_stores.clone();
let state = cluster_state.clone();
let transport = self.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_stream(send, recv, store).await {
if let Err(e) = transport.handle_stream(send, recv, stores, state).await {
tracing::error!("Stream handler error: {}", e);
}
});
@@ -219,9 +222,11 @@ impl QuicTransport {
/// Handle a single bidirectional stream (one request-response exchange).
async fn handle_stream(
self: Arc<Self>,
mut send: quinn::SendStream,
mut recv: quinn::RecvStream,
shard_store: Arc<ShardStore>,
shard_stores: Vec<Arc<ShardStore>>,
cluster_state: Option<Arc<ClusterState>>,
) -> Result<()> {
// Read the full request (length-prefixed bincode + optional trailing data)
let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
@@ -231,6 +236,7 @@ impl QuicTransport {
ClusterRequest::ShardWrite(write_req) => {
// Shard data follows the header in the raw buffer
let shard_data = &raw[header_len..];
let drive_index = write_req.drive_index;
let shard_id = ShardId {
bucket: write_req.bucket,
@@ -239,9 +245,10 @@ impl QuicTransport {
shard_index: write_req.shard_index,
};
let result = shard_store
.write_shard(&shard_id, &shard_data, write_req.checksum)
.await;
let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store.write_shard(&shard_id, &shard_data, write_req.checksum).await,
Err(error) => Err(error),
};
let ack = ShardWriteAck {
request_id: write_req.request_id,
@@ -254,6 +261,7 @@ impl QuicTransport {
}
ClusterRequest::ShardRead(read_req) => {
let drive_index = read_req.drive_index;
let shard_id = ShardId {
bucket: read_req.bucket,
key: read_req.key,
@@ -261,7 +269,15 @@ impl QuicTransport {
shard_index: read_req.shard_index,
};
match shard_store.read_shard(&shard_id).await {
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store,
Err(error) => {
Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
return Ok(());
}
};
match store.read_shard(&shard_id).await {
Ok((data, checksum)) => {
let header = ShardReadResponse {
request_id: read_req.request_id,
@@ -293,13 +309,17 @@ impl QuicTransport {
}
ClusterRequest::ShardDelete(del_req) => {
let drive_index = del_req.drive_index;
let shard_id = ShardId {
bucket: del_req.bucket,
key: del_req.key,
chunk_index: del_req.chunk_index,
shard_index: del_req.shard_index,
};
let result = shard_store.delete_shard(&shard_id).await;
let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store.delete_shard(&shard_id).await,
Err(error) => Err(error),
};
let ack = protocol::ClusterResponse::ShardDeleteAck(protocol::ShardDeleteAck {
request_id: del_req.request_id,
success: result.is_ok(),
@@ -310,13 +330,22 @@ impl QuicTransport {
}
ClusterRequest::ShardHead(head_req) => {
let drive_index = head_req.drive_index;
let shard_id = ShardId {
bucket: head_req.bucket,
key: head_req.key,
chunk_index: head_req.chunk_index,
shard_index: head_req.shard_index,
};
let resp = match shard_store.head_shard(&shard_id).await {
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store,
Err(error) => {
Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
return Ok(());
}
};
let resp = match store.head_shard(&shard_id).await {
Ok(Some(meta)) => protocol::ShardHeadResponse {
request_id: head_req.request_id,
found: true,
@@ -336,9 +365,103 @@ impl QuicTransport {
send.finish()?;
}
// Join, Heartbeat, and TopologySync are handled inline below; Heal and Manifest
// operations remain with the healing and coordinator modules. Any other request
// falls through to a generic error response.
ClusterRequest::JoinRequest(join_req) => {
let Some(state) = cluster_state else {
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "ClusterDisabled".to_string(),
message: "Cluster state is not available".to_string(),
};
let response = protocol::encode_response(&ClusterResponse::Error(err))?;
send.write_all(&response).await?;
send.finish()?;
return Ok(());
};
let joining_node_id = join_req.node_info.node_id.clone();
state.add_node(join_req.node_info).await;
let topology = state.to_topology().await;
let node_drives: Vec<(String, u32)> = topology
.nodes
.iter()
.map(|node| (node.node_id.clone(), node.drive_count))
.collect();
let erasure_sets = super::placement::form_erasure_sets(
&node_drives,
topology.data_shards + topology.parity_shards,
);
state.set_erasure_sets(erasure_sets).await;
let response_topology = state.to_topology().await;
let response = protocol::encode_response(&ClusterResponse::JoinResponse(
protocol::JoinResponseMessage {
accepted: true,
topology: Some(response_topology.clone()),
error: None,
},
))?;
send.write_all(&response).await?;
send.finish()?;
self.broadcast_topology(&state, Some(response_topology), None, Some(&joining_node_id)).await;
}
ClusterRequest::Heartbeat(heartbeat) => {
let Some(state) = cluster_state else {
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "ClusterDisabled".to_string(),
message: "Cluster state is not available".to_string(),
};
let response = protocol::encode_response(&ClusterResponse::Error(err))?;
send.write_all(&response).await?;
send.finish()?;
return Ok(());
};
let peer_node_id = heartbeat.node_id.clone();
let peer_topology_version = heartbeat.topology_version;
state.record_heartbeat(&heartbeat.node_id).await;
let local_topology_version = state.version().await;
let response = protocol::encode_response(&ClusterResponse::HeartbeatAck(
protocol::HeartbeatAckMessage {
node_id: state.local_node_id().to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
topology_version: local_topology_version,
},
))?;
send.write_all(&response).await?;
send.finish()?;
if local_topology_version > peer_topology_version {
self.broadcast_topology(&state, None, Some(&peer_node_id), None).await;
}
}
ClusterRequest::TopologySync(sync) => {
let Some(state) = cluster_state else {
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "ClusterDisabled".to_string(),
message: "Cluster state is not available".to_string(),
};
let response = protocol::encode_response(&ClusterResponse::Error(err))?;
send.write_all(&response).await?;
send.finish()?;
return Ok(());
};
state.apply_topology(&sync.topology).await;
let response = protocol::encode_response(&ClusterResponse::TopologySyncAck(
protocol::TopologySyncAckMessage {
accepted: true,
current_version: state.version().await,
},
))?;
send.write_all(&response).await?;
send.finish()?;
}
_ => {
let err = protocol::ErrorResponse {
request_id: String::new(),
@@ -354,6 +477,83 @@ impl QuicTransport {
Ok(())
}
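/// Resolve the local shard store for a peer-supplied drive index; out-of-range indexes are rejected.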
fn shard_store_for_drive(
shard_stores: &[Arc<ShardStore>],
drive_index: u32,
) -> Result<Arc<ShardStore>> {
shard_stores
.get(drive_index as usize)
.cloned()
.ok_or_else(|| anyhow::anyhow!("Drive {} not found", drive_index))
}
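/// Encode a protocol error, write it to the stream, and finish the stream so the peer gets a terminal response.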
async fn send_error_response(
send: &mut quinn::SendStream,
code: &str,
message: String,
) -> Result<()> {
let err = protocol::ErrorResponse {
request_id: String::new(),
code: code.to_string(),
message,
};
let response = protocol::encode_response(&ClusterResponse::Error(err))?;
send.write_all(&response).await?;
send.finish()?;
Ok(())
}
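/// Push a topology snapshot to online peers. `target_node_id` restricts the sync to a single
/// peer (used when a heartbeat reveals a stale peer), while `skip_node_id` excludes a peer that
/// already holds the topology (e.g. a node that just received it in its join response).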
async fn broadcast_topology(
&self,
state: &Arc<ClusterState>,
topology: Option<protocol::ClusterTopology>,
target_node_id: Option<&str>,
skip_node_id: Option<&str>,
) {
let topology = match topology {
Some(topology) => topology,
None => state.to_topology().await,
};
let nodes = state.all_nodes().await;
for node in nodes {
if node.info.node_id == state.local_node_id() {
continue;
}
if let Some(target_node_id) = target_node_id {
if node.info.node_id != target_node_id {
continue;
}
}
if matches!(skip_node_id, Some(skip_node_id) if node.info.node_id == skip_node_id) {
continue;
}
if node.status != NodeStatus::Online {
continue;
}
let addr = match node.info.quic_addr.parse() {
Ok(addr) => addr,
Err(error) => {
tracing::warn!(node = %node.info.node_id, error = %error, "Skipping topology sync for invalid peer address");
continue;
}
};
let conn = match self.get_connection(&node.info.node_id, addr).await {
Ok(conn) => conn,
Err(error) => {
tracing::warn!(node = %node.info.node_id, error = %error, "Failed to connect for topology sync");
continue;
}
};
let request = ClusterRequest::TopologySync(protocol::TopologySyncMessage {
topology: topology.clone(),
});
if let Err(error) = self.send_request(&conn, &request).await {
tracing::warn!(node = %node.info.node_id, error = %error, "Failed to send topology sync");
}
}
}
/// Generate self-signed TLS certificates for cluster-internal communication.
fn generate_tls_configs() -> Result<(QuinnServerConfig, ClientConfig)> {
// Generate self-signed certificate
+102 -50
@@ -1,8 +1,10 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use super::placement::{DriveLocation, ErasureSet};
use super::persistence;
use super::protocol::{ClusterTopology, ErasureSetInfo, DriveLocationInfo, NodeInfo};
/// Node status for tracking liveness.
@@ -26,6 +28,7 @@ pub struct NodeState {
pub struct ClusterState {
inner: Arc<RwLock<ClusterStateInner>>,
local_node_id: String,
topology_path: Option<PathBuf>,
}
struct ClusterStateInner {
@@ -43,6 +46,7 @@ impl ClusterState {
cluster_id: String,
data_shards: usize,
parity_shards: usize,
topology_path: Option<PathBuf>,
) -> Self {
Self {
inner: Arc::new(RwLock::new(ClusterStateInner {
@@ -54,6 +58,7 @@ impl ClusterState {
parity_shards,
})),
local_node_id,
topology_path,
}
}
@@ -61,27 +66,37 @@ impl ClusterState {
&self.local_node_id
}
pub async fn cluster_id(&self) -> String {
self.inner.read().await.cluster_id.clone()
}
/// Register a node in the cluster.
pub async fn add_node(&self, info: NodeInfo) {
let mut inner = self.inner.write().await;
let node_id = info.node_id.clone();
inner.nodes.insert(
node_id,
NodeState {
info,
status: NodeStatus::Online,
missed_heartbeats: 0,
last_heartbeat: chrono::Utc::now(),
},
);
inner.version += 1;
{
let mut inner = self.inner.write().await;
let node_id = info.node_id.clone();
inner.nodes.insert(
node_id,
NodeState {
info,
status: NodeStatus::Online,
missed_heartbeats: 0,
last_heartbeat: chrono::Utc::now(),
},
);
inner.version += 1;
}
self.persist_topology_snapshot().await;
}
/// Remove a node from the cluster.
pub async fn remove_node(&self, node_id: &str) {
let mut inner = self.inner.write().await;
inner.nodes.remove(node_id);
inner.version += 1;
{
let mut inner = self.inner.write().await;
inner.nodes.remove(node_id);
inner.version += 1;
}
self.persist_topology_snapshot().await;
}
/// Update heartbeat for a node (reset missed count).
@@ -133,9 +148,12 @@ impl ClusterState {
/// Set erasure sets (typically done once during cluster formation).
pub async fn set_erasure_sets(&self, sets: Vec<ErasureSet>) {
let mut inner = self.inner.write().await;
inner.erasure_sets = sets;
inner.version += 1;
{
let mut inner = self.inner.write().await;
inner.erasure_sets = sets;
inner.version += 1;
}
self.persist_topology_snapshot().await;
}
/// Get the erasure set for a given object based on consistent hashing.
@@ -244,48 +262,82 @@ impl ClusterState {
/// Import topology from a protocol message (e.g., received from a peer during join).
pub async fn apply_topology(&self, topology: &ClusterTopology) {
let mut inner = self.inner.write().await;
let applied = {
let mut inner = self.inner.write().await;
// Only apply if newer
if topology.version <= inner.version {
return;
}
// Only apply if newer and from the same cluster lineage. A node that has not yet
// joined any topology may adopt the seed cluster ID during its first join.
if topology.version <= inner.version {
return;
}
if topology.cluster_id != inner.cluster_id {
if inner.nodes.is_empty() {
inner.cluster_id = topology.cluster_id.clone();
} else {
return;
}
}
inner.cluster_id = topology.cluster_id.clone();
inner.version = topology.version;
inner.data_shards = topology.data_shards;
inner.parity_shards = topology.parity_shards;
inner.version = topology.version;
inner.data_shards = topology.data_shards;
inner.parity_shards = topology.parity_shards;
// Update nodes
for node_info in &topology.nodes {
if !inner.nodes.contains_key(&node_info.node_id) {
let now = chrono::Utc::now();
for node_info in &topology.nodes {
let existing_status = inner.nodes.get(&node_info.node_id).map(|node| node.status.clone());
let existing_missed_heartbeats = inner
.nodes
.get(&node_info.node_id)
.map(|node| node.missed_heartbeats);
let existing_last_heartbeat = inner
.nodes
.get(&node_info.node_id)
.map(|node| node.last_heartbeat);
inner.nodes.insert(
node_info.node_id.clone(),
NodeState {
info: node_info.clone(),
status: NodeStatus::Online,
missed_heartbeats: 0,
last_heartbeat: chrono::Utc::now(),
status: existing_status.unwrap_or(NodeStatus::Online),
missed_heartbeats: existing_missed_heartbeats.unwrap_or(0),
last_heartbeat: existing_last_heartbeat.unwrap_or(now),
},
);
}
}
// Update erasure sets
inner.erasure_sets = topology
.erasure_sets
.iter()
.map(|set| ErasureSet {
set_id: set.set_id,
drives: set
.drives
.iter()
.map(|d| DriveLocation {
node_id: d.node_id.clone(),
drive_index: d.drive_index,
})
.collect(),
})
.collect();
inner.nodes.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));
// Update erasure sets
inner.erasure_sets = topology
.erasure_sets
.iter()
.map(|set| ErasureSet {
set_id: set.set_id,
drives: set
.drives
.iter()
.map(|d| DriveLocation {
node_id: d.node_id.clone(),
drive_index: d.drive_index,
})
.collect(),
})
.collect();
true
};
if applied {
self.persist_topology_snapshot().await;
}
}
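/// Write the current topology to the configured snapshot path, if any; persistence failures are
/// logged as warnings and do not interrupt cluster operations.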
async fn persist_topology_snapshot(&self) {
let Some(path) = &self.topology_path else {
return;
};
let topology = self.to_topology().await;
if let Err(error) = persistence::persist_topology(path, &topology).await {
tracing::warn!(error = %error, "Failed to persist cluster topology snapshot");
}
}
}
+105
@@ -4,6 +4,7 @@ use serde_json::Value;
use std::io::Write;
use tokio::io::{AsyncBufReadExt, BufReader};
use crate::config::Credential;
use crate::config::SmartStorageConfig;
use crate::server::StorageServer;
@@ -140,6 +141,110 @@ pub async fn management_loop() -> Result<()> {
}
}
}
"getStorageStats" => {
if let Some(ref s) = server {
match s.store().get_storage_stats().await {
Ok(stats) => match serde_json::to_value(stats) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize storage stats: {}", error),
);
}
},
Err(error) => {
send_error(id, format!("Failed to get storage stats: {}", error));
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"listBucketSummaries" => {
if let Some(ref s) = server {
match s.store().list_bucket_summaries().await {
Ok(summaries) => match serde_json::to_value(summaries) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize bucket summaries: {}", error),
);
}
},
Err(error) => {
send_error(id, format!("Failed to list bucket summaries: {}", error));
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"listCredentials" => {
if let Some(ref s) = server {
match serde_json::to_value(s.list_credentials().await) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize credentials: {}", error),
);
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"replaceCredentials" => {
#[derive(Deserialize)]
struct ReplaceCredentialsParams {
credentials: Vec<Credential>,
}
match serde_json::from_value::<ReplaceCredentialsParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
match s.replace_credentials(params.credentials).await {
Ok(()) => {
send_response(id, serde_json::json!({}));
}
Err(error) => {
send_error(
id,
format!("Failed to replace credentials: {}", error),
);
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(id, format!("Invalid replaceCredentials params: {}", error));
}
}
}
"getClusterHealth" => {
if let Some(ref s) = server {
match s.store().get_cluster_health().await {
Ok(health) => match serde_json::to_value(health) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize cluster health: {}", error),
);
}
},
Err(error) => {
send_error(id, format!("Failed to get cluster health: {}", error));
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"clusterStatus" => {
send_response(
id,
+291 -33
@@ -11,6 +11,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
@@ -27,6 +28,7 @@ use crate::cluster::coordinator::DistributedStore;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager;
use crate::cluster::persistence::{self, ClusterIdentity};
use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo;
use crate::cluster::quic_transport::QuicTransport;
@@ -35,17 +37,46 @@ use crate::cluster::state::ClusterState;
use crate::storage::{FileStore, StorageBackend};
use crate::xml_response;
struct ServerMetrics {
started_at: chrono::DateTime<chrono::Utc>,
total_requests: AtomicU64,
error_responses: AtomicU64,
}
impl ServerMetrics {
fn new() -> Self {
Self {
started_at: chrono::Utc::now(),
total_requests: AtomicU64::new(0),
error_responses: AtomicU64::new(0),
}
}
fn record_response(&self, status: StatusCode) {
self.total_requests.fetch_add(1, Ordering::Relaxed);
if status.as_u16() >= 400 {
self.error_responses.fetch_add(1, Ordering::Relaxed);
}
}
}
pub struct StorageServer {
store: Arc<StorageBackend>,
auth_runtime: Arc<auth::RuntimeCredentialStore>,
shutdown_tx: watch::Sender<bool>,
cluster_shutdown_txs: Vec<watch::Sender<bool>>,
server_handle: tokio::task::JoinHandle<()>,
}
impl StorageServer {
pub async fn start(config: SmartStorageConfig) -> Result<Self> {
let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth));
let mut cluster_shutdown_txs = Vec::new();
let store: Arc<StorageBackend> = if let Some(ref cluster_config) = config.cluster {
if cluster_config.enabled {
Self::start_clustered(&config, cluster_config).await?
let (store, shutdown_txs) = Self::start_clustered(&config, cluster_config).await?;
cluster_shutdown_txs = shutdown_txs;
store
} else {
Self::start_standalone(&config).await?
}
@@ -65,7 +96,9 @@ impl StorageServer {
let server_store = store.clone();
let server_config = config.clone();
let server_auth_runtime = auth_runtime.clone();
let server_policy_store = policy_store.clone();
let server_metrics = Arc::new(ServerMetrics::new());
let server_handle = tokio::spawn(async move {
loop {
@@ -78,15 +111,19 @@ impl StorageServer {
let io = TokioIo::new(stream);
let store = server_store.clone();
let cfg = server_config.clone();
let auth_runtime = server_auth_runtime.clone();
let ps = server_policy_store.clone();
let metrics = server_metrics.clone();
tokio::spawn(async move {
let svc = service_fn(move |req: Request<Incoming>| {
let store = store.clone();
let cfg = cfg.clone();
let auth_runtime = auth_runtime.clone();
let ps = ps.clone();
let metrics = metrics.clone();
async move {
handle_request(req, store, cfg, ps).await
handle_request(req, store, cfg, auth_runtime, ps, metrics).await
}
});
@@ -119,12 +156,17 @@ impl StorageServer {
Ok(Self {
store,
auth_runtime,
shutdown_tx,
cluster_shutdown_txs,
server_handle,
})
}
pub async fn stop(self) {
for shutdown_tx in &self.cluster_shutdown_txs {
let _ = shutdown_tx.send(true);
}
let _ = self.shutdown_tx.send(true);
let _ = self.server_handle.await;
}
@@ -133,6 +175,17 @@ impl StorageServer {
&self.store
}
pub async fn list_credentials(&self) -> Vec<crate::auth::CredentialMetadata> {
self.auth_runtime.list_credentials().await
}
pub async fn replace_credentials(
&self,
credentials: Vec<crate::config::Credential>,
) -> Result<(), StorageError> {
self.auth_runtime.replace_credentials(credentials).await
}
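For context, a minimal sketch of how these runtime credential methods surface through the TypeScript `SmartStorage` API; the `storage` instance and key values below are placeholders, and the camelCase field names are assumed from the credential config shape.

```typescript
// Hedged sketch: rotating runtime credentials via the TypeScript bridge methods.
// `storage` is an already-started SmartStorage instance; key values are placeholders.
await storage.replaceCredentials([
  { accessKeyId: 'rotated-access-key', secretAccessKey: 'rotated-secret-key' },
]);

// Listing returns access-key metadata only; secretAccessKey values are never included.
const credentials = await storage.listCredentials();
console.log(credentials.map((credential) => credential.accessKeyId));
```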
async fn start_standalone(config: &SmartStorageConfig) -> Result<Arc<StorageBackend>> {
let store = Arc::new(StorageBackend::Standalone(
FileStore::new(config.storage.directory.clone().into()),
@@ -148,12 +201,37 @@ impl StorageServer {
async fn start_clustered(
config: &SmartStorageConfig,
cluster_config: &crate::cluster::config::ClusterConfig,
) -> Result<Arc<StorageBackend>> {
) -> Result<(Arc<StorageBackend>, Vec<watch::Sender<bool>>)> {
let erasure_config = cluster_config.erasure.clone();
let cluster_metadata_dir = persistence::cluster_metadata_dir(&config.storage.directory);
let identity_path = persistence::identity_path(&cluster_metadata_dir);
let topology_path = persistence::topology_path(&cluster_metadata_dir);
let persisted_identity = persistence::load_identity(&identity_path).await?;
if let (Some(configured_node_id), Some(identity)) = (&cluster_config.node_id, &persisted_identity) {
if configured_node_id != &identity.node_id {
anyhow::bail!(
"Configured cluster node ID '{}' conflicts with persisted node ID '{}'",
configured_node_id,
identity.node_id
);
}
}
let node_id = cluster_config
.node_id
.clone()
.or_else(|| persisted_identity.as_ref().map(|identity| identity.node_id.clone()))
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let cluster_id = persisted_identity
.as_ref()
.map(|identity| identity.cluster_id.clone())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
persistence::persist_identity(
&identity_path,
&ClusterIdentity::new(node_id.clone(), cluster_id.clone()),
)
.await?;
// Determine drive paths
let drive_paths: Vec<std::path::PathBuf> = if cluster_config.drives.paths.is_empty() {
@@ -185,28 +263,37 @@ impl StorageServer {
// Initialize cluster state
let cluster_state = Arc::new(ClusterState::new(
node_id.clone(),
uuid::Uuid::new_v4().to_string(),
cluster_id.clone(),
erasure_config.data_shards,
erasure_config.parity_shards,
Some(topology_path.clone()),
));
// Form erasure sets from local drives (single-node for now)
let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
let erasure_sets =
placement::form_erasure_sets(&nodes, erasure_config.total_shards());
let persisted_topology = persistence::load_topology(&topology_path).await?;
let has_persisted_topology = persisted_topology.is_some();
if let Some(topology) = persisted_topology {
if topology.cluster_id != cluster_id {
anyhow::bail!("Persisted topology cluster ID does not match persisted node identity");
}
cluster_state.apply_topology(&topology).await;
} else if cluster_config.seed_nodes.is_empty() {
// Form erasure sets from local drives for a first node bootstrap.
let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
let erasure_sets = placement::form_erasure_sets(&nodes, erasure_config.total_shards());
if erasure_sets.is_empty() {
tracing::warn!(
"Not enough drives ({}) for erasure set size ({}). \
Need at least {} drives.",
drive_paths.len(),
erasure_config.total_shards(),
erasure_config.total_shards(),
);
if erasure_sets.is_empty() {
tracing::warn!(
"Not enough drives ({}) for erasure set size ({}). \
Need at least {} drives.",
drive_paths.len(),
erasure_config.total_shards(),
erasure_config.total_shards(),
);
}
cluster_state.set_erasure_sets(erasure_sets).await;
}
cluster_state.set_erasure_sets(erasure_sets).await;
// Register self as a node
let local_node_info = NodeInfo {
node_id: node_id.clone(),
@@ -216,11 +303,9 @@ impl StorageServer {
status: "online".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
};
cluster_state.add_node(local_node_info.clone()).await;
// Initialize drive manager for health monitoring
let drive_manager = Arc::new(tokio::sync::Mutex::new(
DriveManager::new(&cluster_config.drives).await?,
DriveManager::from_paths(&drive_paths).await?,
));
// Join cluster if seed nodes are configured
@@ -229,13 +314,25 @@ impl StorageServer {
cluster_state.clone(),
transport.clone(),
cluster_config.heartbeat_interval_ms,
cluster_config.heartbeat_timeout_ms,
local_node_info,
)
.with_drive_manager(drive_manager),
.with_drive_manager(drive_manager.clone()),
);
membership
.join_cluster(&cluster_config.seed_nodes)
.join_cluster(
&cluster_config.seed_nodes,
cluster_config.seed_nodes.is_empty() && !has_persisted_topology,
)
.await?;
let final_cluster_id = cluster_state.cluster_id().await;
if final_cluster_id != cluster_id {
persistence::persist_identity(
&identity_path,
&ClusterIdentity::new(node_id.clone(), final_cluster_id),
)
.await?;
}
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
@@ -244,31 +341,36 @@ impl StorageServer {
.collect();
// Start QUIC accept loop for incoming connections
let shard_store_for_accept = local_shard_stores[0].clone();
let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone();
let cluster_state_for_accept = cluster_state.clone();
let shard_stores_for_accept = local_shard_stores.clone();
tokio::spawn(async move {
transport_clone
.accept_loop(shard_store_for_accept, quic_shutdown_rx)
.accept_loop(shard_stores_for_accept, Some(cluster_state_for_accept), quic_shutdown_rx)
.await;
});
// Start heartbeat loop
let membership_clone = membership.clone();
let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
});
// Start healing service
let healing_runtime = Arc::new(tokio::sync::RwLock::new(
crate::cluster::healing::HealingRuntimeState::default(),
));
let healing_service = HealingService::new(
cluster_state.clone(),
&erasure_config,
local_shard_stores.clone(),
manifest_dir.clone(),
24, // scan every 24 hours
healing_runtime.clone(),
)?;
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
let (heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
@@ -278,11 +380,16 @@ impl StorageServer {
cluster_state,
transport,
erasure_config,
std::path::PathBuf::from(&config.storage.directory),
drive_paths,
drive_manager,
healing_runtime,
manifest_dir,
buckets_dir,
)?;
distributed_store.initialize_runtime_stats().await;
let store = Arc::new(StorageBackend::Clustered(distributed_store));
if !config.server.silent {
@@ -293,7 +400,7 @@ impl StorageServer {
);
}
Ok(store)
Ok((store, vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx]))
}
}
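A hedged sketch of the restart behaviour this identity persistence enables, using the TypeScript API in the same style as the test suite; the port, directories, and shard counts are illustrative assumptions.

```typescript
// Hedged sketch: cluster identity survives a restart because node and cluster IDs
// are persisted under .smartstorage/cluster inside the storage directory.
// Ports, paths, and shard counts are assumptions for illustration.
import { join } from 'path';
import * as smartstorage from '../ts/index.js';

const baseDir = join(process.cwd(), '.nogit', 'cluster-persistence-demo');
const config = {
  server: { port: 3360, silent: true },
  storage: { directory: join(baseDir, 'storage') },
  cluster: {
    enabled: true,
    quicPort: 4360,
    seedNodes: [],
    erasure: { dataShards: 4, parityShards: 2, chunkSizeBytes: 1024 * 1024 },
    drives: { paths: Array.from({ length: 6 }, (_v, i) => join(baseDir, `drive-${i + 1}`)) },
  },
};

const first = await smartstorage.SmartStorage.createAndStart(config);
const firstNodeId = (await first.getClusterHealth()).nodeId; // generated and persisted on first start
await first.stop();

const second = await smartstorage.SmartStorage.createAndStart(config);
const secondNodeId = (await second.getClusterHealth()).nodeId;
console.log(firstNodeId === secondNodeId); // expected: true, reused from the persisted identity file
await second.stop();
```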
@@ -375,11 +482,31 @@ fn storage_error_response(err: &StorageError, request_id: &str) -> Response<BoxB
.unwrap()
}
fn json_response(status: StatusCode, value: serde_json::Value, request_id: &str) -> Response<BoxBody> {
Response::builder()
.status(status)
.header("content-type", "application/json")
.header("x-amz-request-id", request_id)
.body(full_body(value.to_string()))
.unwrap()
}
fn text_response(status: StatusCode, content_type: &str, body: String, request_id: &str) -> Response<BoxBody> {
Response::builder()
.status(status)
.header("content-type", content_type)
.header("x-amz-request-id", request_id)
.body(full_body(body))
.unwrap()
}
async fn handle_request(
req: Request<Incoming>,
store: Arc<StorageBackend>,
config: SmartStorageConfig,
auth_runtime: Arc<auth::RuntimeCredentialStore>,
policy_store: Arc<PolicyStore>,
metrics: Arc<ServerMetrics>,
) -> Result<Response<BoxBody>, std::convert::Infallible> {
let request_id = Uuid::new_v4().to_string();
let method = req.method().clone();
@@ -389,6 +516,23 @@ async fn handle_request(
// Handle CORS preflight
if config.cors.enabled && method == Method::OPTIONS {
let resp = build_cors_preflight(&config, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
if method == Method::GET && uri.path().starts_with("/-/") {
let resp = match handle_operational_request(uri.path(), store, &config, &metrics, &request_id).await {
Ok(resp) => resp,
Err(error) => {
tracing::error!(error = %error, "Operational endpoint failed");
json_response(
StatusCode::INTERNAL_SERVER_ERROR,
serde_json::json!({ "ok": false, "error": error.to_string() }),
&request_id,
)
}
};
metrics.record_response(resp.status());
return Ok(resp);
}
@@ -396,7 +540,7 @@ async fn handle_request(
let request_ctx = action::resolve_action(&req);
// Step 2: Auth + policy pipeline
if config.auth.enabled {
if auth_runtime.enabled() {
// Attempt authentication
let identity = {
let has_auth_header = req
@@ -407,11 +551,14 @@ async fn handle_request(
.unwrap_or(false);
if has_auth_header {
match auth::verify_request(&req, &config) {
let credentials = auth_runtime.snapshot_credentials().await;
match auth::verify_request(&req, &credentials) {
Ok(id) => Some(id),
Err(e) => {
tracing::warn!("Auth failed: {}", e.message);
return Ok(storage_error_response(&e, &request_id));
let resp = storage_error_response(&e, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
}
} else {
@@ -421,7 +568,9 @@ async fn handle_request(
// Step 3: Authorization (policy evaluation)
if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await {
return Ok(storage_error_response(&e, &request_id));
let resp = storage_error_response(&e, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
}
@@ -453,9 +602,118 @@ async fn handle_request(
"request"
);
metrics.record_response(response.status());
Ok(response)
}
async fn handle_operational_request(
path: &str,
store: Arc<StorageBackend>,
config: &SmartStorageConfig,
metrics: &ServerMetrics,
request_id: &str,
) -> Result<Response<BoxBody>> {
match path {
"/-/live" | "/-/livez" => Ok(json_response(
StatusCode::OK,
serde_json::json!({
"ok": true,
"status": "alive",
"startedAt": metrics.started_at.timestamp_millis(),
}),
request_id,
)),
"/-/ready" | "/-/readyz" => {
let cluster_health = store.get_cluster_health().await?;
let cluster_ready = !cluster_health.enabled
|| (cluster_health.majority_healthy.unwrap_or(false)
&& cluster_health.quorum_healthy.unwrap_or(false));
let status = if cluster_ready {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
Ok(json_response(
status,
serde_json::json!({
"ok": cluster_ready,
"status": if cluster_ready { "ready" } else { "degraded" },
"cluster": cluster_health,
}),
request_id,
))
}
"/-/health" | "/-/healthz" => {
let cluster_health = store.get_cluster_health().await?;
let stats = store.get_storage_stats().await?;
Ok(json_response(
StatusCode::OK,
serde_json::json!({
"ok": true,
"status": "healthy",
"version": env!("CARGO_PKG_VERSION"),
"server": {
"address": config.server.address,
"port": config.server.port,
"startedAt": metrics.started_at.timestamp_millis(),
},
"storage": stats,
"cluster": cluster_health,
"metrics": {
"totalRequests": metrics.total_requests.load(Ordering::Relaxed),
"errorResponses": metrics.error_responses.load(Ordering::Relaxed),
},
}),
request_id,
))
}
"/-/metrics" => {
let cluster_health = store.get_cluster_health().await?;
let stats = store.get_storage_stats().await?;
let cluster_enabled = if cluster_health.enabled { 1 } else { 0 };
let quorum_healthy = if cluster_health.quorum_healthy.unwrap_or(true) { 1 } else { 0 };
let body = format!(
"# HELP smartstorage_requests_total Total HTTP requests observed by smartstorage.\n\
# TYPE smartstorage_requests_total counter\n\
smartstorage_requests_total {}\n\
# HELP smartstorage_error_responses_total HTTP responses with status >= 400.\n\
# TYPE smartstorage_error_responses_total counter\n\
smartstorage_error_responses_total {}\n\
# HELP smartstorage_buckets_total Runtime bucket count.\n\
# TYPE smartstorage_buckets_total gauge\n\
smartstorage_buckets_total {}\n\
# HELP smartstorage_objects_total Runtime object count.\n\
# TYPE smartstorage_objects_total gauge\n\
smartstorage_objects_total {}\n\
# HELP smartstorage_cluster_enabled Cluster mode enabled.\n\
# TYPE smartstorage_cluster_enabled gauge\n\
smartstorage_cluster_enabled {}\n\
# HELP smartstorage_cluster_quorum_healthy Cluster quorum health.\n\
# TYPE smartstorage_cluster_quorum_healthy gauge\n\
smartstorage_cluster_quorum_healthy {}\n",
metrics.total_requests.load(Ordering::Relaxed),
metrics.error_responses.load(Ordering::Relaxed),
stats.bucket_count,
stats.total_object_count,
cluster_enabled,
quorum_healthy,
);
Ok(text_response(
StatusCode::OK,
"text/plain; version=0.0.4",
body,
request_id,
))
}
_ => Ok(json_response(
StatusCode::NOT_FOUND,
serde_json::json!({ "ok": false, "error": "Unknown operational endpoint" }),
request_id,
)),
}
}
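For orientation, a minimal sketch of probing these operational endpoints from TypeScript; the base URL is an assumption, and any HTTP client would do in place of the global `fetch`.

```typescript
// Hedged sketch: probing the unauthenticated operational endpoints added above.
// The base URL is an assumption; point it at the configured server address and port.
const base = 'http://127.0.0.1:3000';

const live = await fetch(`${base}/-/live`);     // 200 with { ok: true, status: 'alive', ... }
const ready = await fetch(`${base}/-/ready`);   // 200 when ready, 503 when cluster quorum is degraded
const health = await (await fetch(`${base}/-/health`)).json();   // server, storage, cluster, and request metrics
const metrics = await (await fetch(`${base}/-/metrics`)).text(); // Prometheus text exposition (smartstorage_* series)

console.log(live.status, ready.status, health.status, metrics.split('\n')[0]);
```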
/// Authorize a request based on bucket policies and authentication state.
async fn authorize_request(
ctx: &RequestContext,
+494 -29
@@ -8,6 +8,7 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::cluster::coordinator::DistributedStore;
@@ -64,6 +65,133 @@ pub struct BucketInfo {
pub creation_date: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketSummary {
pub name: String,
pub object_count: u64,
pub total_size_bytes: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub creation_date: Option<i64>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageLocationSummary {
pub path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_bytes: Option<u64>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageStats {
pub bucket_count: u64,
pub total_object_count: u64,
pub total_storage_bytes: u64,
pub buckets: Vec<BucketSummary>,
pub storage_directory: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub storage_locations: Vec<StorageLocationSummary>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterPeerHealth {
pub node_id: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub quic_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub s3_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drive_count: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_heartbeat: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub missed_heartbeats: Option<u32>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterDriveHealth {
pub index: u32,
pub path: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_count: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_check: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure_set_id: Option<u32>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterErasureHealth {
pub data_shards: usize,
pub parity_shards: usize,
pub chunk_size_bytes: usize,
pub total_shards: usize,
pub read_quorum: usize,
pub write_quorum: usize,
pub erasure_set_count: usize,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterRepairHealth {
pub active: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub scan_interval_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_started_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_completed_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_duration_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shards_checked: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shards_healed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub failed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterHealth {
pub enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub quorum_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub majority_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub peers: Option<Vec<ClusterPeerHealth>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drives: Option<Vec<ClusterDriveHealth>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure: Option<ClusterErasureHealth>,
#[serde(skip_serializing_if = "Option::is_none")]
pub repairs: Option<ClusterRepairHealth>,
}
pub struct MultipartUploadInfo {
pub upload_id: String,
pub key: String,
@@ -98,22 +226,186 @@ struct PartMetadata {
last_modified: String,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeBucketStats {
pub object_count: u64,
pub total_size_bytes: u64,
pub creation_date: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeStatsState {
buckets: HashMap<String, RuntimeBucketStats>,
total_object_count: u64,
total_storage_bytes: u64,
}
impl RuntimeStatsState {
pub(crate) fn replace_buckets(&mut self, buckets: HashMap<String, RuntimeBucketStats>) {
self.total_object_count = buckets.values().map(|bucket| bucket.object_count).sum();
self.total_storage_bytes = buckets.values().map(|bucket| bucket.total_size_bytes).sum();
self.buckets = buckets;
}
pub(crate) fn ensure_bucket(&mut self, name: &str, creation_date: Option<DateTime<Utc>>) {
let bucket = self.buckets.entry(name.to_string()).or_default();
if bucket.creation_date.is_none() {
bucket.creation_date = creation_date;
}
}
pub(crate) fn remove_bucket(&mut self, name: &str) {
if let Some(bucket) = self.buckets.remove(name) {
self.total_object_count = self.total_object_count.saturating_sub(bucket.object_count);
self.total_storage_bytes = self
.total_storage_bytes
.saturating_sub(bucket.total_size_bytes);
}
}
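/// Account for an object write. `previous_size` is `Some` when an existing object was
/// overwritten, in which case only the size delta is applied; otherwise the object count grows.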
pub(crate) fn upsert_object(
&mut self,
bucket_name: &str,
previous_size: Option<u64>,
new_size: u64,
) {
let bucket_was_present = self.buckets.contains_key(bucket_name);
let bucket = self.buckets.entry(bucket_name.to_string()).or_default();
if let Some(previous_size) = previous_size {
if !bucket_was_present {
bucket.object_count = 1;
self.total_object_count += 1;
}
bucket.total_size_bytes =
bucket.total_size_bytes.saturating_sub(previous_size) + new_size;
self.total_storage_bytes =
self.total_storage_bytes.saturating_sub(previous_size) + new_size;
} else {
bucket.object_count += 1;
bucket.total_size_bytes += new_size;
self.total_object_count += 1;
self.total_storage_bytes += new_size;
}
}
pub(crate) fn remove_object(&mut self, bucket_name: &str, existing_size: Option<u64>) {
let Some(existing_size) = existing_size else {
return;
};
let Some(bucket) = self.buckets.get_mut(bucket_name) else {
return;
};
bucket.object_count = bucket.object_count.saturating_sub(1);
bucket.total_size_bytes = bucket.total_size_bytes.saturating_sub(existing_size);
self.total_object_count = self.total_object_count.saturating_sub(1);
self.total_storage_bytes = self.total_storage_bytes.saturating_sub(existing_size);
}
pub(crate) fn bucket_summaries(&self) -> Vec<BucketSummary> {
let mut buckets: Vec<BucketSummary> = self
.buckets
.iter()
.map(|(name, stats)| BucketSummary {
name: name.clone(),
object_count: stats.object_count,
total_size_bytes: stats.total_size_bytes,
creation_date: stats
.creation_date
.as_ref()
.map(|creation_date| creation_date.timestamp_millis()),
})
.collect();
buckets.sort_by(|a, b| a.name.cmp(&b.name));
buckets
}
pub(crate) fn snapshot(
&self,
storage_directory: &Path,
storage_locations: Vec<StorageLocationSummary>,
) -> StorageStats {
StorageStats {
bucket_count: self.buckets.len() as u64,
total_object_count: self.total_object_count,
total_storage_bytes: self.total_storage_bytes,
buckets: self.bucket_summaries(),
storage_directory: storage_directory.to_string_lossy().to_string(),
storage_locations,
}
}
}
#[derive(Debug, Clone, Copy)]
struct FilesystemUsage {
total_bytes: u64,
available_bytes: u64,
used_bytes: u64,
}
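/// Best-effort filesystem usage for a storage path; size fields are `None` on platforms
/// without statvfs support or when the call fails.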
pub(crate) fn storage_location_summary(path: &Path) -> StorageLocationSummary {
let usage = filesystem_usage(path);
StorageLocationSummary {
path: path.to_string_lossy().to_string(),
total_bytes: usage.map(|usage| usage.total_bytes),
available_bytes: usage.map(|usage| usage.available_bytes),
used_bytes: usage.map(|usage| usage.used_bytes),
}
}
#[cfg(unix)]
fn filesystem_usage(path: &Path) -> Option<FilesystemUsage> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
let path_bytes = path.as_os_str().as_bytes();
let c_path = CString::new(path_bytes).ok()?;
let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
if unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) } != 0 {
return None;
}
let block_size = stat.f_frsize as u64;
let total_bytes = stat.f_blocks as u64 * block_size;
let available_bytes = stat.f_bavail as u64 * block_size;
let free_bytes = stat.f_bfree as u64 * block_size;
Some(FilesystemUsage {
total_bytes,
available_bytes,
used_bytes: total_bytes.saturating_sub(free_bytes),
})
}
#[cfg(not(unix))]
fn filesystem_usage(_path: &Path) -> Option<FilesystemUsage> {
None
}
// ============================
// FileStore
// ============================
pub struct FileStore {
root_dir: PathBuf,
runtime_stats: RwLock<RuntimeStatsState>,
}
impl FileStore {
pub fn new(root_dir: PathBuf) -> Self {
Self { root_dir }
Self {
root_dir,
runtime_stats: RwLock::new(RuntimeStatsState::default()),
}
}
pub async fn initialize(&self) -> Result<()> {
fs::create_dir_all(&self.root_dir).await?;
fs::create_dir_all(self.policies_dir()).await?;
self.refresh_runtime_stats().await;
Ok(())
}
@@ -127,9 +419,56 @@ impl FileStore {
}
fs::create_dir_all(&self.root_dir).await?;
fs::create_dir_all(self.policies_dir()).await?;
self.refresh_runtime_stats().await;
Ok(())
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
let runtime_stats = self.runtime_stats.read().await;
Ok(runtime_stats.snapshot(
&self.root_dir,
vec![storage_location_summary(&self.root_dir)],
))
}
pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
let runtime_stats = self.runtime_stats.read().await;
Ok(runtime_stats.bucket_summaries())
}
async fn refresh_runtime_stats(&self) {
let buckets = match self.list_buckets().await {
Ok(buckets) => buckets,
Err(error) => {
tracing::warn!(path = %self.root_dir.display(), error = %error, "Failed to initialize runtime stats");
return;
}
};
let mut runtime_buckets = HashMap::new();
for bucket in buckets {
let bucket_path = self.root_dir.join(&bucket.name);
match Self::scan_bucket_objects(&bucket_path).await {
Ok((object_count, total_size_bytes)) => {
runtime_buckets.insert(
bucket.name,
RuntimeBucketStats {
object_count,
total_size_bytes,
creation_date: Some(bucket.creation_date),
},
);
}
Err(error) => {
tracing::warn!(bucket = %bucket.name, error = %error, "Failed to scan bucket for runtime stats");
}
}
}
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.replace_buckets(runtime_buckets);
}
// ============================
// Bucket operations
// ============================
@@ -168,6 +507,7 @@ impl FileStore {
pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
let bucket_path = self.root_dir.join(bucket);
fs::create_dir_all(&bucket_path).await?;
self.track_bucket_created(bucket).await;
Ok(())
}
@@ -185,6 +525,7 @@ impl FileStore {
}
fs::remove_dir_all(&bucket_path).await?;
self.track_bucket_deleted(bucket).await;
Ok(())
}
@@ -203,6 +544,8 @@ impl FileStore {
return Err(StorageError::no_such_bucket().into());
}
let previous_size = self.object_size_if_exists(bucket, key).await;
let object_path = self.object_path(bucket, key);
if let Some(parent) = object_path.parent() {
fs::create_dir_all(parent).await?;
@@ -243,9 +586,11 @@ impl FileStore {
let metadata_json = serde_json::to_string_pretty(&metadata)?;
fs::write(&metadata_path, metadata_json).await?;
Ok(PutResult {
md5: md5_hex,
})
let object_size = fs::metadata(&object_path).await?.len();
self.track_object_upsert(bucket, previous_size, object_size)
.await;
Ok(PutResult { md5: md5_hex })
}
pub async fn get_object(
@@ -310,6 +655,7 @@ impl FileStore {
}
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
let existing_size = self.object_size_if_exists(bucket, key).await;
let object_path = self.object_path(bucket, key);
let md5_path = format!("{}.md5", object_path.display());
let metadata_path = format!("{}.metadata.json", object_path.display());
@@ -337,6 +683,8 @@ impl FileStore {
current = dir.parent().map(|p| p.to_path_buf());
}
self.track_object_deleted(bucket, existing_size).await;
Ok(())
}
@@ -360,6 +708,8 @@ impl FileStore {
return Err(StorageError::no_such_bucket().into());
}
let previous_size = self.object_size_if_exists(dest_bucket, dest_key).await;
if let Some(parent) = dest_path.parent() {
fs::create_dir_all(parent).await?;
}
@@ -387,10 +737,10 @@ impl FileStore {
let md5 = self.read_md5(&dest_path).await;
let last_modified: DateTime<Utc> = file_meta.modified()?.into();
Ok(CopyResult {
md5,
last_modified,
})
self.track_object_upsert(dest_bucket, previous_size, file_meta.len())
.await;
Ok(CopyResult { md5, last_modified })
}
pub async fn list_objects(
@@ -438,11 +788,7 @@ impl FileStore {
if !delimiter.is_empty() {
let remaining = &key[prefix.len()..];
if let Some(delim_idx) = remaining.find(delimiter) {
let cp = format!(
"{}{}",
prefix,
&remaining[..delim_idx + delimiter.len()]
);
let cp = format!("{}{}", prefix, &remaining[..delim_idx + delimiter.len()]);
if common_prefix_set.insert(cp.clone()) {
common_prefixes.push(cp);
}
@@ -458,7 +804,10 @@ impl FileStore {
let object_path = self.object_path(bucket, key);
if let Ok(meta) = fs::metadata(&object_path).await {
let md5 = self.read_md5(&object_path).await;
let last_modified: DateTime<Utc> = meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH).into();
let last_modified: DateTime<Utc> = meta
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
.into();
contents.push(ListObjectEntry {
key: key.clone(),
size: meta.len(),
@@ -611,6 +960,8 @@ impl FileStore {
let content = fs::read_to_string(&meta_path).await?;
let meta: MultipartMetadata = serde_json::from_str(&content)?;
let previous_size = self.object_size_if_exists(&meta.bucket, &meta.key).await;
let object_path = self.object_path(&meta.bucket, &meta.key);
if let Some(parent) = object_path.parent() {
fs::create_dir_all(parent).await?;
@@ -653,12 +1004,14 @@ impl FileStore {
let metadata_json = serde_json::to_string_pretty(&meta.metadata)?;
fs::write(&metadata_path, metadata_json).await?;
let object_size = fs::metadata(&object_path).await?.len();
self.track_object_upsert(&meta.bucket, previous_size, object_size)
.await;
// Clean up multipart directory
let _ = fs::remove_dir_all(&upload_dir).await;
Ok(CompleteMultipartResult {
etag,
})
Ok(CompleteMultipartResult { etag })
}
pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
@@ -670,10 +1023,7 @@ impl FileStore {
Ok(())
}
pub async fn list_multipart_uploads(
&self,
bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
let multipart_dir = self.multipart_dir();
if !multipart_dir.is_dir() {
return Ok(Vec::new());
@@ -712,6 +1062,75 @@ impl FileStore {
// Helpers
// ============================
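/// Iteratively walk a bucket directory and count payload files (suffix `._storage_object`),
/// returning `(object_count, total_size_bytes)`.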
async fn scan_bucket_objects(bucket_path: &Path) -> Result<(u64, u64)> {
let mut object_count = 0u64;
let mut total_size_bytes = 0u64;
let mut directories = vec![bucket_path.to_path_buf()];
while let Some(directory) = directories.pop() {
let mut entries = match fs::read_dir(&directory).await {
Ok(entries) => entries,
Err(_) => continue,
};
while let Some(entry) = entries.next_entry().await? {
let metadata = entry.metadata().await?;
if metadata.is_dir() {
directories.push(entry.path());
continue;
}
let name = entry.file_name().to_string_lossy().to_string();
if name.ends_with("._storage_object") {
object_count += 1;
total_size_bytes += metadata.len();
}
}
}
Ok((object_count, total_size_bytes))
}
async fn bucket_creation_date(&self, bucket: &str) -> Option<DateTime<Utc>> {
let metadata = fs::metadata(self.root_dir.join(bucket)).await.ok()?;
let created_or_modified = metadata.created().unwrap_or(
metadata
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH),
);
Some(created_or_modified.into())
}
async fn object_size_if_exists(&self, bucket: &str, key: &str) -> Option<u64> {
fs::metadata(self.object_path(bucket, key))
.await
.ok()
.map(|metadata| metadata.len())
}
async fn track_bucket_created(&self, bucket: &str) {
let creation_date = self.bucket_creation_date(bucket).await;
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.ensure_bucket(bucket, creation_date);
}
async fn track_bucket_deleted(&self, bucket: &str) {
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.remove_bucket(bucket);
}
async fn track_object_upsert(&self, bucket: &str, previous_size: Option<u64>, new_size: u64) {
let creation_date = self.bucket_creation_date(bucket).await;
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.ensure_bucket(bucket, creation_date);
runtime_stats.upsert_object(bucket, previous_size, new_size);
}
async fn track_object_deleted(&self, bucket: &str, existing_size: Option<u64>) {
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.remove_object(bucket, existing_size);
}
fn object_path(&self, bucket: &str, key: &str) -> PathBuf {
let encoded = encode_key(key);
self.root_dir
@@ -815,12 +1234,43 @@ impl StorageBackend {
}
}
pub async fn get_cluster_health(&self) -> Result<ClusterHealth> {
match self {
StorageBackend::Standalone(_) => Ok(ClusterHealth {
enabled: false,
node_id: None,
quorum_healthy: None,
majority_healthy: None,
peers: None,
drives: None,
erasure: None,
repairs: None,
}),
StorageBackend::Clustered(ds) => ds.get_cluster_health().await,
}
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
match self {
StorageBackend::Standalone(fs) => fs.get_storage_stats().await,
StorageBackend::Clustered(ds) => ds.get_storage_stats().await,
}
}
pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
match self {
StorageBackend::Standalone(fs) => fs.list_bucket_summaries().await,
StorageBackend::Clustered(ds) => ds.list_bucket_summaries().await,
}
}
pub async fn initialize(&self) -> Result<()> {
match self {
StorageBackend::Standalone(fs) => fs.initialize().await,
StorageBackend::Clustered(ds) => {
// Ensure policies directory exists
tokio::fs::create_dir_all(ds.policies_dir()).await?;
ds.initialize_runtime_stats().await;
Ok(())
}
}
@@ -911,10 +1361,26 @@ impl StorageBackend {
) -> Result<CopyResult> {
match self {
StorageBackend::Standalone(fs) => {
fs.copy_object(src_bucket, src_key, dest_bucket, dest_key, metadata_directive, new_metadata).await
fs.copy_object(
src_bucket,
src_key,
dest_bucket,
dest_key,
metadata_directive,
new_metadata,
)
.await
}
StorageBackend::Clustered(ds) => {
ds.copy_object(src_bucket, src_key, dest_bucket, dest_key, metadata_directive, new_metadata).await
ds.copy_object(
src_bucket,
src_key,
dest_bucket,
dest_key,
metadata_directive,
new_metadata,
)
.await
}
}
}
@@ -929,10 +1395,12 @@ impl StorageBackend {
) -> Result<ListObjectsResult> {
match self {
StorageBackend::Standalone(fs) => {
fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token).await
fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
.await
}
StorageBackend::Clustered(ds) => {
ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token).await
ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
.await
}
}
}
@@ -979,10 +1447,7 @@ impl StorageBackend {
}
}
pub async fn list_multipart_uploads(
&self,
bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
match self {
StorageBackend::Standalone(fs) => fs.list_multipart_uploads(bucket).await,
StorageBackend::Clustered(ds) => ds.list_multipart_uploads(bucket).await,
+112 -6
@@ -1,16 +1,28 @@
/// <reference types="node" />
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { S3Client, CreateBucketCommand, ListBucketsCommand, PutObjectCommand, GetObjectCommand, DeleteObjectCommand, DeleteBucketCommand } from '@aws-sdk/client-s3';
import { Buffer } from 'buffer';
import { Readable } from 'stream';
import * as smartstorage from '../ts/index.js';
let testSmartStorageInstance: smartstorage.SmartStorage;
let s3Client: S3Client;
const testObjectBody = 'Hello from AWS SDK!';
const testObjectSize = Buffer.byteLength(testObjectBody);
function getBucketSummary(
summaries: smartstorage.IBucketSummary[],
bucketName: string,
): smartstorage.IBucketSummary | undefined {
return summaries.find((summary) => summary.name === bucketName);
}
// Helper to convert stream to string
async function streamToString(stream: Readable): Promise<string> {
const chunks: Buffer[] = [];
return new Promise((resolve, reject) => {
stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
stream.on('data', (chunk: string | Buffer | Uint8Array) => chunks.push(Buffer.from(chunk)));
stream.on('error', reject);
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
@@ -46,28 +58,82 @@ tap.test('should list buckets (empty)', async () => {
expect(response.Buckets!.length).toEqual(0);
});
tap.test('should expose empty runtime stats after startup', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
expect(stats.bucketCount).toEqual(0);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(stats.buckets.length).toEqual(0);
expect(stats.storageDirectory.length > 0).toEqual(true);
});
tap.test('should expose disabled cluster health in standalone mode', async () => {
const clusterHealth = await testSmartStorageInstance.getClusterHealth();
expect(clusterHealth.enabled).toEqual(false);
expect(clusterHealth.nodeId).toEqual(undefined);
expect(clusterHealth.quorumHealthy).toEqual(undefined);
expect(clusterHealth.drives).toEqual(undefined);
});
tap.test('should create a bucket', async () => {
const response = await s3Client.send(new CreateBucketCommand({ Bucket: 'test-bucket' }));
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('should list buckets (showing created bucket)', async () => {
tap.test('should create an empty bucket through the bridge', async () => {
const response = await testSmartStorageInstance.createBucket('empty-bucket');
expect(response.name).toEqual('empty-bucket');
});
tap.test('should list buckets (showing created buckets)', async () => {
const response = await s3Client.send(new ListBucketsCommand({}));
expect(response.Buckets!.length).toEqual(1);
expect(response.Buckets![0].Name).toEqual('test-bucket');
expect(response.Buckets!.length).toEqual(2);
expect(response.Buckets!.some((bucket) => bucket.Name === 'test-bucket')).toEqual(true);
expect(response.Buckets!.some((bucket) => bucket.Name === 'empty-bucket')).toEqual(true);
});
tap.test('should expose runtime bucket summaries after bucket creation', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
const summaries = await testSmartStorageInstance.listBucketSummaries();
const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket');
const emptyBucketSummary = getBucketSummary(summaries, 'empty-bucket');
expect(stats.bucketCount).toEqual(2);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(summaries.length).toEqual(2);
expect(testBucketSummary?.objectCount).toEqual(0);
expect(testBucketSummary?.totalSizeBytes).toEqual(0);
expect(typeof testBucketSummary?.creationDate).toEqual('number');
expect(emptyBucketSummary?.objectCount).toEqual(0);
expect(emptyBucketSummary?.totalSizeBytes).toEqual(0);
});
tap.test('should upload an object', async () => {
const response = await s3Client.send(new PutObjectCommand({
Bucket: 'test-bucket',
Key: 'test-file.txt',
Body: 'Hello from AWS SDK!',
Body: testObjectBody,
ContentType: 'text/plain',
}));
expect(response.$metadata.httpStatusCode).toEqual(200);
expect(response.ETag).toBeTypeofString();
});
tap.test('should reflect uploaded object in runtime stats', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket');
const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket');
expect(stats.bucketCount).toEqual(2);
expect(stats.totalObjectCount).toEqual(1);
expect(stats.totalStorageBytes).toEqual(testObjectSize);
expect(testBucketSummary?.objectCount).toEqual(1);
expect(testBucketSummary?.totalSizeBytes).toEqual(testObjectSize);
expect(emptyBucketSummary?.objectCount).toEqual(0);
expect(emptyBucketSummary?.totalSizeBytes).toEqual(0);
});
tap.test('should download the object', async () => {
const response = await s3Client.send(new GetObjectCommand({
Bucket: 'test-bucket',
@@ -76,7 +142,7 @@ tap.test('should download the object', async () => {
expect(response.$metadata.httpStatusCode).toEqual(200);
const content = await streamToString(response.Body as Readable);
expect(content).toEqual('Hello from AWS SDK!');
expect(content).toEqual(testObjectBody);
});
tap.test('should delete the object', async () => {
@@ -87,6 +153,20 @@ tap.test('should delete the object', async () => {
expect(response.$metadata.httpStatusCode).toEqual(204);
});
tap.test('should reflect object deletion in runtime stats', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket');
const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket');
expect(stats.bucketCount).toEqual(2);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(testBucketSummary?.objectCount).toEqual(0);
expect(testBucketSummary?.totalSizeBytes).toEqual(0);
expect(emptyBucketSummary?.objectCount).toEqual(0);
expect(emptyBucketSummary?.totalSizeBytes).toEqual(0);
});
tap.test('should fail to get deleted object', async () => {
await expect(
s3Client.send(new GetObjectCommand({
@@ -96,11 +176,37 @@ tap.test('should fail to get deleted object', async () => {
).rejects.toThrow();
});
tap.test('should delete the empty bucket', async () => {
const response = await s3Client.send(new DeleteBucketCommand({ Bucket: 'empty-bucket' }));
expect(response.$metadata.httpStatusCode).toEqual(204);
});
tap.test('should reflect bucket deletion in runtime stats', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket');
const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket');
expect(stats.bucketCount).toEqual(1);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(testBucketSummary?.objectCount).toEqual(0);
expect(testBucketSummary?.totalSizeBytes).toEqual(0);
expect(emptyBucketSummary).toEqual(undefined);
});
tap.test('should delete the bucket', async () => {
const response = await s3Client.send(new DeleteBucketCommand({ Bucket: 'test-bucket' }));
expect(response.$metadata.httpStatusCode).toEqual(204);
});
tap.test('should expose empty runtime stats after deleting all buckets', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
expect(stats.bucketCount).toEqual(0);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(stats.buckets.length).toEqual(0);
});
tap.test('should stop the storage server', async () => {
await testSmartStorageInstance.stop();
});
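The assertions in the hunks above rely on a `getBucketSummary` lookup helper and shared `testObjectBody`/`testObjectSize` constants declared earlier in this test file, outside the visible diff. A minimal sketch of what those declarations plausibly look like, reconstructed from how the tests use them rather than copied from the repository:

```typescript
import * as smartstorage from '../ts/index.js';

// Hypothetical reconstruction: the replaced literal was 'Hello from AWS SDK!',
// so the shared body/size constants presumably wrap that same payload.
const testObjectBody = 'Hello from AWS SDK!';
const testObjectSize = Buffer.byteLength(testObjectBody, 'utf8');

// Looks up a single bucket's summary by name from getStorageStats().buckets
// or listBucketSummaries(); undefined once the bucket has been deleted.
const getBucketSummary = (
  summaries: smartstorage.IBucketSummary[],
  bucketName: string,
): smartstorage.IBucketSummary | undefined =>
  summaries.find((summary) => summary.name === bucketName);
```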
+84
View File
@@ -0,0 +1,84 @@
/// <reference types="node" />
import { rm } from 'fs/promises';
import { join } from 'path';
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartstorage from '../ts/index.js';
let clusterStorage: smartstorage.SmartStorage;
const baseDir = join(process.cwd(), '.nogit', `cluster-health-${Date.now()}`);
const drivePaths = Array.from({ length: 6 }, (_value, index) => {
return join(baseDir, `drive-${index + 1}`);
});
const storageDir = join(baseDir, 'storage');
tap.test('setup: start clustered storage server', async () => {
clusterStorage = await smartstorage.SmartStorage.createAndStart({
server: {
port: 3348,
silent: true,
},
storage: {
directory: storageDir,
},
cluster: {
enabled: true,
nodeId: 'cluster-health-node',
quicPort: 4348,
seedNodes: [],
erasure: {
dataShards: 4,
parityShards: 2,
chunkSizeBytes: 1024 * 1024,
},
drives: {
paths: drivePaths,
},
},
});
});
tap.test('should expose clustered runtime health', async () => {
const health = await clusterStorage.getClusterHealth();
expect(health.enabled).toEqual(true);
expect(health.nodeId).toEqual('cluster-health-node');
expect(health.quorumHealthy).toEqual(true);
expect(health.majorityHealthy).toEqual(true);
expect(Array.isArray(health.peers)).toEqual(true);
expect(health.peers!.length).toEqual(0);
expect(Array.isArray(health.drives)).toEqual(true);
expect(health.drives!.length).toEqual(6);
expect(health.drives!.every((drive) => drive.status === 'online')).toEqual(true);
expect(health.drives!.every((drive) => drivePaths.includes(drive.path))).toEqual(true);
expect(health.drives!.every((drive) => drive.totalBytes !== undefined)).toEqual(true);
expect(health.drives!.every((drive) => drive.usedBytes !== undefined)).toEqual(true);
expect(health.drives!.every((drive) => drive.lastCheck !== undefined)).toEqual(true);
expect(health.drives!.every((drive) => drive.erasureSetId === 0)).toEqual(true);
expect(health.erasure?.dataShards).toEqual(4);
expect(health.erasure?.parityShards).toEqual(2);
expect(health.erasure?.chunkSizeBytes).toEqual(1024 * 1024);
expect(health.erasure?.totalShards).toEqual(6);
expect(health.erasure?.readQuorum).toEqual(4);
expect(health.erasure?.writeQuorum).toEqual(5);
expect(health.erasure?.erasureSetCount).toEqual(1);
expect(health.repairs?.active).toEqual(false);
expect(health.repairs?.scanIntervalMs).toEqual(24 * 60 * 60 * 1000);
});
tap.test('should expose cluster health after bucket creation', async () => {
const bucket = await clusterStorage.createBucket('cluster-health-bucket');
const health = await clusterStorage.getClusterHealth();
expect(bucket.name).toEqual('cluster-health-bucket');
expect(health.enabled).toEqual(true);
expect(health.quorumHealthy).toEqual(true);
expect(health.drives!.length).toEqual(6);
});
tap.test('teardown: stop clustered server and clean files', async () => {
await clusterStorage.stop();
await rm(baseDir, { recursive: true, force: true });
});
export default tap.start();
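The quorum assertions above follow the usual erasure-coding arithmetic: a 4+2 layout yields 6 shards per stripe, any 4 shards are enough to reconstruct a read, and writes wait for one shard beyond the data count. The mapping below (readQuorum = dataShards, writeQuorum = dataShards + 1) is inferred from these assertions, not taken from the runtime source:

```typescript
// Inferred from the assertions above; the Rust core may derive quorums differently.
const dataShards = 4;
const parityShards = 2;
const totalShards = dataShards + parityShards; // 6
const readQuorum = dataShards; // 4
const writeQuorum = dataShards + 1; // 5
```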
+317
View File
@@ -0,0 +1,317 @@
/// <reference types="node" />
import { readFile, readdir, rm } from 'fs/promises';
import { join } from 'path';
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { CreateBucketCommand, GetObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { Readable } from 'stream';
import * as smartstorage from '../ts/index.js';
const baseDir = join(process.cwd(), '.nogit', `cluster-multinode-${Date.now()}`);
const nodes: smartstorage.SmartStorage[] = [];
const makeDrivePaths = (nodeId: string) => {
return [1, 2].map((driveIndex) => join(baseDir, nodeId, `drive-${driveIndex}`));
};
const streamToString = async (stream: Readable): Promise<string> => {
const chunks: Buffer[] = [];
return new Promise((resolve, reject) => {
stream.on('data', (chunk: string | Buffer | Uint8Array) => chunks.push(Buffer.from(chunk)));
stream.on('error', reject);
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
};
const fileExistsBelow = async (directory: string, fileName: string): Promise<boolean> => {
let entries;
try {
entries = await readdir(directory, { withFileTypes: true });
} catch {
return false;
}
for (const entry of entries) {
const entryPath = join(directory, entry.name);
if (entry.isFile() && entry.name === fileName) {
return true;
}
if (entry.isDirectory() && await fileExistsBelow(entryPath, fileName)) {
return true;
}
}
return false;
};
const waitFor = async (check: () => Promise<boolean>, timeoutMs = 10000) => {
const deadline = Date.now() + timeoutMs;
let lastError = '';
while (Date.now() < deadline) {
try {
if (await check()) {
return;
}
} catch (error) {
lastError = error instanceof Error ? error.message : String(error);
}
await new Promise((resolve) => setTimeout(resolve, 250));
}
throw new Error(`Timed out waiting for cluster condition${lastError ? `: ${lastError}` : ''}`);
};
tap.test('setup: start three clustered storage nodes', async () => {
await rm(baseDir, { recursive: true, force: true });
const node1 = await smartstorage.SmartStorage.createAndStart({
server: {
address: '127.0.0.1',
port: 3350,
silent: true,
},
storage: {
directory: join(baseDir, 'node-1', 'storage'),
},
cluster: {
enabled: true,
nodeId: 'node-1',
quicPort: 4350,
seedNodes: [],
erasure: {
dataShards: 4,
parityShards: 2,
chunkSizeBytes: 1024 * 1024,
},
drives: {
paths: makeDrivePaths('node-1'),
},
heartbeatIntervalMs: 500,
heartbeatTimeoutMs: 3000,
},
});
nodes.push(node1);
await new Promise((resolve) => setTimeout(resolve, 500));
const node2 = await smartstorage.SmartStorage.createAndStart({
server: {
address: '127.0.0.1',
port: 3351,
silent: true,
},
storage: {
directory: join(baseDir, 'node-2', 'storage'),
},
cluster: {
enabled: true,
nodeId: 'node-2',
quicPort: 4351,
seedNodes: ['127.0.0.1:4350'],
erasure: {
dataShards: 4,
parityShards: 2,
chunkSizeBytes: 1024 * 1024,
},
drives: {
paths: makeDrivePaths('node-2'),
},
heartbeatIntervalMs: 500,
heartbeatTimeoutMs: 3000,
},
});
nodes.push(node2);
await new Promise((resolve) => setTimeout(resolve, 500));
const node3 = await smartstorage.SmartStorage.createAndStart({
server: {
address: '127.0.0.1',
port: 3352,
silent: true,
},
storage: {
directory: join(baseDir, 'node-3', 'storage'),
},
cluster: {
enabled: true,
nodeId: 'node-3',
quicPort: 4352,
seedNodes: ['127.0.0.1:4350'],
erasure: {
dataShards: 4,
parityShards: 2,
chunkSizeBytes: 1024 * 1024,
},
drives: {
paths: makeDrivePaths('node-3'),
},
heartbeatIntervalMs: 500,
heartbeatTimeoutMs: 3000,
},
});
nodes.push(node3);
});
tap.test('seed node should report joined peers and multi-node erasure topology', async () => {
const seed = nodes[0];
await waitFor(async () => {
const health = await seed.getClusterHealth();
if (health.peers?.length !== 2 || health.erasure?.erasureSetCount !== 1) {
throw new Error(JSON.stringify(health));
}
return true;
});
const health = await seed.getClusterHealth();
const peerIds = health.peers!.map((peer) => peer.nodeId).sort();
expect(health.enabled).toEqual(true);
expect(health.nodeId).toEqual('node-1');
expect(health.quorumHealthy).toEqual(true);
expect(health.majorityHealthy).toEqual(true);
expect(peerIds).toEqual(['node-2', 'node-3']);
expect(health.erasure?.totalShards).toEqual(6);
expect(health.erasure?.erasureSetCount).toEqual(1);
});
tap.test('all nodes should converge to the same multi-node topology', async () => {
for (const node of nodes) {
await waitFor(async () => {
const health = await node.getClusterHealth();
if (health.peers?.length !== 2 || health.erasure?.erasureSetCount !== 1) {
throw new Error(JSON.stringify(health));
}
return true;
});
}
});
tap.test('seed node should write shards to the declared remote drives', async () => {
const seed = nodes[0];
const descriptor = await seed.getStorageDescriptor();
const client = new S3Client({
endpoint: `http://${descriptor.endpoint}:${descriptor.port}`,
region: 'us-east-1',
credentials: {
accessKeyId: descriptor.accessKey,
secretAccessKey: descriptor.accessSecret,
},
forcePathStyle: true,
});
const bucket = 'multinode-bucket';
const key = 'distributed.txt';
const body = 'hello distributed shards';
await client.send(new CreateBucketCommand({ Bucket: bucket }));
await client.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body: body }));
const getResponse = await client.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
expect(await streamToString(getResponse.Body as Readable)).toEqual(body);
const manifestPath = join(
baseDir,
'node-1',
'storage',
'.manifests',
bucket,
`${key}.manifest.json`,
);
const manifest = JSON.parse(await readFile(manifestPath, 'utf8')) as {
chunks: Array<{
shardPlacements: Array<{ shardIndex: number; nodeId: string; driveId: string }>;
}>;
};
const placements = manifest.chunks[0].shardPlacements;
expect(placements.length).toEqual(6);
expect(placements.some((placement) => placement.nodeId === 'node-2' && placement.driveId === '1'))
.toEqual(true);
expect(placements.some((placement) => placement.nodeId === 'node-3' && placement.driveId === '1'))
.toEqual(true);
for (const placement of placements) {
const drivePath = makeDrivePaths(placement.nodeId)[Number(placement.driveId)];
const shardFile = `shard-${placement.shardIndex}.dat`;
expect(await fileExistsBelow(join(drivePath, '.smartstorage', 'data'), shardFile)).toEqual(true);
}
});
tap.test('restarted peer should keep durable identity and rejoin topology', async () => {
await nodes[1].stop();
await new Promise((resolve) => setTimeout(resolve, 500));
nodes[1] = await smartstorage.SmartStorage.createAndStart({
server: {
address: '127.0.0.1',
port: 3351,
silent: true,
},
storage: {
directory: join(baseDir, 'node-2', 'storage'),
},
cluster: {
enabled: true,
nodeId: 'node-2',
quicPort: 4351,
seedNodes: ['127.0.0.1:4350'],
erasure: {
dataShards: 4,
parityShards: 2,
chunkSizeBytes: 1024 * 1024,
},
drives: {
paths: makeDrivePaths('node-2'),
},
heartbeatIntervalMs: 500,
heartbeatTimeoutMs: 3000,
},
});
await waitFor(async () => {
const health = await nodes[1].getClusterHealth();
if (health.nodeId !== 'node-2' || health.peers?.length !== 2) {
throw new Error(JSON.stringify(health));
}
return true;
});
const identityPath = join(
baseDir,
'node-2',
'storage',
'.smartstorage',
'cluster',
'identity.json',
);
const topologyPath = join(
baseDir,
'node-2',
'storage',
'.smartstorage',
'cluster',
'topology.json',
);
const identity = JSON.parse(await readFile(identityPath, 'utf8')) as {
nodeId: string;
clusterId: string;
};
const topology = JSON.parse(await readFile(topologyPath, 'utf8')) as {
cluster_id: string;
nodes: Array<{ node_id: string }>;
};
expect(identity.nodeId).toEqual('node-2');
expect(identity.clusterId).toEqual(topology.cluster_id);
expect(topology.nodes.some((node) => node.node_id === 'node-1')).toEqual(true);
expect(topology.nodes.some((node) => node.node_id === 'node-3')).toEqual(true);
});
tap.test('teardown: stop clustered nodes and clean files', async () => {
for (const node of nodes.reverse()) {
await node.stop();
}
await rm(baseDir, { recursive: true, force: true });
});
export default tap.start();
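The shard-placement test above parses a per-object manifest from `.manifests/<bucket>/<key>.manifest.json`. Reconstructed from the type assertion in that test, a manifest for the 4+2 layout needs at least the shape below; the real file likely carries additional metadata that the test never reads:

```typescript
// Illustrative manifest excerpt; only the fields consumed by the test are shown,
// and the concrete node/drive distribution depends on the runtime's placement.
const exampleManifest = {
  chunks: [
    {
      shardPlacements: [
        { shardIndex: 0, nodeId: 'node-1', driveId: '0' },
        { shardIndex: 1, nodeId: 'node-1', driveId: '1' },
        { shardIndex: 2, nodeId: 'node-2', driveId: '0' },
        { shardIndex: 3, nodeId: 'node-2', driveId: '1' },
        { shardIndex: 4, nodeId: 'node-3', driveId: '0' },
        { shardIndex: 5, nodeId: 'node-3', driveId: '1' },
      ],
    },
  ],
};
```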
+150
View File
@@ -0,0 +1,150 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import {
CreateBucketCommand,
DeleteBucketCommand,
ListBucketsCommand,
S3Client,
} from '@aws-sdk/client-s3';
import * as smartstorage from '../ts/index.js';
const TEST_PORT = 3349;
const INITIAL_CREDENTIAL: smartstorage.IStorageCredential = {
accessKeyId: 'RUNTIMEINITIAL',
secretAccessKey: 'RUNTIMEINITIALSECRET123',
};
const ROTATED_CREDENTIAL_A: smartstorage.IStorageCredential = {
accessKeyId: 'RUNTIMEA',
secretAccessKey: 'RUNTIMEASECRET123',
};
const ROTATED_CREDENTIAL_B: smartstorage.IStorageCredential = {
accessKeyId: 'RUNTIMEB',
secretAccessKey: 'RUNTIMEBSECRET123',
};
const TEST_BUCKET = 'runtime-credentials-bucket';
let testSmartStorageInstance: smartstorage.SmartStorage;
let initialClient: S3Client;
let rotatedClientA: S3Client;
let rotatedClientB: S3Client;
function createS3Client(credential: smartstorage.IStorageCredential): S3Client {
return new S3Client({
endpoint: `http://localhost:${TEST_PORT}`,
region: 'us-east-1',
credentials: {
accessKeyId: credential.accessKeyId,
secretAccessKey: credential.secretAccessKey,
},
forcePathStyle: true,
});
}
tap.test('setup: start storage server with runtime-managed credentials', async () => {
testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart({
server: {
port: TEST_PORT,
silent: true,
region: 'us-east-1',
},
storage: {
cleanSlate: true,
},
auth: {
enabled: true,
credentials: [INITIAL_CREDENTIAL],
},
});
initialClient = createS3Client(INITIAL_CREDENTIAL);
rotatedClientA = createS3Client(ROTATED_CREDENTIAL_A);
rotatedClientB = createS3Client(ROTATED_CREDENTIAL_B);
});
tap.test('startup credentials authenticate successfully', async () => {
const response = await initialClient.send(new ListBucketsCommand({}));
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('listCredentials returns active credential metadata without secrets', async () => {
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(1);
expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId);
expect((credentials[0] as any).secretAccessKey).toEqual(undefined);
});
tap.test('invalid replacement input fails cleanly and leaves old credentials active', async () => {
await expect(
testSmartStorageInstance.replaceCredentials([
{
accessKeyId: '',
secretAccessKey: 'invalid-secret',
},
]),
).rejects.toThrow();
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(1);
expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId);
const response = await initialClient.send(new ListBucketsCommand({}));
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('replacing credentials swaps the active set atomically', async () => {
await testSmartStorageInstance.replaceCredentials([
ROTATED_CREDENTIAL_A,
ROTATED_CREDENTIAL_B,
]);
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(2);
expect(credentials[0].accessKeyId).toEqual(ROTATED_CREDENTIAL_A.accessKeyId);
expect(credentials[1].accessKeyId).toEqual(ROTATED_CREDENTIAL_B.accessKeyId);
});
tap.test('old credentials stop working immediately for new requests', async () => {
await expect(initialClient.send(new ListBucketsCommand({}))).rejects.toThrow();
});
tap.test('first rotated credential authenticates successfully', async () => {
const response = await rotatedClientA.send(
new CreateBucketCommand({ Bucket: TEST_BUCKET }),
);
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('multiple rotated credentials remain active', async () => {
const response = await rotatedClientB.send(new ListBucketsCommand({}));
expect(response.$metadata.httpStatusCode).toEqual(200);
expect(response.Buckets?.some((bucket) => bucket.Name === TEST_BUCKET)).toEqual(true);
});
tap.test('duplicate replacement input fails cleanly without changing the active set', async () => {
await expect(
testSmartStorageInstance.replaceCredentials([
ROTATED_CREDENTIAL_A,
{
accessKeyId: ROTATED_CREDENTIAL_A.accessKeyId,
secretAccessKey: 'another-secret',
},
]),
).rejects.toThrow();
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(2);
expect(credentials[0].accessKeyId).toEqual(ROTATED_CREDENTIAL_A.accessKeyId);
expect(credentials[1].accessKeyId).toEqual(ROTATED_CREDENTIAL_B.accessKeyId);
const response = await rotatedClientA.send(new ListBucketsCommand({}));
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('teardown: clean up bucket and stop the storage server', async () => {
const response = await rotatedClientA.send(
new DeleteBucketCommand({ Bucket: TEST_BUCKET }),
);
expect(response.$metadata.httpStatusCode).toEqual(204);
await testSmartStorageInstance.stop();
});
export default tap.start();
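Outside the test harness, the same rotation is two calls against a running instance; a minimal sketch, where `storage` stands for any started `SmartStorage` instance and the key values are illustrative:

```typescript
// Replace the whole active credential set atomically; requests signed with the
// previous keys start failing immediately, no restart required.
await storage.replaceCredentials([
  { accessKeyId: 'NEWKEYA', secretAccessKey: 'NEWSECRETA123' },
  { accessKeyId: 'NEWKEYB', secretAccessKey: 'NEWSECRETB123' },
]);

// Listings expose access-key metadata only, never the secrets.
const active = await storage.listCredentials();
console.log(active.map((credential) => credential.accessKeyId)); // ['NEWKEYA', 'NEWKEYB']
```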
+50
View File
@@ -0,0 +1,50 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartstorage from '../ts/index.js';
const TEST_PORT = 3353;
let testSmartStorageInstance: smartstorage.SmartStorage;
tap.test('setup: start storage server for operational endpoint checks', async () => {
testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart({
server: {
port: TEST_PORT,
silent: true,
region: 'us-east-1',
},
storage: {
cleanSlate: true,
},
auth: {
enabled: false,
credentials: [],
},
});
});
tap.test('operational endpoints expose live ready health and metrics', async () => {
const live = await fetch(`http://localhost:${TEST_PORT}/-/live`);
expect(live.status).toEqual(200);
expect((await live.json()).status).toEqual('alive');
const ready = await fetch(`http://localhost:${TEST_PORT}/-/ready`);
expect(ready.status).toEqual(200);
expect((await ready.json()).status).toEqual('ready');
const health = await fetch(`http://localhost:${TEST_PORT}/-/health`);
expect(health.status).toEqual(200);
const healthBody = await health.json();
expect(healthBody.ok).toEqual(true);
expect(healthBody.cluster.enabled).toEqual(false);
const metrics = await fetch(`http://localhost:${TEST_PORT}/-/metrics`);
expect(metrics.status).toEqual(200);
const metricsBody = await metrics.text();
expect(metricsBody.includes('smartstorage_requests_total')).toEqual(true);
expect(metricsBody.includes('smartstorage_cluster_enabled 0')).toEqual(true);
});
tap.test('teardown: stop storage server', async () => {
await testSmartStorageInstance.stop();
});
export default tap.start();
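Judging by the two assertions, `/-/metrics` serves newline-separated `name value` pairs, so an external check can key off the series this test relies on. A small consumer sketch; the URL assumes a locally running server on the test port, and any series beyond the two asserted names would be guesses:

```typescript
// Reads the metrics text and derives cluster mode from the asserted gauge.
const metricsResponse = await fetch('http://localhost:3353/-/metrics');
const metricsText = await metricsResponse.text();

const clusterEnabledLine = metricsText
  .split('\n')
  .find((line) => line.startsWith('smartstorage_cluster_enabled'));
const clusterEnabled = clusterEnabledLine?.trim().endsWith('1') ?? false;
const hasRequestCounter = metricsText.includes('smartstorage_requests_total');

console.log({ clusterEnabled, hasRequestCounter });
```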
+1 -1
View File
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstorage',
version: '6.3.2',
version: '6.4.0',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
}
+138 -4
View File
@@ -1,15 +1,24 @@
import * as plugins from './plugins.js';
import * as paths from './paths.js';
/**
* Authentication configuration
*/
export interface IStorageCredential {
accessKeyId: string;
secretAccessKey: string;
}
export interface IStorageCredentialMetadata {
accessKeyId: string;
}
/**
* Authentication configuration
*/
export interface IAuthConfig {
enabled: boolean;
credentials: Array<{
accessKeyId: string;
secretAccessKey: string;
}>;
credentials: IStorageCredential[];
}
/**
@@ -113,6 +122,105 @@ export interface ISmartStorageConfig {
cluster?: IClusterConfig;
}
/**
* Logical bucket stats maintained by the Rust runtime.
* Values are initialized from native storage on startup and updated on smartstorage mutations.
*/
export interface IBucketSummary {
name: string;
objectCount: number;
totalSizeBytes: number;
creationDate?: number;
}
/**
 * Filesystem-level capacity snapshot for the storage directory or each configured drive path.
*/
export interface IStorageLocationSummary {
path: string;
totalBytes?: number;
availableBytes?: number;
usedBytes?: number;
}
/**
* Runtime storage stats served by the Rust core without issuing S3 list calls.
*/
export interface IStorageStats {
bucketCount: number;
totalObjectCount: number;
totalStorageBytes: number;
buckets: IBucketSummary[];
storageDirectory: string;
storageLocations?: IStorageLocationSummary[];
}
/**
* Known peer status from the local node's current cluster view.
*/
export interface IClusterPeerHealth {
nodeId: string;
status: 'online' | 'suspect' | 'offline';
quicAddress?: string;
s3Address?: string;
driveCount?: number;
lastHeartbeat?: number;
missedHeartbeats?: number;
}
/**
* Local drive health as measured by smartstorage's runtime probes.
*/
export interface IClusterDriveHealth {
index: number;
path: string;
status: 'online' | 'degraded' | 'offline' | 'healing';
totalBytes?: number;
usedBytes?: number;
availableBytes?: number;
errorCount?: number;
lastError?: string;
lastCheck?: number;
erasureSetId?: number;
}
export interface IClusterErasureHealth {
dataShards: number;
parityShards: number;
chunkSizeBytes: number;
totalShards: number;
readQuorum: number;
writeQuorum: number;
erasureSetCount: number;
}
export interface IClusterRepairHealth {
active: boolean;
scanIntervalMs?: number;
lastRunStartedAt?: number;
lastRunCompletedAt?: number;
lastDurationMs?: number;
shardsChecked?: number;
shardsHealed?: number;
failed?: number;
lastError?: string;
}
/**
* Cluster runtime health from the Rust core.
* When clustering is disabled, the response is `{ enabled: false }`.
*/
export interface IClusterHealth {
enabled: boolean;
nodeId?: string;
quorumHealthy?: boolean;
majorityHealthy?: boolean;
peers?: IClusterPeerHealth[];
drives?: IClusterDriveHealth[];
erasure?: IClusterErasureHealth;
repairs?: IClusterRepairHealth;
}
/**
* Default configuration values
*/
@@ -205,6 +313,11 @@ type TRustStorageCommands = {
start: { params: { config: Required<ISmartStorageConfig> }; result: {} };
stop: { params: {}; result: {} };
createBucket: { params: { name: string }; result: {} };
getStorageStats: { params: {}; result: IStorageStats };
listBucketSummaries: { params: {}; result: IBucketSummary[] };
listCredentials: { params: {}; result: IStorageCredentialMetadata[] };
replaceCredentials: { params: { credentials: IStorageCredential[] }; result: {} };
getClusterHealth: { params: {}; result: IClusterHealth };
};
/**
@@ -274,6 +387,27 @@ export class SmartStorage {
return { name: bucketNameArg };
}
public async getStorageStats(): Promise<IStorageStats> {
return this.bridge.sendCommand('getStorageStats', {});
}
public async listBucketSummaries(): Promise<IBucketSummary[]> {
return this.bridge.sendCommand('listBucketSummaries', {});
}
public async listCredentials(): Promise<IStorageCredentialMetadata[]> {
return this.bridge.sendCommand('listCredentials', {});
}
public async replaceCredentials(credentials: IStorageCredential[]): Promise<void> {
await this.bridge.sendCommand('replaceCredentials', { credentials });
this.config.auth.credentials = credentials.map((credential) => ({ ...credential }));
}
public async getClusterHealth(): Promise<IClusterHealth> {
return this.bridge.sendCommand('getClusterHealth', {});
}
public async stop() {
await this.bridge.sendCommand('stop', {});
this.bridge.kill();
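Taken together, the new bridge commands give the embedding application a read-only operational view plus credential control without going through the S3 surface. A compact monitoring pass using the methods added above; `storage` stands for any started `SmartStorage` instance and the degraded-case handling is illustrative:

```typescript
const stats = await storage.getStorageStats();
console.log(
  `${stats.bucketCount} buckets, ${stats.totalObjectCount} objects, ${stats.totalStorageBytes} bytes`,
);

for (const bucket of await storage.listBucketSummaries()) {
  console.log(`${bucket.name}: ${bucket.objectCount} objects / ${bucket.totalSizeBytes} bytes`);
}

const health = await storage.getClusterHealth();
if (!health.enabled) {
  // Single-node mode: only `enabled` is populated, per IClusterHealth.
} else if (health.quorumHealthy === false) {
  // Exact quorum semantics live in the Rust core; surfacing unhealthy drives
  // is just one plausible reaction.
  console.warn(
    'cluster below quorum',
    health.drives?.filter((drive) => drive.status !== 'online'),
  );
}
```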
+2
View File
@@ -5,6 +5,8 @@
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"verbatimModuleSyntax": true,
"types": ["node"],
"ignoreDeprecations": "6.0",
"baseUrl": ".",
"paths": {}
},