Compare commits

..

11 Commits

Author SHA1 Message Date
jkunz c2b40ee240 v6.3.3
2026-04-19 12:22:53 +00:00
jkunz 0db138bf42 fix(build): rename npmextra config to .smartconfig and refresh build metadata 2026-04-19 12:22:53 +00:00
jkunz 0e9862efca feat: enhance storage stats and cluster health reporting
- Introduced new data structures for bucket and storage statistics, including BucketSummary, StorageStats, and ClusterHealth.
- Implemented runtime statistics tracking for buckets, including object count and total size.
- Added methods to retrieve storage stats and bucket summaries in the FileStore.
- Enhanced the SmartStorage interface to expose storage stats and cluster health.
- Implemented tests for runtime stats, cluster health, and credential management.
- Added support for runtime-managed credentials with atomic replacement.
- Improved filesystem usage reporting for storage locations.
2026-04-19 11:57:28 +00:00
jkunz c683b02e8c v6.3.2
2026-03-23 21:21:50 +00:00
jkunz b64be03c2f fix(docs): update license ownership and correct README license file reference 2026-03-23 21:21:50 +00:00
jkunz 494dac1267 v6.3.1
2026-03-21 22:19:51 +00:00
jkunz cea3407777 fix(cluster): improve shard reconstruction validation and start background healing service 2026-03-21 22:19:51 +00:00
jkunz a009d990d0 v6.3.0
2026-03-21 22:04:36 +00:00
jkunz 08d545f5db feat(readme): document distributed cluster mode, erasure coding, and QUIC-based architecture 2026-03-21 22:04:36 +00:00
jkunz a0a282c712 v6.2.0
2026-03-21 22:00:41 +00:00
jkunz 3eb0045676 feat(cluster): add shard healing, drive health heartbeats, and clustered policy directory support 2026-03-21 22:00:41 +00:00
27 changed files with 4470 additions and 1953 deletions
+1 -1
@@ -1,7 +1,7 @@
{
"json.schemas": [
{
"fileMatch": ["/npmextra.json"],
"fileMatch": ["/.smartconfig.json"],
"schema": {
"type": "object",
"properties": {
+62
@@ -1,5 +1,67 @@
# Changelog
## 2026-04-19 - 6.3.3 - fix(build)
rename npmextra config to .smartconfig and refresh build metadata
- renames the published project config file from npmextra.json to .smartconfig.json
- updates build and tooling dependencies to newer patch and minor versions
- adds Node type configuration and TypeScript deprecation handling in tsconfig
- refreshes README documentation to match the current build command and runtime management APIs
## Next - feat(credentials)
add runtime credential management APIs
- Expose `listCredentials()` and `replaceCredentials()` through the Rust bridge and the `SmartStorage` TypeScript API.
- Move request authentication onto a native runtime credential store so credential replacement is atomic and effective for new requests immediately without a restart.
- Validate replacement input cleanly by rejecting empty replacement sets, empty credential fields, and duplicate `accessKeyId` values.
- Add runtime credential rotation tests covering initial auth, revocation of old credentials, multiple active credentials, and invalid replacements.
## Next - feat(cluster-health)
add runtime cluster and drive health introspection
- Expose `getClusterHealth()` through the Rust bridge and the `SmartStorage` TypeScript API.
- Report native cluster mode state including local node id, peer status, local drive probe health, quorum health, erasure settings, and tracked healing runtime state.
- Return a clear `{ enabled: false }` response when clustering is not active instead of synthesizing config-based data.
- Add standalone and single-node cluster tests plus README documentation for the best-effort semantics of peer and repair health values.
## Next - feat(stats)
add runtime bucket summaries and storage stats
- Expose `getStorageStats()` and `listBucketSummaries()` through the Rust bridge and the `SmartStorage` TypeScript API.
- Maintain native runtime stats for bucket counts, object counts, and logical stored bytes, initialized from on-disk state at startup and updated on bucket/object mutations.
- Include cheap filesystem-capacity snapshots for the storage directory or configured cluster drive paths.
- Add AWS SDK integration coverage for object add, delete, and bucket delete stats flows and document the cache consistency semantics in the README.
## 2026-03-23 - 6.3.2 - fix(docs)
update license ownership and correct README license file reference
- Adjusts the copyright holder name in the license file
- Fixes the README link to match the lowercase license filename
## 2026-03-21 - 6.3.1 - fix(cluster)
improve shard reconstruction validation and start background healing service
- use the erasure read quorum when reconstructing chunks instead of assuming data shard count
- verify reconstructed shards before writing healed data back to disk
- start the healing service during server initialization with shared local shard stores
- simplify QUIC request handling by decoding the full request buffer including trailing shard data
- clean up unused variables and imports across cluster modules
## 2026-03-21 - 6.3.0 - feat(readme)
document distributed cluster mode, erasure coding, and QUIC-based architecture
- Expand README overview and feature matrix to highlight clustering, multi-drive awareness, and distributed storage capabilities
- Add standalone and cluster mode usage examples plus cluster configuration options
- Document clustering internals including erasure coding, quorum behavior, QUIC transport, self-healing, and on-disk layout
## 2026-03-21 - 6.2.0 - feat(cluster)
add shard healing, drive health heartbeats, and clustered policy directory support
- implements manifest-based healing that scans affected shards on offline nodes, reconstructs data with erasure coding, and rewrites recovered shards to local storage
- includes drive status reporting in membership heartbeats by wiring DriveManager health checks into cluster heartbeat messages
- adds clustered policies directory initialization and exposes policy storage paths from the distributed coordinator
- extends distributed coordinator support for remote shard read and delete operations plus multipart upload session metadata
## 2026-03-21 - 6.1.0 - feat(cluster)
add clustered storage backend with QUIC transport, erasure coding, and shard management
+1 -1
@@ -1,4 +1,4 @@
Copyright (c) 2021 Lossless GmbH (hello@lossless.com)
Copyright (c) 2021 Task Venture Capital GmbH (hello@task.vc)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
+10 -10
@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartstorage",
"version": "6.1.0",
"version": "6.3.3",
"private": false,
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
"main": "dist_ts/index.js",
@@ -15,14 +15,14 @@
"buildDocs": "tsdoc"
},
"devDependencies": {
"@aws-sdk/client-s3": "^3.1014.0",
"@git.zone/tsbuild": "^4.3.0",
"@git.zone/tsbundle": "^2.9.1",
"@git.zone/tsrun": "^2.0.1",
"@git.zone/tsrust": "^1.3.0",
"@git.zone/tstest": "^3.5.0",
"@push.rocks/smartbucket": "^4.5.1",
"@types/node": "^25.5.0"
"@aws-sdk/client-s3": "^3.1032.0",
"@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsbundle": "^2.10.0",
"@git.zone/tsrun": "^2.0.2",
"@git.zone/tsrust": "^1.3.2",
"@git.zone/tstest": "^3.6.3",
"@push.rocks/smartbucket": "^4.6.0",
"@types/node": "^25.6.0"
},
"browserslist": [
"last 1 chrome versions"
@@ -37,7 +37,7 @@
"dist_ts_web/**/*",
"assets/**/*",
"cli.js",
"npmextra.json",
".smartconfig.json",
"readme.md"
],
"dependencies": {
+1594 -1724
File diff suppressed because it is too large.
+14 -3
@@ -11,6 +11,9 @@
- **Bucket policies** (AWS/MinIO-compatible JSON policies, public access support)
- CORS support
- ListBuckets, ListObjects (v1/v2), CopyObject
- Runtime bucket summaries and storage stats via the Rust bridge (no S3 list scans)
- Cluster health introspection via the Rust bridge (node membership, local drive probes, quorum, healing state)
- Runtime credential listing and atomic replacement via the Rust bridge
## Architecture
@@ -20,6 +23,7 @@
- `management.rs` - IPC loop (newline-delimited JSON over stdin/stdout)
- `server.rs` - hyper 1.x HTTP server, routing, CORS, auth+policy pipeline, all S3-compatible handlers
- `storage.rs` - FileStore: filesystem-backed storage, multipart manager, `.policies/` dir
- `storage.rs` also owns the runtime stats cache and standalone storage scans used by the bridge stats API
- `xml_response.rs` - S3-compatible XML response builders
- `error.rs` - StorageError codes with HTTP status mapping
- `auth.rs` - AWS SigV4 signature verification (HMAC-SHA256, clock skew, constant-time compare)
@@ -37,6 +41,11 @@
| `start` | `{ config: ISmartStorageConfig }` | Init storage + HTTP server |
| `stop` | `{}` | Graceful shutdown |
| `createBucket` | `{ name: string }` | Create bucket directory |
| `getStorageStats` | `{}` | Return cached bucket/global runtime stats + storage location capacity snapshots |
| `listBucketSummaries` | `{}` | Return cached per-bucket runtime summaries |
| `listCredentials` | `{}` | Return the active runtime auth credential set |
| `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set |
| `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode |
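For quick reference, here is a hedged TypeScript sketch of the payloads the new commands carry. The `IStorageCredential` shape and the `{ credentials: [...] }` params come from the table above; the variable names and example values are illustrative only.

```typescript
interface IStorageCredential {
  accessKeyId: string;
  secretAccessKey: string;
}

// `replaceCredentials` is the only new command with a non-empty params object:
const replaceCredentialsParams: { credentials: IStorageCredential[] } = {
  credentials: [{ accessKeyId: 'ADMIN', secretAccessKey: 'change-me' }],
};

// `getStorageStats`, `listBucketSummaries`, `listCredentials`, and
// `getClusterHealth` all take an empty `{}` params object.
```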
### Storage Layout
- Objects: `{root}/{bucket}/{key}._storage_object`
@@ -47,9 +56,9 @@
## Build
- `pnpm build` runs `tsrust && tsbuild --web --allowimplicitany`
- `pnpm build` runs `tsrust && tsbuild tsfolders --allowimplicitany`
- `tsrust` compiles Rust to `dist_rust/ruststorage`
- Targets: linux_amd64, linux_arm64 (configured in npmextra.json)
- Targets: linux_amd64, linux_arm64 (configured in .smartconfig.json)
## Dependencies
@@ -60,7 +69,9 @@
## Testing
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility (10 tests, auth disabled, port 3337)
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337)
- `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349)
- `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348)
- `test/test.auth.node.ts` - Auth + bucket policy integration (20 tests, auth enabled, port 3344)
- `test/test.policy-crud.node.ts` - Policy API CRUD + validation edge cases (17 tests, port 3345)
- `test/test.policy-eval.node.ts` - Policy evaluation: principals, actions, resources, deny-vs-allow (22 tests, port 3346)
+323 -47
@@ -1,6 +1,6 @@
# @push.rocks/smartstorage
A high-performance, S3-compatible local storage server powered by a **Rust core** with a clean TypeScript API. Drop-in replacement for AWS S3 during development and testing — no cloud, no Docker, no MinIO. Just `npm install` and go.
A high-performance, S3-compatible storage server powered by a **Rust core** with a clean TypeScript API. Runs standalone for dev/test — or scales out as a **distributed, erasure-coded cluster** with QUIC-based inter-node communication. No cloud, no Docker. Just `npm install` and go. 🚀
## Issue Reporting and Security
@@ -15,23 +15,37 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
| Large file uploads | Streaming, zero-copy | Yes | OOM risk |
| Range requests | Seek-based | Yes | Full read |
| Language | Rust + TypeScript | Go | JavaScript |
| Multipart uploads | Full support | Yes | No |
| Multipart uploads | Full support | Yes | No |
| Auth | AWS SigV4 (full verification) | Full IAM | Basic |
| Bucket policies | IAM-style evaluation | Yes | No |
| Clustering | ✅ Erasure-coded, QUIC | Yes | No |
| Multi-drive awareness | ✅ Per-drive health | Yes | No |
### Core Features
- **Rust-powered HTTP server** — hyper 1.x with streaming I/O, zero-copy, backpressure
- **Full S3-compatible API** — works with AWS SDK v3, SmartBucket, any S3 client
- **Filesystem-backed storage** — buckets map to directories, objects to files
- **Streaming multipart uploads** — large files without memory pressure
- **Byte-range requests** — `seek()` directly to the requested byte offset
- **AWS SigV4 authentication** — full signature verification with constant-time comparison and 15-min clock skew enforcement
- **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation, wildcard matching, and anonymous access support
- **CORS middleware** — configurable cross-origin support
- **Structured logging** — tracing-based, error through debug levels
- **Clean slate mode** — wipe storage on startup for test isolation
- **Test-first design** — start/stop in milliseconds, no port conflicts
- 🦀 **Rust-powered HTTP server** — hyper 1.x with streaming I/O, zero-copy, backpressure
- 📦 **Full S3-compatible API** — works with AWS SDK v3, SmartBucket, any S3 client
- 💾 **Filesystem-backed storage** — buckets map to directories, objects to files
- 📤 **Streaming multipart uploads** — large files without memory pressure
- 📐 **Byte-range requests** — `seek()` directly to the requested byte offset
- 🔐 **AWS SigV4 authentication** — full signature verification with constant-time comparison
- 📋 **Bucket policies** — IAM-style JSON policies with Allow/Deny evaluation and wildcard matching
- 🌐 **CORS middleware** — configurable cross-origin support
- 🧹 **Clean slate mode** — wipe storage on startup for test isolation
- 📊 **Runtime storage stats** — cheap bucket summaries and global counts without S3 list scans
- 🔑 **Runtime credential rotation** — list and replace active auth credentials without mutating internals
- ⚡ **Test-first design** — start/stop in milliseconds, no port conflicts
### Clustering Features
- 🔗 **Erasure coding** — Reed-Solomon (configurable k data + m parity shards) for storage efficiency and fault tolerance
- 🚄 **QUIC transport** — multiplexed, encrypted inter-node communication via `quinn` with zero head-of-line blocking
- 💽 **Multi-drive awareness** — each node manages multiple independent storage paths with health monitoring
- 🩺 **Cluster health introspection** — query native node, drive, quorum, and healing status for product dashboards
- 🤝 **Cluster membership** — static seed config + runtime join, heartbeat-based failure detection
- ✍️ **Quorum writes** — data is only acknowledged after k+1 shards are persisted
- 📖 **Quorum reads** — reconstruct from any k available shards, local-first fast path
- 🩹 **Self-healing** — background scanner detects and reconstructs missing/corrupt shards
## Installation
@@ -43,6 +57,8 @@ pnpm add @push.rocks/smartstorage -D
## Quick Start
### Standalone Mode (Dev & Test)
```typescript
import { SmartStorage } from '@push.rocks/smartstorage';
@@ -63,6 +79,31 @@ const descriptor = await storage.getStorageDescriptor();
await storage.stop();
```
### Cluster Mode (Distributed)
```typescript
import { SmartStorage } from '@push.rocks/smartstorage';
const storage = await SmartStorage.createAndStart({
server: { port: 3000 },
cluster: {
enabled: true,
nodeId: 'node-1',
quicPort: 4000,
seedNodes: ['192.168.1.11:4000', '192.168.1.12:4000'],
erasure: {
dataShards: 4, // k: minimum shards to reconstruct data
parityShards: 2, // m: fault tolerance (can lose up to m shards)
},
drives: {
paths: ['/mnt/disk1', '/mnt/disk2', '/mnt/disk3'],
},
},
});
```
Objects are automatically split into chunks (default 4 MB), erasure-coded into 6 shards (4 data + 2 parity), and distributed across drives/nodes. Any 4 of 6 shards can reconstruct the original data.
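To get a feel for the numbers, here is a small illustrative helper (not part of the library) that estimates the chunk and shard layout from the documented defaults of 4 MB chunks and 4+2 shards:

```typescript
// Illustrative only: estimates chunk and shard counts from the documented
// defaults (4 MB chunks, 4 data + 2 parity shards).
function estimateShardLayout(
  objectBytes: number,
  chunkBytes = 4 * 1024 * 1024,
  dataShards = 4,
  parityShards = 2,
) {
  const chunks = Math.max(1, Math.ceil(objectBytes / chunkBytes));
  return {
    chunks,
    shardsPerChunk: dataShards + parityShards,
    totalShards: chunks * (dataShards + parityShards),
    // Each full chunk is split into `dataShards` equal data shards.
    approxShardBytes: Math.ceil(Math.min(objectBytes, chunkBytes) / dataShards),
  };
}

// A 10 MB object -> 3 chunks -> 18 shards of roughly 1 MB each.
console.log(estimateShardLayout(10 * 1024 * 1024));
```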
## Configuration
All config fields are optional — sensible defaults are applied automatically.
@@ -75,7 +116,7 @@ const config: ISmartStorageConfig = {
port: 3000, // Default: 3000
address: '0.0.0.0', // Default: '0.0.0.0'
silent: false, // Default: false
region: 'us-east-1', // Default: 'us-east-1' — used for SigV4 signing
region: 'us-east-1', // Default: 'us-east-1' — used for SigV4 signing
},
storage: {
directory: './my-data', // Default: .nogit/bucketsDir
@@ -111,6 +152,22 @@ const config: ISmartStorageConfig = {
expirationDays: 7,
cleanupIntervalMinutes: 60,
},
cluster: { // Optional — omit for standalone mode
enabled: true,
nodeId: 'node-1', // Auto-generated UUID if omitted
quicPort: 4000, // Default: 4000
seedNodes: [], // Addresses of existing cluster members
erasure: {
dataShards: 4, // Default: 4
parityShards: 2, // Default: 2
chunkSizeBytes: 4194304, // Default: 4 MB
},
drives: {
paths: ['/mnt/disk1', '/mnt/disk2'],
},
heartbeatIntervalMs: 5000, // Default: 5000
heartbeatTimeoutMs: 30000, // Default: 30000
},
};
const storage = await SmartStorage.createAndStart(config);
@@ -147,6 +204,112 @@ const storage = await SmartStorage.createAndStart({
});
```
## Runtime Credentials
```typescript
const credentials = await storage.listCredentials();
await storage.replaceCredentials([
{
accessKeyId: 'ADMINA',
secretAccessKey: 'super-secret-a',
},
{
accessKeyId: 'ADMINB',
secretAccessKey: 'super-secret-b',
},
]);
```
```typescript
interface IStorageCredential {
accessKeyId: string;
secretAccessKey: string;
}
```
- `listCredentials()` returns the Rust core's current runtime credential set.
- `replaceCredentials()` swaps the full set atomically. On success, new requests authenticate against the new set and the old credentials stop working immediately.
- Requests that were already authenticated before the replacement keep running; auth is evaluated when each request starts.
- No restart is required.
- Replacement input must contain at least one credential, each `accessKeyId` and `secretAccessKey` must be non-empty, and `accessKeyId` values must be unique.
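A possible zero-downtime rotation flow built only from the two documented calls; the credential values are placeholders, and it assumes the incoming `accessKeyId` is not already active (duplicates are rejected):

```typescript
import { SmartStorage } from '@push.rocks/smartstorage';

// Sketch of a two-phase rotation: run old and new keys side by side, then
// drop the old ones. Assumes `next.accessKeyId` is not already in the set.
async function rotateCredentials(
  storage: SmartStorage,
  next: { accessKeyId: string; secretAccessKey: string },
) {
  const current = await storage.listCredentials();
  // Phase 1: old + new are both valid, so existing clients keep working.
  await storage.replaceCredentials([...current, next]);
  // ...distribute the new key to clients here...
  // Phase 2: keep only the new credential; old keys stop authenticating.
  await storage.replaceCredentials([next]);
}
```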
## Runtime Stats
```typescript
const stats = await storage.getStorageStats();
const bucketSummaries = await storage.listBucketSummaries();
console.log(stats.bucketCount);
console.log(stats.totalObjectCount);
console.log(stats.totalStorageBytes);
console.log(bucketSummaries[0]?.name, bucketSummaries[0]?.objectCount);
```
```typescript
interface IBucketSummary {
name: string;
objectCount: number;
totalSizeBytes: number;
creationDate?: number;
}
interface IStorageLocationSummary {
path: string;
totalBytes?: number;
availableBytes?: number;
usedBytes?: number;
}
interface IStorageStats {
bucketCount: number;
totalObjectCount: number;
totalStorageBytes: number;
buckets: IBucketSummary[];
storageDirectory: string;
storageLocations?: IStorageLocationSummary[];
}
```
- `bucketCount`, `totalObjectCount`, `totalStorageBytes`, and per-bucket totals are logical object stats maintained by the Rust runtime. They count object payload bytes, not sidecar files or erasure-coded shard overhead.
- smartstorage initializes these values from native on-disk state at startup, then keeps them in memory and updates them when bucket/object mutations succeed. Stats reads do not issue S3 `ListObjects` or rescan every object.
- Values are exact for mutations performed through smartstorage after startup. Direct filesystem edits outside smartstorage are not watched; restart the server to resync.
- `storageLocations` is a cheap filesystem-capacity snapshot. Standalone mode reports the storage directory. Cluster mode reports the configured drive paths.
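For example, a minimal sketch that turns the cached stats into a one-line capacity report (the field names are the documented ones; the formatting is arbitrary):

```typescript
const stats = await storage.getStorageStats();
const location = stats.storageLocations?.[0];
const usedPct =
  location?.totalBytes && location.usedBytes !== undefined
    ? ((location.usedBytes / location.totalBytes) * 100).toFixed(1)
    : 'n/a';
console.log(
  `${stats.bucketCount} buckets, ${stats.totalObjectCount} objects, ` +
    `${stats.totalStorageBytes} logical bytes, ` +
    `disk ${usedPct}% used at ${location?.path ?? stats.storageDirectory}`,
);
```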
## Cluster Health
```typescript
const clusterHealth = await storage.getClusterHealth();
if (!clusterHealth.enabled) {
console.log('Cluster mode is disabled');
} else {
console.log(clusterHealth.nodeId, clusterHealth.quorumHealthy);
console.log(clusterHealth.peers);
console.log(clusterHealth.drives);
}
```
```typescript
interface IClusterHealth {
enabled: boolean;
nodeId?: string;
quorumHealthy?: boolean;
majorityHealthy?: boolean;
peers?: IClusterPeerHealth[];
drives?: IClusterDriveHealth[];
erasure?: IClusterErasureHealth;
repairs?: IClusterRepairHealth;
}
```
- `getClusterHealth()` is served by the Rust core. The TypeScript wrapper does not infer values from static config.
- Standalone mode returns `{ enabled: false }`.
- Peer status is the local node's current view of cluster membership and heartbeats, so it is best-effort and may lag real network state.
- Drive health is based on live native probe checks on the configured local drive paths. Capacity values are cheap filesystem snapshots.
- `quorumHealthy` means the local node currently sees majority quorum and enough available placements in every erasure set to satisfy the configured write quorum.
- Repair fields expose the background healer's currently available runtime state. They are best-effort and limited to what the engine tracks today, such as whether a scan is active, the last completed run, and the last error.
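As a sketch of how a dashboard might consume this, a best-effort polling loop; only fields listed in `IClusterHealth` are touched, and the interval and log format are arbitrary:

```typescript
// Best-effort monitoring loop: warn when the local node reports lost quorum.
const healthTimer = setInterval(async () => {
  const health = await storage.getClusterHealth();
  if (!health.enabled) return; // standalone mode
  if (health.quorumHealthy === false) {
    console.warn(
      `node ${health.nodeId}: quorum unhealthy ` +
        `(${health.peers?.length ?? 0} peers, ${health.drives?.length ?? 0} local drives)`,
    );
  }
}, 10_000);
// Call clearInterval(healthTimer) on shutdown.
```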
## Usage with AWS SDK v3
```typescript
@@ -207,7 +370,7 @@ const files = await dir.listFiles();
## Multipart Uploads
For files larger than 5 MB, use multipart uploads. smartstorage handles them with **streaming I/O** — parts are written directly to disk, never buffered in memory.
For files larger than 5 MB, use multipart uploads. smartstorage handles them with **streaming I/O** — parts are written directly to disk, never buffered in memory. In cluster mode, each part is independently erasure-coded and distributed.
```typescript
import {
@@ -255,8 +418,6 @@ When `auth.enabled` is `true`, the auth pipeline works as follows:
### Setting a Bucket Policy
Use the S3 `PutBucketPolicy` API (or any S3 client that supports it):
```typescript
import { PutBucketPolicyCommand } from '@aws-sdk/client-s3';
@@ -294,6 +455,81 @@ await client.send(new PutBucketPolicyCommand({
Deleting a bucket automatically removes its associated policy.
## Clustering Deep Dive 🔗
smartstorage can run as a distributed storage cluster where multiple nodes cooperate to store and retrieve data with built-in redundancy.
### How It Works
```
Client ──HTTP PUT──▶ Node A (coordinator)
                      ├─ Split object into 4 MB chunks
                      ├─ Erasure-code each chunk (4 data + 2 parity = 6 shards)
                      ├──QUIC──▶ Node B (shard writes)
                      ├──QUIC──▶ Node C (shard writes)
                      └─ Local disk (shard writes)
```
1. **Any node can coordinate** — the client connects to any cluster member
2. **Objects are chunked** — large objects split into fixed-size pieces (default 4 MB)
3. **Each chunk is erasure-coded** — Reed-Solomon produces k data + m parity shards
4. **Shards are distributed** — placed across different nodes and drives for fault isolation
5. **Quorum guarantees consistency** — writes need k+1 acks, reads need k shards
### Erasure Coding
With the default `4+2` configuration:
- Storage overhead: **50%** (vs. 200% for 3x replication)
- Fault tolerance: **any 2 drives/nodes can fail** simultaneously
- Read efficiency: only **4 of 6 shards** needed to reconstruct data
| Config | Total Shards | Overhead | Tolerance | Min Nodes |
|--------|-------------|----------|-----------|-----------|
| 4+2 | 6 | 50% | 2 failures | 3 |
| 6+3 | 9 | 50% | 3 failures | 5 |
| 2+1 | 3 | 50% | 1 failure | 2 |
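The same figures can be derived for any configuration; a small illustrative helper (not a library API), with overhead expressed as extra storage relative to the raw data:

```typescript
// Illustrative only: derives the figures in the table above for any k+m.
function erasureProfile(dataShards: number, parityShards: number) {
  return {
    totalShards: dataShards + parityShards,
    overheadPct: Math.round((parityShards / dataShards) * 100), // extra bytes vs. raw data
    toleratedFailures: parityShards,
    readQuorum: dataShards,      // shards needed to reconstruct
    writeQuorum: dataShards + 1, // acks required before a write is acknowledged
  };
}

console.log(erasureProfile(4, 2)); // { totalShards: 6, overheadPct: 50, toleratedFailures: 2, ... }
```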
### QUIC Transport
Inter-node communication uses [QUIC](https://en.wikipedia.org/wiki/QUIC) via the `quinn` library:
- 🔒 **Built-in TLS** — self-signed certs auto-generated at cluster init
- 🔀 **Multiplexed streams** — concurrent shard transfers without head-of-line blocking
- ⚡ **Connection pooling** — persistent connections to peer nodes
- 🌊 **Natural backpressure** — QUIC flow control prevents overloading slow peers
### Cluster Membership
- **Static seed nodes** — initial cluster defined in config
- **Runtime join** — new nodes can join a running cluster
- **Heartbeat monitoring** — every 5s (configurable), with suspect/offline detection
- **Split-brain prevention** — nodes only mark peers offline when they have majority
### Self-Healing
A background scanner runs periodically (default: every 24h) and:
1. Checks shard checksums (CRC32C) for bit-rot detection
2. Identifies shards on offline nodes
3. Reconstructs missing shards from remaining data using Reed-Solomon
4. Places healed shards on healthy drives
Healing runs at low priority to avoid impacting foreground I/O.
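Conceptually, the per-chunk decision reduces to comparing surviving shards against `k`. A hedged TypeScript sketch of that check follows; the actual logic lives in the Rust healing service shown later in this diff, and the placement shape here is invented for illustration:

```typescript
interface IShardPlacement {
  nodeId: string;
  shardIndex: number;
}

// A chunk needs healing when any shard sits on an offline node, and it can be
// healed as long as at least `dataShards` (k) shards are still reachable.
function assessChunk(
  placements: IShardPlacement[],
  offlineNodes: Set<string>,
  dataShards: number,
) {
  const missing = placements.filter((p) => offlineNodes.has(p.nodeId)).length;
  const available = placements.length - missing;
  return { needsHealing: missing > 0, canHeal: available >= dataShards };
}
```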
### Erasure Set Formation
Drives are organized into fixed **erasure sets** at cluster initialization:
```
3 nodes × 4 drives each = 12 drives total
With 6-shard erasure sets → 2 erasure sets
Set 0: Node1-Disk0, Node2-Disk0, Node3-Disk0, Node1-Disk1, Node2-Disk1, Node3-Disk1
Set 1: Node1-Disk2, Node2-Disk2, Node3-Disk2, Node1-Disk3, Node2-Disk3, Node3-Disk3
```
Drives are interleaved across nodes for maximum fault isolation. New nodes form new erasure sets — existing data is never rebalanced.
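A rough sketch of the interleaving shown above, purely for illustration; the real set formation happens inside the Rust core at cluster initialization:

```typescript
// Take drives round-robin across nodes so each erasure set spans as many
// nodes as possible, then cut the sequence into fixed-size sets.
function formErasureSets(nodes: string[], drivesPerNode: number, setSize: number): string[][] {
  const interleaved: string[] = [];
  for (let disk = 0; disk < drivesPerNode; disk++) {
    for (const node of nodes) {
      interleaved.push(`${node}-Disk${disk}`);
    }
  }
  const sets: string[][] = [];
  for (let i = 0; i < interleaved.length; i += setSize) {
    sets.push(interleaved.slice(i, i + setSize));
  }
  return sets;
}

// 3 nodes × 4 drives with 6-shard sets -> the two sets listed above.
console.log(formErasureSets(['Node1', 'Node2', 'Node3'], 4, 6));
```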
## Testing Integration
```typescript
@@ -353,36 +589,62 @@ Get connection details for S3-compatible clients. Returns:
| `accessSecret` | `string` | Secret key from first configured credential |
| `useSsl` | `boolean` | Always `false` (plain HTTP) |
#### `getStorageStats(): Promise<IStorageStats>`
Read cached logical bucket and object totals from the Rust runtime without issuing S3 list calls.
#### `listBucketSummaries(): Promise<IBucketSummary[]>`
Get per-bucket logical object counts and total payload sizes.
#### `listCredentials(): Promise<IStorageCredential[]>`
Return the currently active runtime credential set.
#### `replaceCredentials(credentials: IStorageCredential[]): Promise<void>`
Atomically replace the active runtime credential set without restarting the server.
#### `getClusterHealth(): Promise<IClusterHealth>`
Read the Rust core's current cluster, drive, quorum, and repair health snapshot. Standalone mode returns `{ enabled: false }`.
## Architecture
smartstorage uses a **hybrid Rust + TypeScript** architecture:
```
┌────────────────────────────────┐
│ Your Code (AWS SDK, etc.)      │
│ ↕ HTTP (localhost:3000)        │
├────────────────────────────────┤
│ ruststorage binary (Rust)      │
│ ├─ hyper 1.x HTTP server       │
│ ├─ S3 path-style routing       │
│ ├─ Streaming storage layer     │
│ ├─ Multipart manager           │
│ ├─ SigV4 auth + policy engine  │
│ ├─ CORS middleware             │
│ └─ S3 XML response builder     │
├────────────────────────────────┤
│ TypeScript (thin IPC wrapper)  │
│ ├─ SmartStorage class          │
│ ├─ RustBridge (stdin/stdout)   │
│ └─ Config & S3 descriptor      │
└────────────────────────────────┘
┌──────────────────────────────────────────────┐
│ Your Code (AWS SDK, SmartBucket, etc.)       │
│ ↕ HTTP (localhost:3000)                      │
├──────────────────────────────────────────────┤
│ ruststorage binary (Rust)                    │
│ ├─ hyper 1.x HTTP server                     │
│ ├─ S3 path-style routing                     │
│ ├─ StorageBackend (Standalone or Clustered)  │
│ │   ├─ FileStore (single-node mode)          │
│ │   └─ DistributedStore (cluster mode)       │
│ │       ├─ ErasureCoder (Reed-Solomon)       │
│ │       ├─ ShardStore (per-drive storage)    │
│ │       ├─ QuicTransport (quinn)             │
│ │       ├─ ClusterState & Membership         │
│ │       └─ HealingService                    │
│ ├─ SigV4 auth + policy engine                │
│ ├─ CORS middleware                           │
│ └─ S3 XML response builder                   │
├──────────────────────────────────────────────┤
│ TypeScript (thin IPC wrapper)                │
│ ├─ SmartStorage class                        │
│ ├─ RustBridge (stdin/stdout JSON IPC)        │
│ └─ Config & S3 descriptor                    │
└──────────────────────────────────────────────┘
```
**Why Rust?** The TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
**Why Rust?** The original TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
**IPC Protocol:** TypeScript spawns the `ruststorage` binary with `--management` and communicates via newline-delimited JSON over stdin/stdout. Commands: `start`, `stop`, `createBucket`.
**IPC Protocol:** TypeScript communicates with the `ruststorage` binary over newline-delimited JSON via stdin/stdout. The current management commands are `start`, `stop`, `createBucket`, `getStorageStats`, `listBucketSummaries`, `listCredentials`, `replaceCredentials`, and `getClusterHealth`.
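For illustration, the exchange could be driven by hand roughly like this; the `{ id, method, params }` envelope field names and the binary path are assumptions, not part of the documented API, and in practice the `SmartStorage` class handles this for you:

```typescript
import { spawn } from 'node:child_process';
import { createInterface } from 'node:readline';

// Hypothetical manual session: one JSON object per line in each direction.
const child = spawn('./dist_rust/ruststorage', ['--management']);
const lines = createInterface({ input: child.stdout! });
lines.on('line', (line) => console.log('response:', JSON.parse(line)));

// Envelope field names (id/method/params) are assumed for illustration.
child.stdin!.write(JSON.stringify({ id: 1, method: 'getStorageStats', params: {} }) + '\n');
```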
### S3-Compatible Operations Supported
### S3-Compatible Operations
| Operation | Method | Path |
|-----------|--------|------|
@@ -407,31 +669,45 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
### On-Disk Format
**Standalone mode:**
```
{storage.directory}/
{bucket}/
{key}._storage_object # Object data
{key}._storage_object # Object data
{key}._storage_object.metadata.json # Metadata (content-type, x-amz-meta-*, etc.)
{key}._storage_object.md5 # Cached MD5 hash
{key}._storage_object.md5 # Cached MD5 hash
.multipart/
{upload-id}/
metadata.json # Upload metadata (bucket, key, parts)
part-1 # Part data files
part-2
...
metadata.json # Upload metadata
part-1, part-2, ... # Part data files
.policies/
{bucket}.policy.json # Bucket policy (IAM JSON format)
{bucket}.policy.json # Bucket policy (IAM JSON format)
```
**Cluster mode:**
```
{drive_path}/.smartstorage/
format.json # Drive metadata (cluster ID, erasure set)
data/{bucket}/{key_hash}/{key}/
chunk-{N}/shard-{M}.dat # Erasure-coded shard data
chunk-{N}/shard-{M}.meta # Shard metadata (checksum, size)
{storage.directory}/
.manifests/{bucket}/
{key}.manifest.json # Object manifest (shard placements, checksums)
.buckets/{bucket}/ # Bucket metadata
.policies/{bucket}.policy.json # Bucket policies
```
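For standalone mode, resolving the three files that back a single object is straightforward; a sketch based on the layout above, with the suffix names taken verbatim from it:

```typescript
import * as path from 'node:path';

// Where an object and its sidecar files live in standalone mode.
function objectPaths(storageDirectory: string, bucket: string, key: string) {
  const base = path.join(storageDirectory, bucket, `${key}._storage_object`);
  return {
    data: base,
    metadata: `${base}.metadata.json`,
    md5: `${base}.md5`,
  };
}

console.log(objectPaths('./my-data', 'my-bucket', 'photos/cat.png'));
```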
## Related Packages
- [`@push.rocks/smartbucket`](https://code.foss.global/push.rocks/smartbucket) — High-level S3-compatible abstraction layer
- [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) — TypeScript <-> Rust IPC bridge
- [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) — TypeScript Rust IPC bridge
- [`@git.zone/tsrust`](https://code.foss.global/git.zone/tsrust) — Rust cross-compilation for npm packages
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
+1
@@ -1346,6 +1346,7 @@ dependencies = [
"http-body-util",
"hyper",
"hyper-util",
"libc",
"md-5",
"percent-encoding",
"quick-xml",
+1
@@ -41,3 +41,4 @@ dashmap = "6"
hmac = "0.12"
sha2 = "0.10"
hex = "0.4"
libc = "0.2"
+71 -8
@@ -2,9 +2,10 @@ use hmac::{Hmac, Mac};
use hyper::body::Incoming;
use hyper::Request;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use tokio::sync::RwLock;
use crate::config::{Credential, SmartStorageConfig};
use crate::config::{AuthConfig, Credential};
use crate::error::StorageError;
type HmacSha256 = Hmac<Sha256>;
@@ -27,7 +28,7 @@ struct SigV4Header {
/// Verify the request's SigV4 signature. Returns the caller identity on success.
pub fn verify_request(
req: &Request<Incoming>,
config: &SmartStorageConfig,
credentials: &[Credential],
) -> Result<AuthenticatedIdentity, StorageError> {
let auth_header = req
.headers()
@@ -47,7 +48,7 @@ pub fn verify_request(
let parsed = parse_auth_header(auth_header)?;
// Look up credential
let credential = find_credential(&parsed.access_key_id, config)
let credential = find_credential(&parsed.access_key_id, credentials)
.ok_or_else(StorageError::invalid_access_key_id)?;
// Get x-amz-date
@@ -163,14 +164,76 @@ fn parse_auth_header(header: &str) -> Result<SigV4Header, StorageError> {
}
/// Find a credential by access key ID.
fn find_credential<'a>(access_key_id: &str, config: &'a SmartStorageConfig) -> Option<&'a Credential> {
config
.auth
.credentials
fn find_credential<'a>(access_key_id: &str, credentials: &'a [Credential]) -> Option<&'a Credential> {
credentials
.iter()
.find(|c| c.access_key_id == access_key_id)
}
#[derive(Debug)]
pub struct RuntimeCredentialStore {
enabled: bool,
credentials: RwLock<Vec<Credential>>,
}
impl RuntimeCredentialStore {
pub fn new(config: &AuthConfig) -> Self {
Self {
enabled: config.enabled,
credentials: RwLock::new(config.credentials.clone()),
}
}
pub fn enabled(&self) -> bool {
self.enabled
}
pub async fn list_credentials(&self) -> Vec<Credential> {
self.credentials.read().await.clone()
}
pub async fn snapshot_credentials(&self) -> Vec<Credential> {
self.credentials.read().await.clone()
}
pub async fn replace_credentials(&self, credentials: Vec<Credential>) -> Result<(), StorageError> {
validate_credentials(&credentials)?;
*self.credentials.write().await = credentials;
Ok(())
}
}
fn validate_credentials(credentials: &[Credential]) -> Result<(), StorageError> {
if credentials.is_empty() {
return Err(StorageError::invalid_request(
"Credential replacement requires at least one credential.",
));
}
let mut seen_access_keys = HashSet::new();
for credential in credentials {
if credential.access_key_id.trim().is_empty() {
return Err(StorageError::invalid_request(
"Credential accessKeyId must not be empty.",
));
}
if credential.secret_access_key.trim().is_empty() {
return Err(StorageError::invalid_request(
"Credential secretAccessKey must not be empty.",
));
}
if !seen_access_keys.insert(credential.access_key_id.as_str()) {
return Err(StorageError::invalid_request(
"Credential accessKeyId values must be unique.",
));
}
}
Ok(())
}
/// Check clock skew (15 minutes max).
fn check_clock_skew(amz_date: &str) -> Result<(), StorageError> {
// Parse ISO 8601 basic format: YYYYMMDDTHHMMSSZ
File diff suppressed because it is too large.
+50 -6
@@ -1,9 +1,9 @@
use super::config::DriveConfig;
use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tokio::fs;
use super::config::DriveConfig;
// ============================
// Drive format (on-disk metadata)
@@ -33,6 +33,7 @@ pub enum DriveStatus {
#[derive(Debug, Clone)]
pub struct DriveStats {
pub total_bytes: u64,
pub available_bytes: u64,
pub used_bytes: u64,
pub avg_write_latency_us: u64,
pub avg_read_latency_us: u64,
@@ -45,6 +46,7 @@ impl Default for DriveStats {
fn default() -> Self {
Self {
total_bytes: 0,
available_bytes: 0,
used_bytes: 0,
avg_write_latency_us: 0,
avg_read_latency_us: 0,
@@ -55,7 +57,7 @@ impl Default for DriveStats {
}
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DriveState {
pub path: PathBuf,
pub format: Option<DriveFormat>,
@@ -74,10 +76,15 @@ pub struct DriveManager {
impl DriveManager {
/// Initialize drive manager with configured drive paths.
pub async fn new(config: &DriveConfig) -> Result<Self> {
let mut drives = Vec::with_capacity(config.paths.len());
let paths: Vec<PathBuf> = config.paths.iter().map(PathBuf::from).collect();
Self::from_paths(&paths).await
}
for path_str in &config.paths {
let path = PathBuf::from(path_str);
/// Initialize drive manager from an explicit list of resolved paths.
pub async fn from_paths(paths: &[PathBuf]) -> Result<Self> {
let mut drives = Vec::with_capacity(paths.len());
for path in paths {
let storage_dir = path.join(".smartstorage");
// Ensure the drive directory exists
@@ -92,7 +99,7 @@ impl DriveManager {
};
drives.push(DriveState {
path,
path: path.clone(),
format,
status,
stats: DriveStats::default(),
@@ -154,6 +161,11 @@ impl DriveManager {
&self.drives
}
/// Get a cloneable snapshot of current drive states.
pub fn snapshot(&self) -> Vec<DriveState> {
self.drives.clone()
}
/// Get drives that are online.
pub fn online_drives(&self) -> Vec<usize> {
self.drives
@@ -203,6 +215,11 @@ impl DriveManager {
let _ = fs::remove_file(&probe_path).await;
let latency = start.elapsed();
if let Some((total_bytes, available_bytes, used_bytes)) = filesystem_usage(&drive.path) {
drive.stats.total_bytes = total_bytes;
drive.stats.available_bytes = available_bytes;
drive.stats.used_bytes = used_bytes;
}
drive.stats.avg_write_latency_us = latency.as_micros() as u64;
drive.stats.last_check = Utc::now();
@@ -240,3 +257,30 @@ impl DriveManager {
serde_json::from_str(&content).ok()
}
}
#[cfg(unix)]
fn filesystem_usage(path: &Path) -> Option<(u64, u64, u64)> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
let path_bytes = path.as_os_str().as_bytes();
let c_path = CString::new(path_bytes).ok()?;
let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
if unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) } != 0 {
return None;
}
let block_size = stat.f_frsize as u64;
let total_bytes = stat.f_blocks as u64 * block_size;
let available_bytes = stat.f_bavail as u64 * block_size;
let free_bytes = stat.f_bfree as u64 * block_size;
let used_bytes = total_bytes.saturating_sub(free_bytes);
Some((total_bytes, available_bytes, used_bytes))
}
#[cfg(not(unix))]
fn filesystem_usage(_path: &Path) -> Option<(u64, u64, u64)> {
None
}
+342 -18
@@ -1,23 +1,54 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs;
use tokio::sync::RwLock;
use super::coordinator::DistributedStore;
use super::config::ErasureConfig;
use super::erasure::ErasureCoder;
use super::metadata::ObjectManifest;
use super::shard_store::{ShardId, ShardStore};
use super::state::ClusterState;
/// Background healing service that scans for under-replicated shards
/// and reconstructs them.
pub struct HealingService {
state: Arc<ClusterState>,
erasure_coder: ErasureCoder,
local_shard_stores: Vec<Arc<ShardStore>>,
manifest_dir: PathBuf,
scan_interval: Duration,
runtime_state: Arc<RwLock<HealingRuntimeState>>,
}
impl HealingService {
pub fn new(state: Arc<ClusterState>, scan_interval_hours: u64) -> Self {
Self {
state,
scan_interval: Duration::from_secs(scan_interval_hours * 3600),
pub fn new(
state: Arc<ClusterState>,
erasure_config: &ErasureConfig,
local_shard_stores: Vec<Arc<ShardStore>>,
manifest_dir: PathBuf,
scan_interval_hours: u64,
runtime_state: Arc<RwLock<HealingRuntimeState>>,
) -> Result<Self> {
let scan_interval = Duration::from_secs(scan_interval_hours * 3600);
if let Ok(mut state_guard) = runtime_state.try_write() {
state_guard.scan_interval_ms = scan_interval.as_millis() as u64;
}
Ok(Self {
state,
erasure_coder: ErasureCoder::new(erasure_config)?,
local_shard_stores,
manifest_dir,
scan_interval,
runtime_state,
})
}
pub fn runtime_state(&self) -> Arc<RwLock<HealingRuntimeState>> {
self.runtime_state.clone()
}
/// Run the healing loop as a background task.
@@ -30,9 +61,12 @@ impl HealingService {
loop {
tokio::select! {
_ = interval.tick() => {
let started_at = Utc::now();
self.mark_healing_started(started_at).await;
tracing::info!("Starting healing scan");
match self.heal_scan().await {
Ok(stats) => {
self.mark_healing_finished(started_at, Some(stats.clone()), None).await;
tracing::info!(
checked = stats.shards_checked,
healed = stats.shards_healed,
@@ -41,6 +75,7 @@ impl HealingService {
);
}
Err(e) => {
self.mark_healing_finished(started_at, None, Some(e.to_string())).await;
tracing::error!("Healing scan failed: {}", e);
}
}
@@ -53,7 +88,38 @@ impl HealingService {
}
}
/// Scan for offline nodes and identify objects that need healing.
async fn mark_healing_started(&self, started_at: DateTime<Utc>) {
let mut runtime_state = self.runtime_state.write().await;
runtime_state.active = true;
runtime_state.scan_interval_ms = self.scan_interval.as_millis() as u64;
runtime_state.last_run_started_at = Some(started_at);
runtime_state.last_error = None;
}
async fn mark_healing_finished(
&self,
started_at: DateTime<Utc>,
stats: Option<HealStats>,
last_error: Option<String>,
) {
let finished_at = Utc::now();
let mut runtime_state = self.runtime_state.write().await;
runtime_state.active = false;
runtime_state.scan_interval_ms = self.scan_interval.as_millis() as u64;
runtime_state.last_run_completed_at = Some(finished_at);
runtime_state.last_duration_ms = Some(
finished_at
.signed_duration_since(started_at)
.num_milliseconds()
.max(0) as u64,
);
if let Some(stats) = stats {
runtime_state.last_stats = Some(stats);
}
runtime_state.last_error = last_error;
}
/// Scan all manifests for shards on offline nodes, reconstruct and re-place them.
async fn heal_scan(&self) -> Result<HealStats> {
let mut stats = HealStats::default();
@@ -63,30 +129,288 @@ impl HealingService {
return Ok(stats);
}
tracing::info!(
"Found {} offline nodes, scanning for affected shards",
offline_nodes.len()
);
// Check that we have majority before healing
// (prevents healing during split-brain)
// Check that we have majority before healing (split-brain prevention)
if !self.state.has_majority().await {
tracing::warn!("No majority quorum, skipping heal to prevent split-brain");
return Ok(stats);
}
// TODO: Iterate all manifests, find shards on offline nodes,
// reconstruct from remaining shards and place on healthy nodes.
// This requires access to the DistributedStore and manifest listing
// which will be wired in when the full healing pipeline is implemented.
tracing::info!(
"Found {} offline nodes, scanning for affected shards",
offline_nodes.len()
);
// Iterate all bucket directories under manifest_dir
let mut bucket_entries = match fs::read_dir(&self.manifest_dir).await {
Ok(e) => e,
Err(_) => return Ok(stats),
};
while let Some(bucket_entry) = bucket_entries.next_entry().await? {
if !bucket_entry.metadata().await?.is_dir() {
continue;
}
let bucket_name = bucket_entry.file_name().to_string_lossy().to_string();
if bucket_name.starts_with('.') {
continue;
}
// Scan manifests in this bucket
self.heal_bucket(&bucket_name, &offline_nodes, &mut stats)
.await;
// Yield to avoid starving foreground I/O
tokio::task::yield_now().await;
}
Ok(stats)
}
async fn heal_bucket(
&self,
bucket: &str,
offline_nodes: &[String],
stats: &mut HealStats,
) {
let bucket_dir = self.manifest_dir.join(bucket);
let manifests = match self.collect_manifests(&bucket_dir).await {
Ok(m) => m,
Err(e) => {
tracing::warn!(bucket = bucket, error = %e, "Failed to list manifests");
stats.errors += 1;
return;
}
};
let local_id = self.state.local_node_id().to_string();
for manifest in &manifests {
for chunk in &manifest.chunks {
// Check if any shard in this chunk is on an offline node
let affected: Vec<_> = chunk
.shard_placements
.iter()
.filter(|p| offline_nodes.contains(&p.node_id))
.collect();
if affected.is_empty() {
continue;
}
stats.shards_checked += chunk.shard_placements.len() as u64;
// Try to reconstruct missing shards from available ones
let k = manifest.data_shards;
let total = manifest.data_shards + manifest.parity_shards;
// Count available shards (those NOT on offline nodes)
let available_count = chunk
.shard_placements
.iter()
.filter(|p| !offline_nodes.contains(&p.node_id))
.count();
if available_count < k {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
available = available_count,
needed = k,
"Cannot heal chunk: not enough available shards"
);
stats.errors += 1;
continue;
}
// Fetch available shards (only local ones for now)
let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
let mut fetched = 0usize;
for placement in &chunk.shard_placements {
if offline_nodes.contains(&placement.node_id) {
continue; // Skip offline nodes
}
if fetched >= k {
break;
}
if placement.node_id == local_id {
let shard_id = ShardId {
bucket: manifest.bucket.clone(),
key: manifest.key.clone(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
if let Some(store) = self.local_shard_stores.get(store_idx) {
if let Ok((data, _)) = store.read_shard(&shard_id).await {
shards[placement.shard_index as usize] = Some(data);
fetched += 1;
}
}
}
// TODO: fetch from other online remote nodes
}
if fetched < k {
tracing::warn!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Not enough local shards to heal, skipping"
);
continue;
}
// Reconstruct all shards
let reconstructed = match self.erasure_coder.decode_chunk(
&mut shards,
chunk.data_size,
) {
Ok(_) => true,
Err(e) => {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
error = %e,
"Reconstruction failed"
);
stats.errors += 1;
false
}
};
if !reconstructed {
continue;
}
// Re-encode to get all shards back (including the missing ones)
let full_data_size = chunk.data_size;
let mut data_buf = Vec::with_capacity(full_data_size);
for i in 0..k {
if let Some(ref shard) = shards[i] {
data_buf.extend_from_slice(shard);
}
}
data_buf.truncate(full_data_size);
let all_shards = match self.erasure_coder.encode_chunk(&data_buf) {
Ok(s) => s,
Err(e) => {
tracing::error!(error = %e, "Re-encoding for heal failed");
stats.errors += 1;
continue;
}
};
// Verify reconstructed shards are consistent
if !self.erasure_coder.verify(&all_shards).unwrap_or(false) {
tracing::error!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
"Shard verification failed after reconstruction"
);
stats.errors += 1;
continue;
}
// Write the missing shards to the first available local drive
for affected_placement in &affected {
let shard_idx = affected_placement.shard_index as usize;
if shard_idx < all_shards.len() {
let shard_data = &all_shards[shard_idx];
let checksum = crc32c::crc32c(shard_data);
let shard_id = ShardId {
bucket: manifest.bucket.clone(),
key: manifest.key.clone(),
chunk_index: chunk.chunk_index,
shard_index: affected_placement.shard_index,
};
// Place on first available local drive
if let Some(store) = self.local_shard_stores.first() {
match store.write_shard(&shard_id, shard_data, checksum).await {
Ok(()) => {
stats.shards_healed += 1;
tracing::info!(
bucket = manifest.bucket,
key = manifest.key,
chunk = chunk.chunk_index,
shard = affected_placement.shard_index,
"Shard healed successfully"
);
}
Err(e) => {
tracing::error!(error = %e, "Failed to write healed shard");
stats.errors += 1;
}
}
}
}
}
tokio::task::yield_now().await;
}
}
}
/// Collect all manifests under a bucket directory.
async fn collect_manifests(&self, dir: &std::path::Path) -> Result<Vec<ObjectManifest>> {
let mut manifests = Vec::new();
self.collect_manifests_recursive(dir, &mut manifests).await?;
Ok(manifests)
}
fn collect_manifests_recursive<'a>(
&'a self,
dir: &'a std::path::Path,
manifests: &'a mut Vec<ObjectManifest>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move {
let mut entries = match fs::read_dir(dir).await {
Ok(e) => e,
Err(_) => return Ok(()),
};
while let Some(entry) = entries.next_entry().await? {
let meta = entry.metadata().await?;
let name = entry.file_name().to_string_lossy().to_string();
if meta.is_dir() {
self.collect_manifests_recursive(&entry.path(), manifests)
.await?;
} else if name.ends_with(".manifest.json") {
if let Ok(content) = fs::read_to_string(entry.path()).await {
if let Ok(manifest) = serde_json::from_str::<ObjectManifest>(&content) {
manifests.push(manifest);
}
}
}
}
Ok(())
})
}
}
#[derive(Debug, Default)]
#[derive(Debug, Clone, Default)]
pub struct HealStats {
pub shards_checked: u64,
pub shards_healed: u64,
pub errors: u64,
}
#[derive(Debug, Clone, Default)]
pub struct HealingRuntimeState {
pub active: bool,
pub scan_interval_ms: u64,
pub last_run_started_at: Option<DateTime<Utc>>,
pub last_run_completed_at: Option<DateTime<Utc>>,
pub last_duration_ms: Option<u64>,
pub last_stats: Option<HealStats>,
pub last_error: Option<String>,
}
+44 -2
@@ -3,8 +3,12 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use super::drive_manager::{DriveManager, DriveStatus};
use super::protocol::{
ClusterRequest, ClusterResponse, HeartbeatMessage, JoinRequestMessage, NodeInfo,
ClusterRequest, ClusterResponse, DriveStateInfo, HeartbeatMessage, JoinRequestMessage,
NodeInfo,
};
use super::quic_transport::QuicTransport;
use super::state::ClusterState;
@@ -15,6 +19,7 @@ pub struct MembershipManager {
transport: Arc<QuicTransport>,
heartbeat_interval: Duration,
local_node_info: NodeInfo,
drive_manager: Option<Arc<Mutex<DriveManager>>>,
}
impl MembershipManager {
@@ -29,9 +34,16 @@ impl MembershipManager {
transport,
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
local_node_info,
drive_manager: None,
}
}
/// Set the drive manager for health reporting in heartbeats.
pub fn with_drive_manager(mut self, dm: Arc<Mutex<DriveManager>>) -> Self {
self.drive_manager = Some(dm);
self
}
/// Join the cluster by contacting seed nodes.
/// Sends a JoinRequest to each seed node until one accepts.
pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> {
@@ -129,6 +141,9 @@ impl MembershipManager {
let topology_version = self.state.version().await;
let mut responded = Vec::new();
// Collect drive health states
let drive_states = self.collect_drive_states().await;
for peer in &peers {
let addr: SocketAddr = match peer.quic_addr.parse() {
Ok(a) => a,
@@ -138,7 +153,7 @@ impl MembershipManager {
let heartbeat = ClusterRequest::Heartbeat(HeartbeatMessage {
node_id: self.local_node_info.node_id.clone(),
timestamp: chrono::Utc::now().to_rfc3339(),
drive_states: Vec::new(), // TODO: populate from DriveManager
drive_states: drive_states.clone(),
topology_version,
});
@@ -181,4 +196,31 @@ impl MembershipManager {
let _response = self.transport.send_request(&conn, heartbeat).await?;
Ok(())
}
/// Collect drive health states from the DriveManager, if available.
async fn collect_drive_states(&self) -> Vec<DriveStateInfo> {
let dm = match &self.drive_manager {
Some(dm) => dm,
None => return Vec::new(),
};
let mut manager = dm.lock().await;
let results = manager.check_all_drives().await;
results
.into_iter()
.map(|(idx, status)| {
let status_str = match status {
DriveStatus::Online => "online",
DriveStatus::Degraded => "degraded",
DriveStatus::Offline => "offline",
DriveStatus::Healing => "healing",
};
DriveStateInfo {
drive_index: idx as u32,
status: status_str.to_string(),
}
})
.collect()
}
}
+4
@@ -1,3 +1,7 @@
// Cluster modules contain forward-looking public API that is incrementally wired.
// Allow dead_code for methods/structs not yet called from the main server path.
#![allow(dead_code)]
pub mod config;
pub mod coordinator;
pub mod drive_manager;
+5 -15
@@ -4,8 +4,6 @@ use quinn::{ClientConfig, Endpoint, ServerConfig as QuinnServerConfig};
use rustls::pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use super::protocol::{
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
};
@@ -225,20 +223,14 @@ impl QuicTransport {
mut recv: quinn::RecvStream,
shard_store: Arc<ShardStore>,
) -> Result<()> {
// Read the length-prefixed request header
let mut len_buf = [0u8; 4];
recv.read_exact(&mut len_buf).await?;
let msg_len = u32::from_le_bytes(len_buf) as usize;
let mut msg_buf = vec![0u8; msg_len];
recv.read_exact(&mut msg_buf).await?;
let request: ClusterRequest = bincode::deserialize(&msg_buf)?;
// Read the full request (length-prefixed bincode + optional trailing data)
let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
let (request, header_len) = protocol::decode_request(&raw)?;
match request {
ClusterRequest::ShardWrite(write_req) => {
// Read shard data from the stream
let mut shard_data = vec![0u8; write_req.shard_data_length as usize];
recv.read_exact(&mut shard_data).await?;
// Shard data follows the header in the raw buffer
let shard_data = &raw[header_len..];
let shard_id = ShardId {
bucket: write_req.bucket,
@@ -348,8 +340,6 @@ impl QuicTransport {
// will be handled by the membership and coordinator modules.
// For now, send a generic ack.
_ => {
let response_data = recv.read_to_end(0).await.unwrap_or_default();
drop(response_data);
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "NotImplemented".to_string(),
+1 -1
@@ -1,6 +1,6 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use tokio::fs;
use tokio::io::AsyncWriteExt;
+105
@@ -4,6 +4,7 @@ use serde_json::Value;
use std::io::Write;
use tokio::io::{AsyncBufReadExt, BufReader};
use crate::config::Credential;
use crate::config::SmartStorageConfig;
use crate::server::StorageServer;
@@ -140,6 +141,110 @@ pub async fn management_loop() -> Result<()> {
}
}
}
"getStorageStats" => {
if let Some(ref s) = server {
match s.store().get_storage_stats().await {
Ok(stats) => match serde_json::to_value(stats) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize storage stats: {}", error),
);
}
},
Err(error) => {
send_error(id, format!("Failed to get storage stats: {}", error));
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"listBucketSummaries" => {
if let Some(ref s) = server {
match s.store().list_bucket_summaries().await {
Ok(summaries) => match serde_json::to_value(summaries) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize bucket summaries: {}", error),
);
}
},
Err(error) => {
send_error(id, format!("Failed to list bucket summaries: {}", error));
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"listCredentials" => {
if let Some(ref s) = server {
match serde_json::to_value(s.list_credentials().await) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize credentials: {}", error),
);
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"replaceCredentials" => {
#[derive(Deserialize)]
struct ReplaceCredentialsParams {
credentials: Vec<Credential>,
}
match serde_json::from_value::<ReplaceCredentialsParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
match s.replace_credentials(params.credentials).await {
Ok(()) => {
send_response(id, serde_json::json!({}));
}
Err(error) => {
send_error(
id,
format!("Failed to replace credentials: {}", error),
);
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(id, format!("Invalid replaceCredentials params: {}", error));
}
}
}
"getClusterHealth" => {
if let Some(ref s) = server {
match s.store().get_cluster_health().await {
Ok(health) => match serde_json::to_value(health) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize cluster health: {}", error),
);
}
},
Err(error) => {
send_error(id, format!("Failed to get cluster health: {}", error));
}
}
} else {
send_error(id, "Server not started".to_string());
}
}
"clusterStatus" => {
send_response(
id,
+69 -13
@@ -24,7 +24,8 @@ use crate::config::SmartStorageConfig;
use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore;
use crate::cluster::config::ErasureConfig;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager;
use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo;
@@ -36,12 +37,14 @@ use crate::xml_response;
pub struct StorageServer {
store: Arc<StorageBackend>,
auth_runtime: Arc<auth::RuntimeCredentialStore>,
shutdown_tx: watch::Sender<bool>,
server_handle: tokio::task::JoinHandle<()>,
}
impl StorageServer {
pub async fn start(config: SmartStorageConfig) -> Result<Self> {
let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth));
let store: Arc<StorageBackend> = if let Some(ref cluster_config) = config.cluster {
if cluster_config.enabled {
Self::start_clustered(&config, cluster_config).await?
@@ -64,6 +67,7 @@ impl StorageServer {
let server_store = store.clone();
let server_config = config.clone();
let server_auth_runtime = auth_runtime.clone();
let server_policy_store = policy_store.clone();
let server_handle = tokio::spawn(async move {
@@ -77,15 +81,17 @@ impl StorageServer {
let io = TokioIo::new(stream);
let store = server_store.clone();
let cfg = server_config.clone();
let auth_runtime = server_auth_runtime.clone();
let ps = server_policy_store.clone();
tokio::spawn(async move {
let svc = service_fn(move |req: Request<Incoming>| {
let store = store.clone();
let cfg = cfg.clone();
let auth_runtime = auth_runtime.clone();
let ps = ps.clone();
async move {
handle_request(req, store, cfg, ps).await
handle_request(req, store, cfg, auth_runtime, ps).await
}
});
@@ -118,6 +124,7 @@ impl StorageServer {
Ok(Self {
store,
auth_runtime,
shutdown_tx,
server_handle,
})
@@ -132,6 +139,17 @@ impl StorageServer {
&self.store
}
pub async fn list_credentials(&self) -> Vec<crate::config::Credential> {
self.auth_runtime.list_credentials().await
}
pub async fn replace_credentials(
&self,
credentials: Vec<crate::config::Credential>,
) -> Result<(), StorageError> {
self.auth_runtime.replace_credentials(credentials).await
}
async fn start_standalone(config: &SmartStorageConfig) -> Result<Arc<StorageBackend>> {
let store = Arc::new(StorageBackend::Standalone(
FileStore::new(config.storage.directory.clone().into()),
@@ -217,20 +235,34 @@ impl StorageServer {
};
cluster_state.add_node(local_node_info.clone()).await;
// Join cluster if seed nodes are configured
let membership = Arc::new(MembershipManager::new(
cluster_state.clone(),
transport.clone(),
cluster_config.heartbeat_interval_ms,
local_node_info,
// Initialize drive manager for health monitoring
let drive_manager = Arc::new(tokio::sync::Mutex::new(
DriveManager::from_paths(&drive_paths).await?,
));
// Join cluster if seed nodes are configured
let membership = Arc::new(
MembershipManager::new(
cluster_state.clone(),
transport.clone(),
cluster_config.heartbeat_interval_ms,
local_node_info,
)
.with_drive_manager(drive_manager.clone()),
);
membership
.join_cluster(&cluster_config.seed_nodes)
.await?;
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
.iter()
.map(|p| Arc::new(ShardStore::new(p.clone())))
.collect();
// Start QUIC accept loop for incoming connections
let shard_store_for_accept = Arc::new(ShardStore::new(drive_paths[0].clone()));
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let shard_store_for_accept = local_shard_stores[0].clone();
let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone();
tokio::spawn(async move {
transport_clone
@@ -240,21 +272,43 @@ impl StorageServer {
// Start heartbeat loop
let membership_clone = membership.clone();
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
});
// Start healing service
let healing_runtime = Arc::new(tokio::sync::RwLock::new(
crate::cluster::healing::HealingRuntimeState::default(),
));
let healing_service = HealingService::new(
cluster_state.clone(),
&erasure_config,
local_shard_stores.clone(),
manifest_dir.clone(),
24, // scan every 24 hours
healing_runtime.clone(),
)?;
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
// Create distributed store
let distributed_store = DistributedStore::new(
cluster_state,
transport,
erasure_config,
std::path::PathBuf::from(&config.storage.directory),
drive_paths,
drive_manager,
healing_runtime,
manifest_dir,
buckets_dir,
)?;
distributed_store.initialize_runtime_stats().await;
let store = Arc::new(StorageBackend::Clustered(distributed_store));
if !config.server.silent {
@@ -351,6 +405,7 @@ async fn handle_request(
req: Request<Incoming>,
store: Arc<StorageBackend>,
config: SmartStorageConfig,
auth_runtime: Arc<auth::RuntimeCredentialStore>,
policy_store: Arc<PolicyStore>,
) -> Result<Response<BoxBody>, std::convert::Infallible> {
let request_id = Uuid::new_v4().to_string();
@@ -368,7 +423,7 @@ async fn handle_request(
let request_ctx = action::resolve_action(&req);
// Step 2: Auth + policy pipeline
if config.auth.enabled {
if auth_runtime.enabled() {
// Attempt authentication
let identity = {
let has_auth_header = req
@@ -379,7 +434,8 @@ async fn handle_request(
.unwrap_or(false);
if has_auth_header {
match auth::verify_request(&req, &config) {
let credentials = auth_runtime.snapshot_credentials().await;
match auth::verify_request(&req, &credentials) {
Ok(id) => Some(id),
Err(e) => {
tracing::warn!("Auth failed: {}", e.message);
+500 -31
View File
@@ -8,6 +8,7 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
use tokio::sync::RwLock;
use uuid::Uuid;
use crate::cluster::coordinator::DistributedStore;
@@ -64,6 +65,133 @@ pub struct BucketInfo {
pub creation_date: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketSummary {
pub name: String,
pub object_count: u64,
pub total_size_bytes: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub creation_date: Option<i64>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageLocationSummary {
pub path: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_bytes: Option<u64>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StorageStats {
pub bucket_count: u64,
pub total_object_count: u64,
pub total_storage_bytes: u64,
pub buckets: Vec<BucketSummary>,
pub storage_directory: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub storage_locations: Vec<StorageLocationSummary>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterPeerHealth {
pub node_id: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub quic_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub s3_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drive_count: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_heartbeat: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub missed_heartbeats: Option<u32>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterDriveHealth {
pub index: u32,
pub path: String,
pub status: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub used_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub available_bytes: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_count: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_check: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure_set_id: Option<u32>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterErasureHealth {
pub data_shards: usize,
pub parity_shards: usize,
pub chunk_size_bytes: usize,
pub total_shards: usize,
pub read_quorum: usize,
pub write_quorum: usize,
pub erasure_set_count: usize,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterRepairHealth {
pub active: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub scan_interval_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_started_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_run_completed_at: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_duration_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shards_checked: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub shards_healed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub failed: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterHealth {
pub enabled: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub quorum_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub majority_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub peers: Option<Vec<ClusterPeerHealth>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub drives: Option<Vec<ClusterDriveHealth>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure: Option<ClusterErasureHealth>,
#[serde(skip_serializing_if = "Option::is_none")]
pub repairs: Option<ClusterRepairHealth>,
}
pub struct MultipartUploadInfo {
pub upload_id: String,
pub key: String,
@@ -98,22 +226,186 @@ struct PartMetadata {
last_modified: String,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeBucketStats {
pub object_count: u64,
pub total_size_bytes: u64,
pub creation_date: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct RuntimeStatsState {
buckets: HashMap<String, RuntimeBucketStats>,
total_object_count: u64,
total_storage_bytes: u64,
}
impl RuntimeStatsState {
pub(crate) fn replace_buckets(&mut self, buckets: HashMap<String, RuntimeBucketStats>) {
self.total_object_count = buckets.values().map(|bucket| bucket.object_count).sum();
self.total_storage_bytes = buckets.values().map(|bucket| bucket.total_size_bytes).sum();
self.buckets = buckets;
}
pub(crate) fn ensure_bucket(&mut self, name: &str, creation_date: Option<DateTime<Utc>>) {
let bucket = self.buckets.entry(name.to_string()).or_default();
if bucket.creation_date.is_none() {
bucket.creation_date = creation_date;
}
}
pub(crate) fn remove_bucket(&mut self, name: &str) {
if let Some(bucket) = self.buckets.remove(name) {
self.total_object_count = self.total_object_count.saturating_sub(bucket.object_count);
self.total_storage_bytes = self
.total_storage_bytes
.saturating_sub(bucket.total_size_bytes);
}
}
pub(crate) fn upsert_object(
&mut self,
bucket_name: &str,
previous_size: Option<u64>,
new_size: u64,
) {
let bucket_was_present = self.buckets.contains_key(bucket_name);
let bucket = self.buckets.entry(bucket_name.to_string()).or_default();
if let Some(previous_size) = previous_size {
if !bucket_was_present {
bucket.object_count = 1;
self.total_object_count += 1;
}
bucket.total_size_bytes =
bucket.total_size_bytes.saturating_sub(previous_size) + new_size;
self.total_storage_bytes =
self.total_storage_bytes.saturating_sub(previous_size) + new_size;
} else {
bucket.object_count += 1;
bucket.total_size_bytes += new_size;
self.total_object_count += 1;
self.total_storage_bytes += new_size;
}
}
pub(crate) fn remove_object(&mut self, bucket_name: &str, existing_size: Option<u64>) {
let Some(existing_size) = existing_size else {
return;
};
let Some(bucket) = self.buckets.get_mut(bucket_name) else {
return;
};
bucket.object_count = bucket.object_count.saturating_sub(1);
bucket.total_size_bytes = bucket.total_size_bytes.saturating_sub(existing_size);
self.total_object_count = self.total_object_count.saturating_sub(1);
self.total_storage_bytes = self.total_storage_bytes.saturating_sub(existing_size);
}
pub(crate) fn bucket_summaries(&self) -> Vec<BucketSummary> {
let mut buckets: Vec<BucketSummary> = self
.buckets
.iter()
.map(|(name, stats)| BucketSummary {
name: name.clone(),
object_count: stats.object_count,
total_size_bytes: stats.total_size_bytes,
creation_date: stats
.creation_date
.as_ref()
.map(|creation_date| creation_date.timestamp_millis()),
})
.collect();
buckets.sort_by(|a, b| a.name.cmp(&b.name));
buckets
}
pub(crate) fn snapshot(
&self,
storage_directory: &Path,
storage_locations: Vec<StorageLocationSummary>,
) -> StorageStats {
StorageStats {
bucket_count: self.buckets.len() as u64,
total_object_count: self.total_object_count,
total_storage_bytes: self.total_storage_bytes,
buckets: self.bucket_summaries(),
storage_directory: storage_directory.to_string_lossy().to_string(),
storage_locations,
}
}
}
#[derive(Debug, Clone, Copy)]
struct FilesystemUsage {
total_bytes: u64,
available_bytes: u64,
used_bytes: u64,
}
pub(crate) fn storage_location_summary(path: &Path) -> StorageLocationSummary {
let usage = filesystem_usage(path);
StorageLocationSummary {
path: path.to_string_lossy().to_string(),
total_bytes: usage.map(|usage| usage.total_bytes),
available_bytes: usage.map(|usage| usage.available_bytes),
used_bytes: usage.map(|usage| usage.used_bytes),
}
}
#[cfg(unix)]
fn filesystem_usage(path: &Path) -> Option<FilesystemUsage> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
let path_bytes = path.as_os_str().as_bytes();
let c_path = CString::new(path_bytes).ok()?;
let mut stat: libc::statvfs = unsafe { std::mem::zeroed() };
if unsafe { libc::statvfs(c_path.as_ptr(), &mut stat) } != 0 {
return None;
}
let block_size = stat.f_frsize as u64;
let total_bytes = stat.f_blocks as u64 * block_size;
let available_bytes = stat.f_bavail as u64 * block_size;
let free_bytes = stat.f_bfree as u64 * block_size;
Some(FilesystemUsage {
total_bytes,
available_bytes,
used_bytes: total_bytes.saturating_sub(free_bytes),
})
}
#[cfg(not(unix))]
fn filesystem_usage(_path: &Path) -> Option<FilesystemUsage> {
None
}
// ============================
// FileStore
// ============================
pub struct FileStore {
root_dir: PathBuf,
runtime_stats: RwLock<RuntimeStatsState>,
}
impl FileStore {
pub fn new(root_dir: PathBuf) -> Self {
Self { root_dir }
Self {
root_dir,
runtime_stats: RwLock::new(RuntimeStatsState::default()),
}
}
pub async fn initialize(&self) -> Result<()> {
fs::create_dir_all(&self.root_dir).await?;
fs::create_dir_all(self.policies_dir()).await?;
self.refresh_runtime_stats().await;
Ok(())
}
@@ -127,9 +419,56 @@ impl FileStore {
}
fs::create_dir_all(&self.root_dir).await?;
fs::create_dir_all(self.policies_dir()).await?;
self.refresh_runtime_stats().await;
Ok(())
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
let runtime_stats = self.runtime_stats.read().await;
Ok(runtime_stats.snapshot(
&self.root_dir,
vec![storage_location_summary(&self.root_dir)],
))
}
pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
let runtime_stats = self.runtime_stats.read().await;
Ok(runtime_stats.bucket_summaries())
}
async fn refresh_runtime_stats(&self) {
let buckets = match self.list_buckets().await {
Ok(buckets) => buckets,
Err(error) => {
tracing::warn!(path = %self.root_dir.display(), error = %error, "Failed to initialize runtime stats");
return;
}
};
let mut runtime_buckets = HashMap::new();
for bucket in buckets {
let bucket_path = self.root_dir.join(&bucket.name);
match Self::scan_bucket_objects(&bucket_path).await {
Ok((object_count, total_size_bytes)) => {
runtime_buckets.insert(
bucket.name,
RuntimeBucketStats {
object_count,
total_size_bytes,
creation_date: Some(bucket.creation_date),
},
);
}
Err(error) => {
tracing::warn!(bucket = %bucket.name, error = %error, "Failed to scan bucket for runtime stats");
}
}
}
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.replace_buckets(runtime_buckets);
}
// ============================
// Bucket operations
// ============================
@@ -168,6 +507,7 @@ impl FileStore {
pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
let bucket_path = self.root_dir.join(bucket);
fs::create_dir_all(&bucket_path).await?;
self.track_bucket_created(bucket).await;
Ok(())
}
@@ -185,6 +525,7 @@ impl FileStore {
}
fs::remove_dir_all(&bucket_path).await?;
self.track_bucket_deleted(bucket).await;
Ok(())
}
@@ -203,6 +544,8 @@ impl FileStore {
return Err(StorageError::no_such_bucket().into());
}
let previous_size = self.object_size_if_exists(bucket, key).await;
let object_path = self.object_path(bucket, key);
if let Some(parent) = object_path.parent() {
fs::create_dir_all(parent).await?;
@@ -243,9 +586,11 @@ impl FileStore {
let metadata_json = serde_json::to_string_pretty(&metadata)?;
fs::write(&metadata_path, metadata_json).await?;
Ok(PutResult {
md5: md5_hex,
})
let object_size = fs::metadata(&object_path).await?.len();
self.track_object_upsert(bucket, previous_size, object_size)
.await;
Ok(PutResult { md5: md5_hex })
}
pub async fn get_object(
@@ -310,6 +655,7 @@ impl FileStore {
}
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
let existing_size = self.object_size_if_exists(bucket, key).await;
let object_path = self.object_path(bucket, key);
let md5_path = format!("{}.md5", object_path.display());
let metadata_path = format!("{}.metadata.json", object_path.display());
@@ -337,6 +683,8 @@ impl FileStore {
current = dir.parent().map(|p| p.to_path_buf());
}
self.track_object_deleted(bucket, existing_size).await;
Ok(())
}
@@ -360,6 +708,8 @@ impl FileStore {
return Err(StorageError::no_such_bucket().into());
}
let previous_size = self.object_size_if_exists(dest_bucket, dest_key).await;
if let Some(parent) = dest_path.parent() {
fs::create_dir_all(parent).await?;
}
@@ -387,10 +737,10 @@ impl FileStore {
let md5 = self.read_md5(&dest_path).await;
let last_modified: DateTime<Utc> = file_meta.modified()?.into();
Ok(CopyResult {
md5,
last_modified,
})
self.track_object_upsert(dest_bucket, previous_size, file_meta.len())
.await;
Ok(CopyResult { md5, last_modified })
}
pub async fn list_objects(
@@ -438,11 +788,7 @@ impl FileStore {
if !delimiter.is_empty() {
let remaining = &key[prefix.len()..];
if let Some(delim_idx) = remaining.find(delimiter) {
let cp = format!(
"{}{}",
prefix,
&remaining[..delim_idx + delimiter.len()]
);
let cp = format!("{}{}", prefix, &remaining[..delim_idx + delimiter.len()]);
if common_prefix_set.insert(cp.clone()) {
common_prefixes.push(cp);
}
@@ -458,7 +804,10 @@ impl FileStore {
let object_path = self.object_path(bucket, key);
if let Ok(meta) = fs::metadata(&object_path).await {
let md5 = self.read_md5(&object_path).await;
let last_modified: DateTime<Utc> = meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH).into();
let last_modified: DateTime<Utc> = meta
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH)
.into();
contents.push(ListObjectEntry {
key: key.clone(),
size: meta.len(),
@@ -611,6 +960,8 @@ impl FileStore {
let content = fs::read_to_string(&meta_path).await?;
let meta: MultipartMetadata = serde_json::from_str(&content)?;
let previous_size = self.object_size_if_exists(&meta.bucket, &meta.key).await;
let object_path = self.object_path(&meta.bucket, &meta.key);
if let Some(parent) = object_path.parent() {
fs::create_dir_all(parent).await?;
@@ -653,12 +1004,14 @@ impl FileStore {
let metadata_json = serde_json::to_string_pretty(&meta.metadata)?;
fs::write(&metadata_path, metadata_json).await?;
let object_size = fs::metadata(&object_path).await?.len();
self.track_object_upsert(&meta.bucket, previous_size, object_size)
.await;
// Clean up multipart directory
let _ = fs::remove_dir_all(&upload_dir).await;
Ok(CompleteMultipartResult {
etag,
})
Ok(CompleteMultipartResult { etag })
}
pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
@@ -670,10 +1023,7 @@ impl FileStore {
Ok(())
}
pub async fn list_multipart_uploads(
&self,
bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
let multipart_dir = self.multipart_dir();
if !multipart_dir.is_dir() {
return Ok(Vec::new());
@@ -712,6 +1062,75 @@ impl FileStore {
// Helpers
// ============================
async fn scan_bucket_objects(bucket_path: &Path) -> Result<(u64, u64)> {
let mut object_count = 0u64;
let mut total_size_bytes = 0u64;
let mut directories = vec![bucket_path.to_path_buf()];
while let Some(directory) = directories.pop() {
let mut entries = match fs::read_dir(&directory).await {
Ok(entries) => entries,
Err(_) => continue,
};
while let Some(entry) = entries.next_entry().await? {
let metadata = entry.metadata().await?;
if metadata.is_dir() {
directories.push(entry.path());
continue;
}
let name = entry.file_name().to_string_lossy().to_string();
if name.ends_with("._storage_object") {
object_count += 1;
total_size_bytes += metadata.len();
}
}
}
Ok((object_count, total_size_bytes))
}
async fn bucket_creation_date(&self, bucket: &str) -> Option<DateTime<Utc>> {
let metadata = fs::metadata(self.root_dir.join(bucket)).await.ok()?;
let created_or_modified = metadata.created().unwrap_or(
metadata
.modified()
.unwrap_or(std::time::SystemTime::UNIX_EPOCH),
);
Some(created_or_modified.into())
}
async fn object_size_if_exists(&self, bucket: &str, key: &str) -> Option<u64> {
fs::metadata(self.object_path(bucket, key))
.await
.ok()
.map(|metadata| metadata.len())
}
async fn track_bucket_created(&self, bucket: &str) {
let creation_date = self.bucket_creation_date(bucket).await;
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.ensure_bucket(bucket, creation_date);
}
async fn track_bucket_deleted(&self, bucket: &str) {
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.remove_bucket(bucket);
}
async fn track_object_upsert(&self, bucket: &str, previous_size: Option<u64>, new_size: u64) {
let creation_date = self.bucket_creation_date(bucket).await;
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.ensure_bucket(bucket, creation_date);
runtime_stats.upsert_object(bucket, previous_size, new_size);
}
async fn track_object_deleted(&self, bucket: &str, existing_size: Option<u64>) {
let mut runtime_stats = self.runtime_stats.write().await;
runtime_stats.remove_object(bucket, existing_size);
}
fn object_path(&self, bucket: &str, key: &str) -> PathBuf {
let encoded = encode_key(key);
self.root_dir
@@ -811,14 +1230,49 @@ impl StorageBackend {
pub fn policies_dir(&self) -> std::path::PathBuf {
match self {
StorageBackend::Standalone(fs) => fs.policies_dir(),
StorageBackend::Clustered(_) => PathBuf::from(".policies"), // TODO: proper policies in cluster mode
StorageBackend::Clustered(ds) => ds.policies_dir(),
}
}
pub async fn get_cluster_health(&self) -> Result<ClusterHealth> {
match self {
StorageBackend::Standalone(_) => Ok(ClusterHealth {
enabled: false,
node_id: None,
quorum_healthy: None,
majority_healthy: None,
peers: None,
drives: None,
erasure: None,
repairs: None,
}),
StorageBackend::Clustered(ds) => ds.get_cluster_health().await,
}
}
pub async fn get_storage_stats(&self) -> Result<StorageStats> {
match self {
StorageBackend::Standalone(fs) => fs.get_storage_stats().await,
StorageBackend::Clustered(ds) => ds.get_storage_stats().await,
}
}
pub async fn list_bucket_summaries(&self) -> Result<Vec<BucketSummary>> {
match self {
StorageBackend::Standalone(fs) => fs.list_bucket_summaries().await,
StorageBackend::Clustered(ds) => ds.list_bucket_summaries().await,
}
}
pub async fn initialize(&self) -> Result<()> {
match self {
StorageBackend::Standalone(fs) => fs.initialize().await,
StorageBackend::Clustered(_) => Ok(()), // Cluster init happens separately
StorageBackend::Clustered(ds) => {
// Ensure policies directory exists
tokio::fs::create_dir_all(ds.policies_dir()).await?;
ds.initialize_runtime_stats().await;
Ok(())
}
}
}
@@ -907,10 +1361,26 @@ impl StorageBackend {
) -> Result<CopyResult> {
match self {
StorageBackend::Standalone(fs) => {
fs.copy_object(src_bucket, src_key, dest_bucket, dest_key, metadata_directive, new_metadata).await
fs.copy_object(
src_bucket,
src_key,
dest_bucket,
dest_key,
metadata_directive,
new_metadata,
)
.await
}
StorageBackend::Clustered(ds) => {
ds.copy_object(src_bucket, src_key, dest_bucket, dest_key, metadata_directive, new_metadata).await
ds.copy_object(
src_bucket,
src_key,
dest_bucket,
dest_key,
metadata_directive,
new_metadata,
)
.await
}
}
}
@@ -925,10 +1395,12 @@ impl StorageBackend {
) -> Result<ListObjectsResult> {
match self {
StorageBackend::Standalone(fs) => {
fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token).await
fs.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
.await
}
StorageBackend::Clustered(ds) => {
ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token).await
ds.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
.await
}
}
}
@@ -975,10 +1447,7 @@ impl StorageBackend {
}
}
pub async fn list_multipart_uploads(
&self,
bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
pub async fn list_multipart_uploads(&self, bucket: &str) -> Result<Vec<MultipartUploadInfo>> {
match self {
StorageBackend::Standalone(fs) => fs.list_multipart_uploads(bucket).await,
StorageBackend::Clustered(ds) => ds.list_multipart_uploads(bucket).await,
+112 -6
View File
@@ -1,16 +1,28 @@
/// <reference types="node" />
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { S3Client, CreateBucketCommand, ListBucketsCommand, PutObjectCommand, GetObjectCommand, DeleteObjectCommand, DeleteBucketCommand } from '@aws-sdk/client-s3';
import { Buffer } from 'buffer';
import { Readable } from 'stream';
import * as smartstorage from '../ts/index.js';
let testSmartStorageInstance: smartstorage.SmartStorage;
let s3Client: S3Client;
const testObjectBody = 'Hello from AWS SDK!';
const testObjectSize = Buffer.byteLength(testObjectBody);
function getBucketSummary(
summaries: smartstorage.IBucketSummary[],
bucketName: string,
): smartstorage.IBucketSummary | undefined {
return summaries.find((summary) => summary.name === bucketName);
}
// Helper to convert stream to string
async function streamToString(stream: Readable): Promise<string> {
const chunks: Buffer[] = [];
return new Promise((resolve, reject) => {
stream.on('data', (chunk) => chunks.push(Buffer.from(chunk)));
stream.on('data', (chunk: string | Buffer | Uint8Array) => chunks.push(Buffer.from(chunk)));
stream.on('error', reject);
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
});
@@ -46,28 +58,82 @@ tap.test('should list buckets (empty)', async () => {
expect(response.Buckets!.length).toEqual(0);
});
tap.test('should expose empty runtime stats after startup', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
expect(stats.bucketCount).toEqual(0);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(stats.buckets.length).toEqual(0);
expect(stats.storageDirectory.length > 0).toEqual(true);
});
tap.test('should expose disabled cluster health in standalone mode', async () => {
const clusterHealth = await testSmartStorageInstance.getClusterHealth();
expect(clusterHealth.enabled).toEqual(false);
expect(clusterHealth.nodeId).toEqual(undefined);
expect(clusterHealth.quorumHealthy).toEqual(undefined);
expect(clusterHealth.drives).toEqual(undefined);
});
tap.test('should create a bucket', async () => {
const response = await s3Client.send(new CreateBucketCommand({ Bucket: 'test-bucket' }));
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('should list buckets (showing created bucket)', async () => {
tap.test('should create an empty bucket through the bridge', async () => {
const response = await testSmartStorageInstance.createBucket('empty-bucket');
expect(response.name).toEqual('empty-bucket');
});
tap.test('should list buckets (showing created buckets)', async () => {
const response = await s3Client.send(new ListBucketsCommand({}));
expect(response.Buckets!.length).toEqual(1);
expect(response.Buckets![0].Name).toEqual('test-bucket');
expect(response.Buckets!.length).toEqual(2);
expect(response.Buckets!.some((bucket) => bucket.Name === 'test-bucket')).toEqual(true);
expect(response.Buckets!.some((bucket) => bucket.Name === 'empty-bucket')).toEqual(true);
});
tap.test('should expose runtime bucket summaries after bucket creation', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
const summaries = await testSmartStorageInstance.listBucketSummaries();
const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket');
const emptyBucketSummary = getBucketSummary(summaries, 'empty-bucket');
expect(stats.bucketCount).toEqual(2);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(summaries.length).toEqual(2);
expect(testBucketSummary?.objectCount).toEqual(0);
expect(testBucketSummary?.totalSizeBytes).toEqual(0);
expect(typeof testBucketSummary?.creationDate).toEqual('number');
expect(emptyBucketSummary?.objectCount).toEqual(0);
expect(emptyBucketSummary?.totalSizeBytes).toEqual(0);
});
tap.test('should upload an object', async () => {
const response = await s3Client.send(new PutObjectCommand({
Bucket: 'test-bucket',
Key: 'test-file.txt',
Body: 'Hello from AWS SDK!',
Body: testObjectBody,
ContentType: 'text/plain',
}));
expect(response.$metadata.httpStatusCode).toEqual(200);
expect(response.ETag).toBeTypeofString();
});
tap.test('should reflect uploaded object in runtime stats', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket');
const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket');
expect(stats.bucketCount).toEqual(2);
expect(stats.totalObjectCount).toEqual(1);
expect(stats.totalStorageBytes).toEqual(testObjectSize);
expect(testBucketSummary?.objectCount).toEqual(1);
expect(testBucketSummary?.totalSizeBytes).toEqual(testObjectSize);
expect(emptyBucketSummary?.objectCount).toEqual(0);
expect(emptyBucketSummary?.totalSizeBytes).toEqual(0);
});
tap.test('should download the object', async () => {
const response = await s3Client.send(new GetObjectCommand({
Bucket: 'test-bucket',
@@ -76,7 +142,7 @@ tap.test('should download the object', async () => {
expect(response.$metadata.httpStatusCode).toEqual(200);
const content = await streamToString(response.Body as Readable);
expect(content).toEqual('Hello from AWS SDK!');
expect(content).toEqual(testObjectBody);
});
tap.test('should delete the object', async () => {
@@ -87,6 +153,20 @@ tap.test('should delete the object', async () => {
expect(response.$metadata.httpStatusCode).toEqual(204);
});
tap.test('should reflect object deletion in runtime stats', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket');
const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket');
expect(stats.bucketCount).toEqual(2);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(testBucketSummary?.objectCount).toEqual(0);
expect(testBucketSummary?.totalSizeBytes).toEqual(0);
expect(emptyBucketSummary?.objectCount).toEqual(0);
expect(emptyBucketSummary?.totalSizeBytes).toEqual(0);
});
tap.test('should fail to get deleted object', async () => {
await expect(
s3Client.send(new GetObjectCommand({
@@ -96,11 +176,37 @@ tap.test('should fail to get deleted object', async () => {
).rejects.toThrow();
});
tap.test('should delete the empty bucket', async () => {
const response = await s3Client.send(new DeleteBucketCommand({ Bucket: 'empty-bucket' }));
expect(response.$metadata.httpStatusCode).toEqual(204);
});
tap.test('should reflect bucket deletion in runtime stats', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
const testBucketSummary = getBucketSummary(stats.buckets, 'test-bucket');
const emptyBucketSummary = getBucketSummary(stats.buckets, 'empty-bucket');
expect(stats.bucketCount).toEqual(1);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(testBucketSummary?.objectCount).toEqual(0);
expect(testBucketSummary?.totalSizeBytes).toEqual(0);
expect(emptyBucketSummary).toEqual(undefined);
});
tap.test('should delete the bucket', async () => {
const response = await s3Client.send(new DeleteBucketCommand({ Bucket: 'test-bucket' }));
expect(response.$metadata.httpStatusCode).toEqual(204);
});
tap.test('should expose empty runtime stats after deleting all buckets', async () => {
const stats = await testSmartStorageInstance.getStorageStats();
expect(stats.bucketCount).toEqual(0);
expect(stats.totalObjectCount).toEqual(0);
expect(stats.totalStorageBytes).toEqual(0);
expect(stats.buckets.length).toEqual(0);
});
tap.test('should stop the storage server', async () => {
await testSmartStorageInstance.stop();
});
+84
View File
@@ -0,0 +1,84 @@
/// <reference types="node" />
import { rm } from 'fs/promises';
import { join } from 'path';
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartstorage from '../ts/index.js';
let clusterStorage: smartstorage.SmartStorage;
const baseDir = join(process.cwd(), '.nogit', `cluster-health-${Date.now()}`);
const drivePaths = Array.from({ length: 6 }, (_value, index) => {
return join(baseDir, `drive-${index + 1}`);
});
const storageDir = join(baseDir, 'storage');
tap.test('setup: start clustered storage server', async () => {
clusterStorage = await smartstorage.SmartStorage.createAndStart({
server: {
port: 3348,
silent: true,
},
storage: {
directory: storageDir,
},
cluster: {
enabled: true,
nodeId: 'cluster-health-node',
quicPort: 4348,
seedNodes: [],
erasure: {
dataShards: 4,
parityShards: 2,
chunkSizeBytes: 1024 * 1024,
},
drives: {
paths: drivePaths,
},
},
});
});
tap.test('should expose clustered runtime health', async () => {
const health = await clusterStorage.getClusterHealth();
expect(health.enabled).toEqual(true);
expect(health.nodeId).toEqual('cluster-health-node');
expect(health.quorumHealthy).toEqual(true);
expect(health.majorityHealthy).toEqual(true);
expect(Array.isArray(health.peers)).toEqual(true);
expect(health.peers!.length).toEqual(0);
expect(Array.isArray(health.drives)).toEqual(true);
expect(health.drives!.length).toEqual(6);
expect(health.drives!.every((drive) => drive.status === 'online')).toEqual(true);
expect(health.drives!.every((drive) => drivePaths.includes(drive.path))).toEqual(true);
expect(health.drives!.every((drive) => drive.totalBytes !== undefined)).toEqual(true);
expect(health.drives!.every((drive) => drive.usedBytes !== undefined)).toEqual(true);
expect(health.drives!.every((drive) => drive.lastCheck !== undefined)).toEqual(true);
expect(health.drives!.every((drive) => drive.erasureSetId === 0)).toEqual(true);
expect(health.erasure?.dataShards).toEqual(4);
expect(health.erasure?.parityShards).toEqual(2);
expect(health.erasure?.chunkSizeBytes).toEqual(1024 * 1024);
expect(health.erasure?.totalShards).toEqual(6);
expect(health.erasure?.readQuorum).toEqual(4);
expect(health.erasure?.writeQuorum).toEqual(5);
expect(health.erasure?.erasureSetCount).toEqual(1);
expect(health.repairs?.active).toEqual(false);
expect(health.repairs?.scanIntervalMs).toEqual(24 * 60 * 60 * 1000);
});
tap.test('should expose cluster health after bucket creation', async () => {
const bucket = await clusterStorage.createBucket('cluster-health-bucket');
const health = await clusterStorage.getClusterHealth();
expect(bucket.name).toEqual('cluster-health-bucket');
expect(health.enabled).toEqual(true);
expect(health.quorumHealthy).toEqual(true);
expect(health.drives!.length).toEqual(6);
});
tap.test('teardown: stop clustered server and clean files', async () => {
await clusterStorage.stop();
await rm(baseDir, { recursive: true, force: true });
});
export default tap.start();
+150
View File
@@ -0,0 +1,150 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import {
CreateBucketCommand,
DeleteBucketCommand,
ListBucketsCommand,
S3Client,
} from '@aws-sdk/client-s3';
import * as smartstorage from '../ts/index.js';
const TEST_PORT = 3349;
const INITIAL_CREDENTIAL: smartstorage.IStorageCredential = {
accessKeyId: 'RUNTIMEINITIAL',
secretAccessKey: 'RUNTIMEINITIALSECRET123',
};
const ROTATED_CREDENTIAL_A: smartstorage.IStorageCredential = {
accessKeyId: 'RUNTIMEA',
secretAccessKey: 'RUNTIMEASECRET123',
};
const ROTATED_CREDENTIAL_B: smartstorage.IStorageCredential = {
accessKeyId: 'RUNTIMEB',
secretAccessKey: 'RUNTIMEBSECRET123',
};
const TEST_BUCKET = 'runtime-credentials-bucket';
let testSmartStorageInstance: smartstorage.SmartStorage;
let initialClient: S3Client;
let rotatedClientA: S3Client;
let rotatedClientB: S3Client;
function createS3Client(credential: smartstorage.IStorageCredential): S3Client {
return new S3Client({
endpoint: `http://localhost:${TEST_PORT}`,
region: 'us-east-1',
credentials: {
accessKeyId: credential.accessKeyId,
secretAccessKey: credential.secretAccessKey,
},
forcePathStyle: true,
});
}
tap.test('setup: start storage server with runtime-managed credentials', async () => {
testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart({
server: {
port: TEST_PORT,
silent: true,
region: 'us-east-1',
},
storage: {
cleanSlate: true,
},
auth: {
enabled: true,
credentials: [INITIAL_CREDENTIAL],
},
});
initialClient = createS3Client(INITIAL_CREDENTIAL);
rotatedClientA = createS3Client(ROTATED_CREDENTIAL_A);
rotatedClientB = createS3Client(ROTATED_CREDENTIAL_B);
});
tap.test('startup credentials authenticate successfully', async () => {
const response = await initialClient.send(new ListBucketsCommand({}));
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('listCredentials returns the active startup credential set', async () => {
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(1);
expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId);
expect(credentials[0].secretAccessKey).toEqual(INITIAL_CREDENTIAL.secretAccessKey);
});
tap.test('invalid replacement input fails cleanly and leaves old credentials active', async () => {
await expect(
testSmartStorageInstance.replaceCredentials([
{
accessKeyId: '',
secretAccessKey: 'invalid-secret',
},
]),
).rejects.toThrow();
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(1);
expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId);
const response = await initialClient.send(new ListBucketsCommand({}));
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('replacing credentials swaps the active set atomically', async () => {
await testSmartStorageInstance.replaceCredentials([
ROTATED_CREDENTIAL_A,
ROTATED_CREDENTIAL_B,
]);
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(2);
expect(credentials[0].accessKeyId).toEqual(ROTATED_CREDENTIAL_A.accessKeyId);
expect(credentials[1].accessKeyId).toEqual(ROTATED_CREDENTIAL_B.accessKeyId);
});
tap.test('old credentials stop working immediately for new requests', async () => {
await expect(initialClient.send(new ListBucketsCommand({}))).rejects.toThrow();
});
tap.test('first rotated credential authenticates successfully', async () => {
const response = await rotatedClientA.send(
new CreateBucketCommand({ Bucket: TEST_BUCKET }),
);
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('multiple rotated credentials remain active', async () => {
const response = await rotatedClientB.send(new ListBucketsCommand({}));
expect(response.$metadata.httpStatusCode).toEqual(200);
expect(response.Buckets?.some((bucket) => bucket.Name === TEST_BUCKET)).toEqual(true);
});
tap.test('duplicate replacement input fails cleanly without changing the active set', async () => {
await expect(
testSmartStorageInstance.replaceCredentials([
ROTATED_CREDENTIAL_A,
{
accessKeyId: ROTATED_CREDENTIAL_A.accessKeyId,
secretAccessKey: 'another-secret',
},
]),
).rejects.toThrow();
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(2);
expect(credentials[0].accessKeyId).toEqual(ROTATED_CREDENTIAL_A.accessKeyId);
expect(credentials[1].accessKeyId).toEqual(ROTATED_CREDENTIAL_B.accessKeyId);
const response = await rotatedClientA.send(new ListBucketsCommand({}));
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('teardown: clean up bucket and stop the storage server', async () => {
const response = await rotatedClientA.send(
new DeleteBucketCommand({ Bucket: TEST_BUCKET }),
);
expect(response.$metadata.httpStatusCode).toEqual(204);
await testSmartStorageInstance.stop();
});
export default tap.start();
+1 -1
View File
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstorage',
version: '6.1.0',
version: '6.3.3',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
}
+134 -4
View File
@@ -1,15 +1,20 @@
import * as plugins from './plugins.js';
import * as paths from './paths.js';
/**
* Authentication configuration
*/
export interface IStorageCredential {
accessKeyId: string;
secretAccessKey: string;
}
/**
* Authentication configuration
*/
export interface IAuthConfig {
enabled: boolean;
credentials: Array<{
accessKeyId: string;
secretAccessKey: string;
}>;
credentials: IStorageCredential[];
}
/**
@@ -113,6 +118,105 @@ export interface ISmartStorageConfig {
cluster?: IClusterConfig;
}
/**
* Logical bucket stats maintained by the Rust runtime.
 * Values are initialized from the native storage state at startup and kept up to date as buckets and objects are mutated through smartstorage.
*/
export interface IBucketSummary {
name: string;
objectCount: number;
totalSizeBytes: number;
creationDate?: number;
}
/**
* Filesystem-level capacity snapshot for the storage directory or configured drive path.
*/
export interface IStorageLocationSummary {
path: string;
totalBytes?: number;
availableBytes?: number;
usedBytes?: number;
}
/**
* Runtime storage stats served by the Rust core without issuing S3 list calls.
*/
export interface IStorageStats {
bucketCount: number;
totalObjectCount: number;
totalStorageBytes: number;
buckets: IBucketSummary[];
storageDirectory: string;
storageLocations?: IStorageLocationSummary[];
}
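
A minimal consumption sketch for the runtime stats surface, assuming a standalone server; the port and directory below are illustrative values, not taken from this changeset:

```typescript
import * as smartstorage from '@push.rocks/smartstorage';

async function reportStorageUsage(): Promise<void> {
  // Illustrative config: adjust port/directory to your environment.
  const storage = await smartstorage.SmartStorage.createAndStart({
    server: { port: 3350, silent: true },
    storage: { directory: './.nogit/stats-demo' },
  });

  // Served from the runtime counters; no S3 ListObjects round-trips are made.
  const stats = await storage.getStorageStats();
  console.log(
    `${stats.bucketCount} buckets, ${stats.totalObjectCount} objects, ${stats.totalStorageBytes} bytes in ${stats.storageDirectory}`,
  );

  for (const bucket of await storage.listBucketSummaries()) {
    console.log(`  ${bucket.name}: ${bucket.objectCount} objects / ${bucket.totalSizeBytes} bytes`);
  }

  // storageLocations is optional and only present when filesystem usage could be probed.
  for (const location of stats.storageLocations ?? []) {
    if (location.totalBytes !== undefined && location.usedBytes !== undefined) {
      const usedPercent = ((location.usedBytes / location.totalBytes) * 100).toFixed(1);
      console.log(`  ${location.path}: ${usedPercent}% used`);
    }
  }

  await storage.stop();
}
```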
/**
* Known peer status from the local node's current cluster view.
*/
export interface IClusterPeerHealth {
nodeId: string;
status: 'online' | 'suspect' | 'offline';
quicAddress?: string;
s3Address?: string;
driveCount?: number;
lastHeartbeat?: number;
missedHeartbeats?: number;
}
/**
* Local drive health as measured by smartstorage's runtime probes.
*/
export interface IClusterDriveHealth {
index: number;
path: string;
status: 'online' | 'degraded' | 'offline' | 'healing';
totalBytes?: number;
usedBytes?: number;
availableBytes?: number;
errorCount?: number;
lastError?: string;
lastCheck?: number;
erasureSetId?: number;
}
export interface IClusterErasureHealth {
dataShards: number;
parityShards: number;
chunkSizeBytes: number;
totalShards: number;
readQuorum: number;
writeQuorum: number;
erasureSetCount: number;
}
export interface IClusterRepairHealth {
active: boolean;
scanIntervalMs?: number;
lastRunStartedAt?: number;
lastRunCompletedAt?: number;
lastDurationMs?: number;
shardsChecked?: number;
shardsHealed?: number;
failed?: number;
lastError?: string;
}
/**
* Cluster runtime health from the Rust core.
* When clustering is disabled, the response is `{ enabled: false }`.
*/
export interface IClusterHealth {
enabled: boolean;
nodeId?: string;
quorumHealthy?: boolean;
majorityHealthy?: boolean;
peers?: IClusterPeerHealth[];
drives?: IClusterDriveHealth[];
erasure?: IClusterErasureHealth;
repairs?: IClusterRepairHealth;
}
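
A hedged sketch of flattening this shape into a one-line status for monitoring, assuming `IClusterHealth` is re-exported from the package entry point (the tests in this changeset access interfaces the same way) and that a running `storage` instance exists elsewhere:

```typescript
import type { IClusterHealth } from '@push.rocks/smartstorage';

function summarizeClusterHealth(health: IClusterHealth): string {
  if (!health.enabled) {
    // Standalone mode: the runtime only reports { enabled: false }.
    return 'clustering disabled';
  }
  const degradedDrives = (health.drives ?? []).filter(
    (drive) => drive.status !== 'online',
  );
  return [
    `node=${health.nodeId ?? 'unknown'}`,
    `quorum=${health.quorumHealthy ? 'ok' : 'lost'}`,
    `majority=${health.majorityHealthy ? 'ok' : 'lost'}`,
    `peers=${health.peers?.length ?? 0}`,
    `drivesDegraded=${degradedDrives.length}`,
    `healingActive=${health.repairs?.active ?? false}`,
  ].join(' ');
}

// Usage: const line = summarizeClusterHealth(await storage.getClusterHealth());
```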
/**
* Default configuration values
*/
@@ -205,6 +309,11 @@ type TRustStorageCommands = {
start: { params: { config: Required<ISmartStorageConfig> }; result: {} };
stop: { params: {}; result: {} };
createBucket: { params: { name: string }; result: {} };
getStorageStats: { params: {}; result: IStorageStats };
listBucketSummaries: { params: {}; result: IBucketSummary[] };
listCredentials: { params: {}; result: IStorageCredential[] };
replaceCredentials: { params: { credentials: IStorageCredential[] }; result: {} };
getClusterHealth: { params: {}; result: IClusterHealth };
};
/**
@@ -274,6 +383,27 @@ export class SmartStorage {
return { name: bucketNameArg };
}
public async getStorageStats(): Promise<IStorageStats> {
return this.bridge.sendCommand('getStorageStats', {});
}
public async listBucketSummaries(): Promise<IBucketSummary[]> {
return this.bridge.sendCommand('listBucketSummaries', {});
}
public async listCredentials(): Promise<IStorageCredential[]> {
return this.bridge.sendCommand('listCredentials', {});
}
public async replaceCredentials(credentials: IStorageCredential[]): Promise<void> {
await this.bridge.sendCommand('replaceCredentials', { credentials });
this.config.auth.credentials = credentials.map((credential) => ({ ...credential }));
}
public async getClusterHealth(): Promise<IClusterHealth> {
return this.bridge.sendCommand('getClusterHealth', {});
}
public async stop() {
await this.bridge.sendCommand('stop', {});
this.bridge.kill();
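
Taken together, `listCredentials()` and `replaceCredentials()` allow key rotation without restarting the server; the credential-rotation test in this changeset exercises exactly that. A hedged sketch with illustrative key material (`rotateCredentials` is a hypothetical helper, not part of the package):

```typescript
import * as smartstorage from '@push.rocks/smartstorage';

async function rotateCredentials(storage: smartstorage.SmartStorage): Promise<void> {
  const current = await storage.listCredentials();
  console.log(`active credentials: ${current.map((c) => c.accessKeyId).join(', ')}`);

  // Illustrative key values only.
  const next: smartstorage.IStorageCredential[] = [
    { accessKeyId: 'ROTATED2026', secretAccessKey: 'a-long-random-secret-value' },
  ];

  // Replacement is atomic: either the whole new set becomes active, or (on
  // invalid or duplicate input) the call rejects and the old set stays in place.
  await storage.replaceCredentials(next);

  // New requests signed with the old keys are rejected immediately; no restart needed.
}
```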
+2
View File
@@ -5,6 +5,8 @@
"moduleResolution": "NodeNext",
"esModuleInterop": true,
"verbatimModuleSyntax": true,
"types": ["node"],
"ignoreDeprecations": "6.0",
"baseUrl": ".",
"paths": {}
},