Compare commits

...

6 Commits

Author SHA1 Message Date
jkunz 7020810b5e v6.5.0
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-05-02 11:14:15 +00:00
jkunz 7f2546e041 feat(bucket-tenants): add persisted bucket-scoped tenant credentials with bucket export and import APIs 2026-05-02 11:14:15 +00:00
jkunz 53d663597a v6.4.1
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-30 08:23:22 +00:00
jkunz 440197ccf3 fix(build): tighten TypeScript compiler settings and refresh package metadata 2026-04-30 08:23:22 +00:00
jkunz c8d3ed79aa v6.4.0
Default (tags) / security (push) Failing after 1s
Default (tags) / test (push) Failing after 1s
Default (tags) / release (push) Has been skipped
Default (tags) / metadata (push) Has been skipped
2026-04-30 06:08:42 +00:00
jkunz a31e477359 feat(cluster,server,auth): add operational health endpoints, persist cluster topology, and hide credential secrets from runtime listings 2026-04-30 06:08:42 +00:00
28 changed files with 3072 additions and 495 deletions
+2 -2
View File
@@ -44,9 +44,9 @@
}
},
"@git.zone/tsdoc": {
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
"legal": "\n## License and Legal Information\n\nThis repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.\n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.\n\nUse of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District Court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
},
"@ship.zone/szci": {
"npmGlobalTools": []
}
}
}
+26
View File
@@ -1,5 +1,31 @@
# Changelog
## 2026-05-02 - 6.5.0 - feat(bucket-tenants)
add persisted bucket-scoped tenant credentials with bucket export and import APIs
- Adds bucket tenant management APIs for creating, rotating, listing, retrieving, and deleting scoped per-bucket credentials.
- Persists runtime credentials under the storage directory so tenant and replaced credentials survive restarts.
- Enforces tenant bucket isolation in auth, including blocking cross-bucket access and copy operations.
- Adds bucket export/import support using the smartstorage.bucket.v1 JSON format.
- Introduces health and metrics APIs plus test coverage for tenant lifecycle, persistence, policy retention, and AWS SDK compatibility.
## 2026-04-30 - 6.4.1 - fix(build)
tighten TypeScript compiler settings and refresh package metadata
- enable noImplicitAny in tsconfig and align the build script with strict compilation
- update package metadata including author, repository URL, and pnpm version
- bump dependency versions for @aws-sdk/client-s3 and @tsclass/tsclass
- refresh README hints and legal text to match the current package setup
## 2026-04-30 - 6.4.0 - feat(cluster,server,auth)
add operational health endpoints, persist cluster topology, and hide credential secrets from runtime listings
- persist cluster identity and topology snapshots under .smartstorage/cluster to support safer clustered restarts and seed-node joins
- add unauthenticated /-/live, /-/ready, /-/health, and /-/metrics endpoints with basic request and storage metrics
- route clustered shard read/write/delete/head operations by drive index and handle join, heartbeat, and topology sync over QUIC
- change runtime credential listing to return access-key metadata only, excluding secretAccessKey values
- add tests for operational endpoints and multi-node cluster persistence and recovery behavior
## 2026-04-19 - 6.3.3 - fix(build)
rename npmextra config to .smartconfig and refresh build metadata
+4 -2
View File
@@ -1,4 +1,6 @@
Copyright (c) 2021 Task Venture Capital GmbH (hello@task.vc)
MIT License
Copyright (c) Task Venture Capital GmbH
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
@@ -16,4 +18,4 @@ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
SOFTWARE.
+7 -10
View File
@@ -1,21 +1,21 @@
{
"name": "@push.rocks/smartstorage",
"version": "6.3.3",
"version": "6.5.0",
"private": false,
"description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
"main": "dist_ts/index.js",
"typings": "dist_ts/index.d.ts",
"type": "module",
"author": "Lossless GmbH",
"author": "Task Venture Capital GmbH",
"license": "MIT",
"scripts": {
"test:before": "(tsrust)",
"test": "(tstest test/ --web --verbose --logfile --timeout 60)",
"build": "(tsrust && tsbuild tsfolders --allowimplicitany)",
"build": "(tsrust && tsbuild tsfolders)",
"buildDocs": "tsdoc"
},
"devDependencies": {
"@aws-sdk/client-s3": "^3.1032.0",
"@aws-sdk/client-s3": "^3.1039.0",
"@git.zone/tsbuild": "^4.4.0",
"@git.zone/tsbundle": "^2.10.0",
"@git.zone/tsrun": "^2.0.2",
@@ -43,7 +43,7 @@
"dependencies": {
"@push.rocks/smartpath": "^6.0.0",
"@push.rocks/smartrust": "^1.3.2",
"@tsclass/tsclass": "^9.5.0"
"@tsclass/tsclass": "^9.5.1"
},
"keywords": [
"smartstorage",
@@ -67,13 +67,10 @@
"homepage": "https://code.foss.global/push.rocks/smartstorage#readme",
"repository": {
"type": "git",
"url": "ssh://git@code.foss.global:29419/push.rocks/smartstorage.git"
"url": "https://code.foss.global/push.rocks/smartstorage.git"
},
"bugs": {
"url": "https://code.foss.global/push.rocks/smartstorage/issues"
},
"packageManager": "pnpm@10.14.0+sha512.ad27a79641b49c3e481a16a805baa71817a04bbe06a38d17e60e2eaee83f6a146c6a688125f5792e48dd5ba30e7da52a5cda4c3992b9ccf333f9ce223af84748",
"pnpm": {
"overrides": {}
}
"packageManager": "pnpm@10.28.2"
}
+273 -259
View File
File diff suppressed because it is too large Load Diff
+27 -2
View File
@@ -1,6 +1,6 @@
# Project Hints for smartstorage
## Current State (v6.0.0)
## Current State (v6.5.0)
- **Rust-powered S3-compatible storage server** via `@push.rocks/smartrust` IPC bridge
- High-performance: streaming I/O, zero-copy, backpressure, range seek
@@ -14,6 +14,12 @@
- Runtime bucket summaries and storage stats via the Rust bridge (no S3 list scans)
- Cluster health introspection via the Rust bridge (node membership, local drive probes, quorum, healing state)
- Runtime credential listing and atomic replacement via the Rust bridge
- Runtime credentials persist under `{storage}/.smartstorage/credentials.json`
- Bucket tenant APIs provision scoped per-bucket credentials and enforce the scope before bucket-policy/default-auth authorization
- Per-bucket export/import uses `smartstorage.bucket.v1` JSON with object payloads encoded per object
- Cluster identity and topology snapshots persist under `{storage}/.smartstorage/cluster/`
- S3-side operational endpoints are available at `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics`
- Runtime credential listing returns access-key metadata only; secrets are write-only
## Architecture
@@ -41,22 +47,39 @@
| `start` | `{ config: ISmartStorageConfig }` | Init storage + HTTP server |
| `stop` | `{}` | Graceful shutdown |
| `createBucket` | `{ name: string }` | Create bucket directory |
| `createBucketTenant` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Create bucket and scoped persisted credential |
| `deleteBucketTenant` | `{ bucketName, accessKeyId? }` | Revoke one scoped credential when `accessKeyId` is given; otherwise revoke the bucket's scoped credentials and delete the tenant bucket recursively |
| `rotateBucketTenantCredentials` | `{ bucketName, accessKeyId, secretAccessKey, region? }` | Replace scoped credential for one bucket |
| `listBucketTenants` | `{}` | Return scoped credential metadata |
| `getBucketTenantCredential` | `{ bucketName }` | Return one scoped credential including secret for descriptor generation |
| `exportBucket` | `{ bucketName }` | Export one bucket's objects and metadata |
| `importBucket` | `{ bucketName, source }` | Import a `smartstorage.bucket.v1` bucket export |
| `getStorageStats` | `{}` | Return cached bucket/global runtime stats + storage location capacity snapshots |
| `listBucketSummaries` | `{}` | Return cached per-bucket runtime summaries |
| `listCredentials` | `{}` | Return access-key metadata for the active runtime auth credential set (secrets are not returned) |
| `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set |
| `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode |
### Operational HTTP Endpoints
| Endpoint | Purpose |
|----------|---------|
| `GET /-/live` | Process liveness |
| `GET /-/ready` | S3 readiness and cluster quorum readiness |
| `GET /-/health` | JSON storage, cluster, and runtime health |
| `GET /-/metrics` | Prometheus text metrics |
### Storage Layout
- Objects: `{root}/{bucket}/{key}._storage_object`
- Metadata: `{root}/{bucket}/{key}._storage_object.metadata.json`
- MD5: `{root}/{bucket}/{key}._storage_object.md5`
- Multipart: `{root}/.multipart/{upload_id}/part-{N}`
- Policies: `{root}/.policies/{bucket}.policy.json`
- Runtime credentials: `{root}/.smartstorage/credentials.json`
## Build
- `pnpm build` runs `tsrust && tsbuild tsfolders --allowimplicitany`
- `pnpm build` runs `tsrust && tsbuild tsfolders`
- `tsrust` compiles Rust to `dist_rust/ruststorage`
- Targets: linux_amd64, linux_arm64 (configured in .smartconfig.json)
@@ -70,7 +93,9 @@
## Testing
- `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337)
- `test/test.bucket-tenants.node.ts` - bucket tenant provisioning, per-bucket isolation, restart persistence, export/import, policy persistence, rotation, revoke/delete, AWS SDK v3 compatibility (12 tests, port 3361)
- `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349)
- `test/test.health-http.node.ts` - unauthenticated operational endpoint coverage (3 tests, port 3353)
- `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348)
- `test/test.auth.node.ts` - Auth + bucket policy integration (20 tests, auth enabled, port 3344)
- `test/test.policy-crud.node.ts` - Policy API CRUD + validation edge cases (17 tests, port 3345)
+102 -4
View File
@@ -1,6 +1,6 @@
# @push.rocks/smartstorage
A high-performance, S3-compatible storage server powered by a **Rust core** with a clean TypeScript API. Runs standalone for dev/test — or scales out as a **distributed, erasure-coded cluster** with QUIC-based inter-node communication. No cloud, no Docker. Just `npm install` and go. 🚀
A high-performance, S3-compatible storage server powered by a **Rust core** with a clean TypeScript API. Runs standalone for dev/test — or scales out as a **distributed, erasure-coded cluster** with QUIC-based inter-node communication. No cloud, no Docker. Just install the package and go. 🚀
## Issue Reporting and Security
@@ -34,6 +34,7 @@ For reporting bugs, issues, or security vulnerabilities, please visit [community
- 🧹 **Clean slate mode** — wipe storage on startup for test isolation
- 📊 **Runtime storage stats** — cheap bucket summaries and global counts without S3 list scans
- 🔑 **Runtime credential rotation** — list and replace active auth credentials without mutating internals
- 🧩 **Bucket tenants** — provision one scoped S3 credential per bucket with restart persistence
- **Test-first design** — start/stop in milliseconds, no port conflicts
### Clustering Features
@@ -225,15 +226,76 @@ await storage.replaceCredentials([
interface IStorageCredential {
accessKeyId: string;
secretAccessKey: string;
bucketName?: string;
region?: string;
}
```
- `listCredentials()` returns access-key metadata for the Rust core's current runtime credential set; `secretAccessKey` values are never included.
- `replaceCredentials()` swaps the full set atomically. On success, new requests use the new set immediately and the old credentials stop authenticating immediately.
- `replaceCredentials()` swaps the full set atomically and persists it under the storage root. On success, new requests use the new set immediately and the old credentials stop authenticating immediately.
- Requests that were already authenticated before the replacement keep running; auth is evaluated when each request starts.
- No restart is required.
- No restart is required, and runtime-created credentials survive restart unless `storage.cleanSlate` removes the storage directory.
- Replacement input must contain at least one credential, each `accessKeyId` and `secretAccessKey` must be non-empty, and `accessKeyId` values must be unique.
## Bucket Tenants
Bucket tenants are designed for platform services that need one bucket and one scoped S3 credential per app. Tenant credentials are enforced by the auth layer before the normal bucket-policy/default-auth pipeline, so a scoped credential cannot list all buckets or access another bucket even when it has a valid SigV4 signature.
```typescript
const tenant = await storage.createBucketTenant({
bucketName: 'workapp-123',
});
// Directly usable by AWS SDK v3 or env injection
const client = new S3Client({
endpoint: `http://${tenant.endpoint}:${tenant.port}`,
region: tenant.region,
credentials: {
accessKeyId: tenant.accessKeyId,
secretAccessKey: tenant.secretAccessKey,
},
forcePathStyle: true,
});
console.log(tenant.env.S3_BUCKET);
console.log(tenant.env.AWS_ACCESS_KEY_ID);
```
```typescript
await storage.rotateBucketTenantCredentials({ bucketName: 'workapp-123' });
await storage.deleteBucketTenant({ bucketName: 'workapp-123', accessKeyId: tenant.accessKeyId });
const descriptor = await storage.getBucketTenantDescriptor({ bucketName: 'workapp-123' });
const tenants = await storage.listBucketTenants();
```
- `createBucketTenant()` creates the bucket if needed and stores a scoped credential for that bucket.
- `rotateBucketTenantCredentials()` replaces the active scoped credential for the bucket and persists the new credential.
- `deleteBucketTenant({ bucketName, accessKeyId })` revokes one scoped credential and keeps the bucket.
- `deleteBucketTenant({ bucketName })` revokes scoped credentials for the bucket and deletes the bucket contents recursively.
- Tenant credentials can list, read, write, and delete objects in their assigned bucket, but cannot list all buckets, access other buckets, copy from other buckets, delete buckets, or mutate bucket policies.
- Bucket tenant APIs require `auth.enabled: true`.
## Bucket Backup/Restore
```typescript
const appBackup = await storage.exportBucket({ bucketName: 'workapp-123' });
await storage.importBucket({ bucketName: 'workapp-123-restore', source: appBackup });
```
- `exportBucket()` returns a self-contained `smartstorage.bucket.v1` JSON export with only the selected bucket's objects and object metadata.
- `importBucket()` creates the target bucket if needed and restores the exported objects into that bucket.
- Exports do not include credentials, policies, or unrelated tenant data.
## Health and Metrics APIs
```typescript
const health = await storage.getHealth();
const metrics = await storage.getMetrics();
```
- `getHealth()` reports running state, storage directory, auth enabled state, credential counts, bucket count, object count, total bytes, and cluster health.
- `getMetrics()` returns numeric counters and a Prometheus text snippet for bucket, object, byte, tenant credential, and cluster-enabled metrics.
## Runtime Stats
```typescript
@@ -577,6 +639,34 @@ Gracefully stop the server and kill the Rust process.
Create a storage bucket.
#### `createBucketTenant(options): Promise<IBucketTenantDescriptor>`
Create a bucket tenant with a generated or supplied scoped credential. Options: `{ bucketName, accessKeyId?, secretAccessKey?, region? }`.
#### `deleteBucketTenant(options): Promise<void>`
Revoke a tenant credential or delete the full tenant bucket. Options: `{ bucketName, accessKeyId? }`.
#### `rotateBucketTenantCredentials(options): Promise<IBucketTenantDescriptor>`
Replace the scoped credential for a bucket tenant. Options: `{ bucketName, accessKeyId?, secretAccessKey?, region? }`.
#### `listBucketTenants(): Promise<IBucketTenantMetadata[]>`
List scoped tenant credential metadata without returning secrets.
#### `getBucketTenantDescriptor(options): Promise<IBucketTenantDescriptor>`
Return endpoint, port, region, bucket, access key, secret key, SSL flag, legacy descriptor fields, and S3/AWS env values for the bucket tenant.
#### `exportBucket(options): Promise<IBucketExport>`
Export one bucket's objects and metadata into a `smartstorage.bucket.v1` JSON object.
#### `importBucket(options): Promise<void>`
Import a `smartstorage.bucket.v1` JSON object into the target bucket. Options: `{ bucketName, source }`.
#### `getStorageDescriptor(options?): Promise<IS3Descriptor>`
Get connection details for S3-compatible clients. Returns:
@@ -609,6 +699,14 @@ Atomically replace the active runtime credential set without restarting the serv
Read the Rust core's current cluster, drive, quorum, and repair health snapshot. Standalone mode returns `{ enabled: false }`.
#### `getHealth(): Promise<ISmartStorageHealth>`
Return running state, storage directory, auth state, credential counts, bucket count, object count, total bytes, and cluster health.
#### `getMetrics(): Promise<ISmartStorageMetrics>`
Return numeric metrics plus a Prometheus text snippet for operational scraping.
## Architecture
smartstorage uses a **hybrid Rust + TypeScript** architecture:
@@ -642,7 +740,7 @@ smartstorage uses a **hybrid Rust + TypeScript** architecture:
**Why Rust?** The original TypeScript implementation had critical perf issues: OOM on multipart uploads (parts buffered in memory), double stream copying, file descriptor leaks on HEAD requests, full-file reads for range requests, and no backpressure. The Rust binary solves all of these with streaming I/O, zero-copy, and direct `seek()` for range requests.
**IPC Protocol:** TypeScript communicates with the `ruststorage` binary over newline-delimited JSON via stdin/stdout. The current management commands are `start`, `stop`, `createBucket`, `getStorageStats`, `listBucketSummaries`, `listCredentials`, `replaceCredentials`, and `getClusterHealth`.
**IPC Protocol:** TypeScript communicates with the `ruststorage` binary over newline-delimited JSON via stdin/stdout. The current management commands are `start`, `stop`, `createBucket`, `createBucketTenant`, `deleteBucketTenant`, `rotateBucketTenantCredentials`, `listBucketTenants`, `getBucketTenantCredential`, `exportBucket`, `importBucket`, `getStorageStats`, `listBucketSummaries`, `listCredentials`, `replaceCredentials`, and `getClusterHealth`.
### S3-Compatible Operations
+17
View File
@@ -57,6 +57,7 @@ pub struct RequestContext {
pub action: StorageAction,
pub bucket: Option<String>,
pub key: Option<String>,
pub source_bucket: Option<String>,
}
impl RequestContext {
@@ -90,6 +91,7 @@ pub fn resolve_action(req: &Request<Incoming>) -> RequestContext {
action: StorageAction::ListAllMyBuckets,
bucket: None,
key: None,
source_bucket: None,
}
}
1 => {
@@ -113,6 +115,7 @@ pub fn resolve_action(req: &Request<Incoming>) -> RequestContext {
action,
bucket: Some(bucket),
key: None,
source_bucket: None,
}
}
2 => {
@@ -123,6 +126,18 @@ pub fn resolve_action(req: &Request<Incoming>) -> RequestContext {
let has_part_number = query.contains_key("partNumber");
let has_upload_id = query.contains_key("uploadId");
let has_uploads = query.contains_key("uploads");
let source_bucket = if has_copy_source {
req.headers()
.get("x-amz-copy-source")
.and_then(|value| value.to_str().ok())
.map(|source| {
let source = source.trim_start_matches('/');
let first_slash = source.find('/').unwrap_or(source.len());
percent_decode(&source[..first_slash])
})
} else {
None
};
let action = match &method {
&Method::PUT if has_part_number && has_upload_id => StorageAction::UploadPart,
@@ -141,12 +156,14 @@ pub fn resolve_action(req: &Request<Incoming>) -> RequestContext {
action,
bucket: Some(bucket),
key: Some(key),
source_bucket,
}
}
_ => RequestContext {
action: StorageAction::ListAllMyBuckets,
bucket: None,
key: None,
source_bucket: None,
},
}
}
+195 -22
View File
@@ -3,6 +3,8 @@ use hyper::body::Incoming;
use hyper::Request;
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use tokio::fs;
use tokio::sync::RwLock;
use crate::config::{AuthConfig, Credential};
@@ -14,6 +16,7 @@ type HmacSha256 = Hmac<Sha256>;
/// Identity returned by `verify_request` when a request's signature is accepted.
#[derive(Debug, Clone)]
pub struct AuthenticatedIdentity {
// Access key ID parsed from the request's Authorization header.
pub access_key_id: String,
// Copied from the matched credential's `bucket_name`; presumably `None` for
// credentials that are not scoped to a single bucket — confirm with callers.
pub bucket_name: Option<String>,
}
/// Parsed components of an AWS4-HMAC-SHA256 Authorization header.
@@ -56,11 +59,7 @@ pub fn verify_request(
.headers()
.get("x-amz-date")
.and_then(|v| v.to_str().ok())
.or_else(|| {
req.headers()
.get("date")
.and_then(|v| v.to_str().ok())
})
.or_else(|| req.headers().get("date").and_then(|v| v.to_str().ok()))
.ok_or_else(|| StorageError::missing_security_header("Missing x-amz-date header"))?;
// Enforce 15-min clock skew
@@ -77,10 +76,7 @@ pub fn verify_request(
let canonical_request = build_canonical_request(req, &parsed.signed_headers, content_sha256);
// Build string to sign
let scope = format!(
"{}/{}/s3/aws4_request",
parsed.date_stamp, parsed.region
);
let scope = format!("{}/{}/s3/aws4_request", parsed.date_stamp, parsed.region);
let canonical_hash = hex::encode(Sha256::digest(canonical_request.as_bytes()));
let string_to_sign = format!(
"AWS4-HMAC-SHA256\n{}\n{}\n{}",
@@ -105,6 +101,7 @@ pub fn verify_request(
Ok(AuthenticatedIdentity {
access_key_id: parsed.access_key_id,
bucket_name: credential.bucket_name.clone(),
})
}
@@ -131,10 +128,9 @@ fn parse_auth_header(header: &str) -> Result<SigV4Header, StorageError> {
}
}
let credential_str = credential_str
.ok_or_else(StorageError::authorization_header_malformed)?;
let signed_headers_str = signed_headers_str
.ok_or_else(StorageError::authorization_header_malformed)?;
let credential_str = credential_str.ok_or_else(StorageError::authorization_header_malformed)?;
let signed_headers_str =
signed_headers_str.ok_or_else(StorageError::authorization_header_malformed)?;
let signature = signature_str
.ok_or_else(StorageError::authorization_header_malformed)?
.to_string();
@@ -164,7 +160,10 @@ fn parse_auth_header(header: &str) -> Result<SigV4Header, StorageError> {
}
/// Find a credential by access key ID.
fn find_credential<'a>(access_key_id: &str, credentials: &'a [Credential]) -> Option<&'a Credential> {
fn find_credential<'a>(
access_key_id: &str,
credentials: &'a [Credential],
) -> Option<&'a Credential> {
credentials
.iter()
.find(|c| c.access_key_id == access_key_id)
@@ -174,33 +173,206 @@ fn find_credential<'a>(access_key_id: &str, credentials: &'a [Credential]) -> Op
// In-memory credential list guarded by an async `RwLock`, with an optional
// on-disk copy.
pub struct RuntimeCredentialStore {
// Copied from `AuthConfig::enabled` when the store is constructed.
enabled: bool,
// Current credential set; replaced wholesale by the mutating methods.
credentials: RwLock<Vec<Credential>>,
// Target file for persisted credentials; `None` disables persistence.
persistence_path: Option<PathBuf>,
}
/// Listing view of a credential: the access key ID plus the optional bucket
/// name and region. The struct has no secret field, so serializing it cannot
/// expose a secret access key.
///
/// Serialized with camelCase keys; `None` fields are omitted from the output.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CredentialMetadata {
pub access_key_id: String,
// Bucket the credential is scoped to, when one is set.
#[serde(skip_serializing_if = "Option::is_none")]
pub bucket_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub region: Option<String>,
}
/// Listing view of a bucket-scoped credential. Unlike `CredentialMetadata`,
/// the bucket name is mandatory here. The struct has no secret field.
///
/// Serialized with camelCase keys; a `None` region is omitted from the output.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketTenantMetadata {
pub bucket_name: String,
pub access_key_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub region: Option<String>,
}
impl RuntimeCredentialStore {
pub fn new(config: &AuthConfig) -> Self {
Self {
pub async fn new(
config: &AuthConfig,
persistence_path: Option<PathBuf>,
) -> anyhow::Result<Self> {
let credentials = match persistence_path.as_ref() {
Some(path) if path.exists() => {
let content = fs::read_to_string(path).await?;
let credentials: Vec<Credential> = serde_json::from_str(&content)?;
validate_credentials(&credentials)
.map_err(|error| anyhow::anyhow!(error.message))?;
credentials
}
_ => config.credentials.clone(),
};
Ok(Self {
enabled: config.enabled,
credentials: RwLock::new(config.credentials.clone()),
}
credentials: RwLock::new(credentials),
persistence_path,
})
}
/// Returns the `enabled` flag that was copied from the `AuthConfig` when the
/// store was constructed.
pub fn enabled(&self) -> bool {
self.enabled
}
pub async fn list_credentials(&self) -> Vec<Credential> {
self.credentials.read().await.clone()
pub async fn list_credentials(&self) -> Vec<CredentialMetadata> {
self.credentials
.read()
.await
.iter()
.map(|credential| CredentialMetadata {
access_key_id: credential.access_key_id.clone(),
bucket_name: credential.bucket_name.clone(),
region: credential.region.clone(),
})
.collect()
}
/// Returns a clone of the complete `Credential` values currently held in
/// memory, taken under the read lock. Later changes to the store are not
/// reflected in the returned vector.
pub async fn snapshot_credentials(&self) -> Vec<Credential> {
self.credentials.read().await.clone()
}
pub async fn replace_credentials(&self, credentials: Vec<Credential>) -> Result<(), StorageError> {
pub async fn replace_credentials(
&self,
credentials: Vec<Credential>,
) -> Result<(), StorageError> {
validate_credentials(&credentials)?;
self.persist_credentials(&credentials).await?;
*self.credentials.write().await = credentials;
Ok(())
}
/// Installs `credential` as the only scoped credential for `bucket_name`.
///
/// The credential's `bucket_name` is overwritten with `bucket_name`, every
/// existing credential scoped to that bucket is dropped, and the resulting
/// list is validated, persisted, and then swapped into memory. Returns the
/// credential as stored.
///
/// # Errors
///
/// Returns an invalid-request error when `bucket_name` is blank or when the
/// access key ID already belongs to a credential that is not scoped to this
/// bucket. Errors from `validate_credentials` and from persistence are
/// propagated; in every error case the in-memory list is left unchanged.
pub async fn replace_bucket_tenant_credential(
    &self,
    bucket_name: &str,
    mut credential: Credential,
) -> Result<Credential, StorageError> {
    validate_bucket_scope(bucket_name)?;
    credential.bucket_name = Some(bucket_name.to_string());

    // Hold the write lock across the whole read-modify-persist-swap sequence.
    // Cloning under a short-lived read lock and re-acquiring a write lock later
    // lets two concurrent mutations start from the same snapshot, so the later
    // writer silently discards the earlier one's change and the persisted file
    // can disagree with memory.
    let mut guard = self.credentials.write().await;
    let mut credentials = (*guard).clone();

    let key_used_elsewhere = credentials.iter().any(|existing| {
        existing.access_key_id == credential.access_key_id
            && existing.bucket_name.as_deref() != Some(bucket_name)
    });
    if key_used_elsewhere {
        return Err(StorageError::invalid_request(
            "Credential accessKeyId is already assigned to another principal.",
        ));
    }

    // Drop every credential currently scoped to this bucket before adding the
    // replacement, so the bucket ends up with exactly one scoped credential.
    credentials.retain(|existing| existing.bucket_name.as_deref() != Some(bucket_name));
    credentials.push(credential.clone());
    validate_credentials(&credentials)?;

    // Persist first: if the write fails, `?` returns before memory is updated.
    self.persist_credentials(&credentials).await?;
    *guard = credentials;
    Ok(credential)
}
/// Removes credentials scoped to `bucket_name` and returns how many were
/// removed.
///
/// With `access_key_id` set, only the scoped credential with that access key
/// ID is removed; with `None`, every credential scoped to the bucket is
/// removed. Credentials that are not scoped to `bucket_name` are always kept.
///
/// # Errors
///
/// Returns an invalid-request error when `bucket_name` is blank or when the
/// resulting list would be empty (this also triggers if the list was already
/// empty). Persistence errors are propagated. In every error case the
/// in-memory list is left unchanged.
pub async fn remove_bucket_tenant_credentials(
    &self,
    bucket_name: &str,
    access_key_id: Option<&str>,
) -> Result<usize, StorageError> {
    validate_bucket_scope(bucket_name)?;

    // Hold the write lock across the whole read-modify-persist-swap sequence.
    // Cloning under a short-lived read lock and re-acquiring a write lock later
    // lets two concurrent mutations start from the same snapshot, so the later
    // writer silently discards the earlier one's change and the persisted file
    // can disagree with memory.
    let mut guard = self.credentials.write().await;
    let mut credentials = (*guard).clone();
    let before = credentials.len();

    credentials.retain(|credential| {
        if credential.bucket_name.as_deref() != Some(bucket_name) {
            return true;
        }
        match access_key_id {
            // Targeted revoke: keep the bucket's other scoped credentials.
            Some(access_key_id) => credential.access_key_id != access_key_id,
            // No key given: drop every credential scoped to the bucket.
            None => false,
        }
    });
    let removed = before.saturating_sub(credentials.len());

    if credentials.is_empty() {
        return Err(StorageError::invalid_request(
            "Cannot remove the last active credential.",
        ));
    }

    // Persist first: if the write fails, `?` returns before memory is updated.
    self.persist_credentials(&credentials).await?;
    *guard = credentials;
    Ok(removed)
}
/// Lists metadata for every stored credential that has a `bucket_name`.
///
/// Credentials without a bucket name are skipped. The result is sorted by
/// bucket name and, within a bucket, by access key ID.
pub async fn list_bucket_tenants(&self) -> Vec<BucketTenantMetadata> {
let mut tenants: Vec<BucketTenantMetadata> = self
.credentials
.read()
.await
.iter()
.filter_map(|credential| {
credential
.bucket_name
.as_ref()
.map(|bucket_name| BucketTenantMetadata {
bucket_name: bucket_name.clone(),
access_key_id: credential.access_key_id.clone(),
region: credential.region.clone(),
})
})
.collect();
// Sort so the output order does not depend on insertion order.
tenants.sort_by(|a, b| {
a.bucket_name
.cmp(&b.bucket_name)
.then_with(|| a.access_key_id.cmp(&b.access_key_id))
});
tenants
}
/// Returns a clone of the first stored credential whose `bucket_name` equals
/// `bucket_name`, or `None` when no stored credential is scoped to it.
///
/// The returned value is the complete `Credential`, not a metadata view.
pub async fn get_bucket_tenant_credential(&self, bucket_name: &str) -> Option<Credential> {
self.credentials
.read()
.await
.iter()
.find(|credential| credential.bucket_name.as_deref() == Some(bucket_name))
.cloned()
}
/// Writes `credentials` to `persistence_path` as pretty-printed JSON.
///
/// Returns `Ok(())` without touching the filesystem when no persistence path
/// is configured. Otherwise the parent directory is created if needed, the
/// JSON is written to a temp file next to the target, and that file is then
/// renamed onto the target path. Every I/O or serialization failure is
/// converted to `StorageError::internal_error` carrying the error's text.
///
/// NOTE(review): the file holds whatever `Credential` serializes to —
/// presumably including secret keys — and is created with default
/// permissions; confirm whether tighter permissions are required.
async fn persist_credentials(&self, credentials: &[Credential]) -> Result<(), StorageError> {
let Some(path) = self.persistence_path.as_ref() else {
return Ok(());
};
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)
.await
.map_err(|error| StorageError::internal_error(&error.to_string()))?;
}
// `with_extension` replaces the final extension, so a path ending in
// `.json` becomes `<stem>.json.tmp` in the same directory.
let temp_path = path.with_extension("json.tmp");
let json = serde_json::to_string_pretty(credentials)
.map_err(|error| StorageError::internal_error(&error.to_string()))?;
fs::write(&temp_path, json)
.await
.map_err(|error| StorageError::internal_error(&error.to_string()))?;
// Rename last, so the target is only replaced after the full write succeeded.
fs::rename(&temp_path, path)
.await
.map_err(|error| StorageError::internal_error(&error.to_string()))?;
Ok(())
}
}
/// Rejects a bucket name that is empty or consists only of whitespace with an
/// invalid-request error. No other bucket-name rule is checked here.
fn validate_bucket_scope(bucket_name: &str) -> Result<(), StorageError> {
if bucket_name.trim().is_empty() {
return Err(StorageError::invalid_request(
"Bucket tenant bucketName must not be empty.",
));
}
Ok(())
}
fn validate_credentials(credentials: &[Credential]) -> Result<(), StorageError> {
@@ -240,7 +412,8 @@ fn check_clock_skew(amz_date: &str) -> Result<(), StorageError> {
let parsed = chrono::NaiveDateTime::parse_from_str(amz_date, "%Y%m%dT%H%M%SZ")
.map_err(|_| StorageError::authorization_header_malformed())?;
let request_time = chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(parsed, chrono::Utc);
let request_time =
chrono::DateTime::<chrono::Utc>::from_naive_utc_and_offset(parsed, chrono::Utc);
let now = chrono::Utc::now();
let diff = (now - request_time).num_seconds().unsigned_abs();
+88 -8
View File
@@ -21,11 +21,10 @@ use super::quic_transport::QuicTransport;
use super::shard_store::{ShardId, ShardStore};
use super::state::{ClusterState, NodeStatus};
use crate::storage::{
storage_location_summary, BucketInfo, BucketSummary, ClusterDriveHealth,
ClusterErasureHealth, ClusterHealth, ClusterPeerHealth, ClusterRepairHealth,
CompleteMultipartResult, CopyResult, GetResult, HeadResult, ListObjectEntry,
ListObjectsResult, MultipartUploadInfo, PutResult, RuntimeBucketStats,
RuntimeStatsState, StorageLocationSummary, StorageStats,
storage_location_summary, BucketInfo, BucketSummary, ClusterDriveHealth, ClusterErasureHealth,
ClusterHealth, ClusterPeerHealth, ClusterRepairHealth, CompleteMultipartResult, CopyResult,
GetResult, HeadResult, ListObjectEntry, ListObjectsResult, MultipartUploadInfo, PutResult,
RuntimeBucketStats, RuntimeStatsState, StorageLocationSummary, StorageStats,
};
use serde::{Deserialize, Serialize};
@@ -170,7 +169,8 @@ impl DistributedStore {
let peers = self.peer_health(&nodes);
let drives = self.drive_health(&drive_states, &erasure_sets);
let repairs = self.repair_health().await;
let quorum_healthy = majority_healthy && self.quorum_is_healthy(&nodes, &drive_states, &erasure_sets);
let quorum_healthy =
majority_healthy && self.quorum_is_healthy(&nodes, &drive_states, &erasure_sets);
Ok(ClusterHealth {
enabled: true,
@@ -291,6 +291,69 @@ impl DistributedStore {
Ok(PutResult { md5: md5_hex })
}
/// Stores an object whose full payload is already available as a byte slice.
///
/// Steps, in order:
/// 1. Fails with `StorageError::no_such_bucket` when `bucket` does not exist.
/// 2. Records the size of any existing manifest for `bucket`/`key` so the stats update
///    at the end can account for a replacement.
/// 3. Resolves the erasure set for the object; errors if none is available.
/// 4. Splits `data` into pieces of `erasure_config.chunk_size_bytes` and hands each one
///    to `encode_and_distribute_chunk`, collecting the returned chunk manifests.
/// 5. Builds and stores an `ObjectManifest` (fresh UUID version id, MD5 of the whole
///    payload, `content-type` taken from `metadata` or `binary/octet-stream`).
///
/// Returns the hex-encoded MD5 of `data` wrapped in `PutResult`.
pub async fn put_object_bytes(
    &self,
    bucket: &str,
    key: &str,
    data: &[u8],
    metadata: HashMap<String, String>,
) -> Result<PutResult> {
    let bucket_known = self.bucket_exists(bucket).await;
    if !bucket_known {
        return Err(crate::error::StorageError::no_such_bucket().into());
    }

    // Must be read before the new manifest is stored, otherwise the old size is lost.
    let replaced_size = self.manifest_size_if_exists(bucket, key).await;

    let target_set = self
        .state
        .get_erasure_set_for_object(bucket, key)
        .await
        .ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?;

    let piece_size = self.erasure_config.chunk_size_bytes;
    let mut chunk_manifests = Vec::new();
    for (position, piece) in data.chunks(piece_size).enumerate() {
        let stored_chunk = self
            .encode_and_distribute_chunk(&target_set, bucket, key, position as u32, piece)
            .await?;
        chunk_manifests.push(stored_chunk);
    }

    let md5_hex = format!("{:x}", Md5::digest(data));
    let timestamp = Utc::now().to_rfc3339();
    // Resolve the content type first: `metadata` is moved into the manifest below.
    let content_type = match metadata.get("content-type") {
        Some(declared) => declared.clone(),
        None => "binary/octet-stream".to_string(),
    };

    let manifest = ObjectManifest {
        bucket: bucket.to_string(),
        key: key.to_string(),
        version_id: uuid::Uuid::new_v4().to_string(),
        size: data.len() as u64,
        content_md5: md5_hex.clone(),
        content_type,
        metadata,
        created_at: timestamp.clone(),
        last_modified: timestamp,
        data_shards: self.erasure_config.data_shards,
        parity_shards: self.erasure_config.parity_shards,
        chunk_size: piece_size,
        chunks: chunk_manifests,
    };

    self.store_manifest(&manifest).await?;
    self.track_object_upsert(bucket, replaced_size, manifest.size)
        .await;

    Ok(PutResult { md5: md5_hex })
}
pub async fn get_object(
&self,
bucket: &str,
@@ -408,6 +471,7 @@ impl DistributedStore {
key,
chunk.chunk_index,
placement.shard_index,
placement.drive_id.parse::<u32>().unwrap_or(0),
)
.await
{
@@ -930,6 +994,7 @@ impl DistributedStore {
&part_info.part_key,
chunk.chunk_index,
placement.shard_index,
placement.drive_id.parse::<u32>().unwrap_or(0),
)
.await;
}
@@ -1031,7 +1096,11 @@ impl DistributedStore {
peers
}
fn drive_health(&self, drive_states: &[DriveState], erasure_sets: &[ErasureSet]) -> Vec<ClusterDriveHealth> {
fn drive_health(
&self,
drive_states: &[DriveState],
erasure_sets: &[ErasureSet],
) -> Vec<ClusterDriveHealth> {
let local_node_id = self.state.local_node_id();
let mut drive_to_set = HashMap::new();
for erasure_set in erasure_sets {
@@ -1116,7 +1185,10 @@ impl DistributedStore {
.unwrap_or(false);
}
matches!(node_statuses.get(drive.node_id.as_str()), Some(NodeStatus::Online))
matches!(
node_statuses.get(drive.node_id.as_str()),
Some(NodeStatus::Online)
)
})
.count();
@@ -1271,6 +1343,7 @@ impl DistributedStore {
key,
chunk_index,
shard_idx as u32,
drive.drive_index,
shard_data,
checksum,
)
@@ -1330,6 +1403,7 @@ impl DistributedStore {
key: &str,
chunk_index: u32,
shard_index: u32,
drive_index: u32,
data: &[u8],
checksum: u32,
) -> Result<()> {
@@ -1348,6 +1422,7 @@ impl DistributedStore {
key: key.to_string(),
chunk_index,
shard_index,
drive_index,
shard_data_length: data.len() as u64,
checksum,
object_metadata: HashMap::new(),
@@ -1417,6 +1492,7 @@ impl DistributedStore {
key,
chunk.chunk_index,
placement.shard_index,
placement.drive_id.parse::<u32>().unwrap_or(0),
)
.await
.ok()
@@ -1448,6 +1524,7 @@ impl DistributedStore {
key: &str,
chunk_index: u32,
shard_index: u32,
drive_index: u32,
) -> Result<(Vec<u8>, u32)> {
let node_info = self
.state
@@ -1464,6 +1541,7 @@ impl DistributedStore {
key: key.to_string(),
chunk_index,
shard_index,
drive_index,
});
match self.transport.send_shard_read(&conn, &request).await? {
@@ -1479,6 +1557,7 @@ impl DistributedStore {
key: &str,
chunk_index: u32,
shard_index: u32,
drive_index: u32,
) -> Result<()> {
let node_info = self
.state
@@ -1495,6 +1574,7 @@ impl DistributedStore {
key: key.to_string(),
chunk_index,
shard_index,
drive_index,
});
let _response = self.transport.send_request(&conn, &request).await?;
+31 -14
View File
@@ -18,6 +18,7 @@ pub struct MembershipManager {
state: Arc<ClusterState>,
transport: Arc<QuicTransport>,
heartbeat_interval: Duration,
heartbeat_timeout: Duration,
local_node_info: NodeInfo,
drive_manager: Option<Arc<Mutex<DriveManager>>>,
}
@@ -27,12 +28,14 @@ impl MembershipManager {
state: Arc<ClusterState>,
transport: Arc<QuicTransport>,
heartbeat_interval_ms: u64,
heartbeat_timeout_ms: u64,
local_node_info: NodeInfo,
) -> Self {
Self {
state,
transport,
heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
heartbeat_timeout: Duration::from_millis(heartbeat_timeout_ms),
local_node_info,
drive_manager: None,
}
@@ -46,7 +49,7 @@ impl MembershipManager {
/// Join the cluster by contacting seed nodes.
/// Sends a JoinRequest to each seed node until one accepts.
pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> {
pub async fn join_cluster(&self, seed_nodes: &[String], allow_bootstrap_on_failure: bool) -> Result<()> {
if seed_nodes.is_empty() {
tracing::info!("No seed nodes configured, starting as initial cluster node");
self.state.add_node(self.local_node_info.clone()).await;
@@ -75,10 +78,13 @@ impl MembershipManager {
}
}
// If no seed responded, start as a new cluster
tracing::info!("Could not reach any seed nodes, starting as initial cluster node");
self.state.add_node(self.local_node_info.clone()).await;
Ok(())
if allow_bootstrap_on_failure {
tracing::warn!("Could not reach any seed nodes, bootstrapping a new cluster because no persisted topology exists");
self.state.add_node(self.local_node_info.clone()).await;
return Ok(());
}
anyhow::bail!("Could not reach any configured seed nodes; refusing unsafe cluster bootstrap")
}
async fn try_join(&self, addr: SocketAddr) -> Result<()> {
@@ -97,9 +103,14 @@ impl MembershipManager {
ClusterResponse::JoinResponse(join_resp) => {
if join_resp.accepted {
if let Some(topology) = &join_resp.topology {
let topology_contains_self = topology
.nodes
.iter()
.any(|node| node.node_id == self.local_node_info.node_id);
self.state.apply_topology(topology).await;
// Also register self
self.state.add_node(self.local_node_info.clone()).await;
if !topology_contains_self {
self.state.add_node(self.local_node_info.clone()).await;
}
tracing::info!(
"Applied cluster topology (version {}, {} nodes, {} erasure sets)",
topology.version,
@@ -137,7 +148,13 @@ impl MembershipManager {
}
async fn send_heartbeats(&self) {
let peers = self.state.online_peers().await;
let peers = self
.state
.all_nodes()
.await
.into_iter()
.filter(|node| node.info.node_id != self.local_node_info.node_id)
.collect::<Vec<_>>();
let topology_version = self.state.version().await;
let mut responded = Vec::new();
@@ -145,7 +162,7 @@ impl MembershipManager {
let drive_states = self.collect_drive_states().await;
for peer in &peers {
let addr: SocketAddr = match peer.quic_addr.parse() {
let addr: SocketAddr = match peer.info.quic_addr.parse() {
Ok(a) => a,
Err(_) => continue,
};
@@ -158,23 +175,23 @@ impl MembershipManager {
});
match tokio::time::timeout(
Duration::from_secs(5),
self.send_heartbeat_to_peer(&peer.node_id, addr, &heartbeat),
self.heartbeat_timeout,
self.send_heartbeat_to_peer(&peer.info.node_id, addr, &heartbeat),
)
.await
{
Ok(Ok(())) => {
responded.push(peer.node_id.clone());
responded.push(peer.info.node_id.clone());
}
Ok(Err(e)) => {
tracing::debug!(
peer = %peer.node_id,
peer = %peer.info.node_id,
error = %e,
"Heartbeat failed"
);
}
Err(_) => {
tracing::debug!(peer = %peer.node_id, "Heartbeat timed out");
tracing::debug!(peer = %peer.info.node_id, "Heartbeat timed out");
}
}
}
+1
View File
@@ -9,6 +9,7 @@ pub mod erasure;
pub mod healing;
pub mod membership;
pub mod metadata;
pub mod persistence;
pub mod placement;
pub mod protocol;
pub mod quic_transport;
+77
View File
@@ -0,0 +1,77 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use tokio::fs;
use super::protocol::ClusterTopology;
const CLUSTER_METADATA_DIR: &str = ".smartstorage/cluster";
const IDENTITY_FILE: &str = "identity.json";
const TOPOLOGY_FILE: &str = "topology.json";
/// Identity record of a cluster node, serialized and deserialized with camelCase keys
/// (`schemaVersion`, `nodeId`, `clusterId`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterIdentity {
    /// Version of this record's layout; `ClusterIdentity::new` sets it to 1.
    pub schema_version: u32,
    /// Identifier of the node this record belongs to.
    pub node_id: String,
    /// Identifier of the cluster the node is associated with.
    pub cluster_id: String,
}
impl ClusterIdentity {
    /// Builds an identity for `node_id` / `cluster_id` with `schema_version` fixed at 1.
    pub fn new(node_id: String, cluster_id: String) -> Self {
        Self {
            schema_version: 1,
            node_id,
            cluster_id,
        }
    }
}
pub fn cluster_metadata_dir(storage_directory: &str) -> PathBuf {
PathBuf::from(storage_directory).join(CLUSTER_METADATA_DIR)
}
/// Returns the path of the identity file (`IDENTITY_FILE`) inside `metadata_dir`.
pub fn identity_path(metadata_dir: &Path) -> PathBuf {
    let mut location = metadata_dir.to_path_buf();
    location.push(IDENTITY_FILE);
    location
}
/// Returns the path of the topology file (`TOPOLOGY_FILE`) inside `metadata_dir`.
pub fn topology_path(metadata_dir: &Path) -> PathBuf {
    let mut location = metadata_dir.to_path_buf();
    location.push(TOPOLOGY_FILE);
    location
}
/// Reads and parses the cluster identity stored at `path`.
///
/// Returns `Ok(None)` when the file does not exist, `Ok(Some(identity))` when it was read
/// and parsed, and an error for any other I/O failure or for invalid JSON.
pub async fn load_identity(path: &Path) -> Result<Option<ClusterIdentity>> {
    let raw = match fs::read_to_string(path).await {
        Ok(raw) => raw,
        // A missing file is an expected state, not an error.
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(error) => return Err(error.into()),
    };
    let identity: ClusterIdentity = serde_json::from_str(&raw)?;
    Ok(Some(identity))
}
/// Persists `identity` as JSON at `path` by delegating to `write_json_atomic`.
pub async fn persist_identity(path: &Path, identity: &ClusterIdentity) -> Result<()> {
    write_json_atomic(path, identity).await
}
/// Reads and parses the cluster topology stored at `path`.
///
/// Returns `Ok(None)` when the file does not exist, `Ok(Some(topology))` when it was read
/// and parsed, and an error for any other I/O failure or for invalid JSON.
pub async fn load_topology(path: &Path) -> Result<Option<ClusterTopology>> {
    let raw = match fs::read_to_string(path).await {
        Ok(raw) => raw,
        // A missing file is an expected state, not an error.
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(error) => return Err(error.into()),
    };
    let topology: ClusterTopology = serde_json::from_str(&raw)?;
    Ok(Some(topology))
}
/// Persists `topology` as JSON at `path` by delegating to `write_json_atomic`.
pub async fn persist_topology(path: &Path, topology: &ClusterTopology) -> Result<()> {
    write_json_atomic(path, topology).await
}
async fn write_json_atomic<T: Serialize>(path: &Path, value: &T) -> Result<()> {
let parent = path
.parent()
.ok_or_else(|| anyhow::anyhow!("Cluster metadata path has no parent"))?;
fs::create_dir_all(parent).await?;
let temp_path = path.with_extension("json.tmp");
let content = serde_json::to_string_pretty(value)?;
fs::write(&temp_path, content).await?;
fs::rename(&temp_path, path).await?;
Ok(())
}
+4
View File
@@ -102,6 +102,7 @@ pub struct ShardWriteRequest {
pub key: String,
pub chunk_index: u32,
pub shard_index: u32,
pub drive_index: u32,
pub shard_data_length: u64,
pub checksum: u32, // crc32c of shard data
pub object_metadata: HashMap<String, String>,
@@ -121,6 +122,7 @@ pub struct ShardReadRequest {
pub key: String,
pub chunk_index: u32,
pub shard_index: u32,
pub drive_index: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -139,6 +141,7 @@ pub struct ShardDeleteRequest {
pub key: String,
pub chunk_index: u32,
pub shard_index: u32,
pub drive_index: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -154,6 +157,7 @@ pub struct ShardHeadRequest {
pub key: String,
pub chunk_index: u32,
pub shard_index: u32,
pub drive_index: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
+222 -22
View File
@@ -8,6 +8,7 @@ use super::protocol::{
self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
};
use super::shard_store::{ShardId, ShardStore};
use super::state::{ClusterState, NodeStatus};
/// QUIC transport layer for inter-node communication.
///
@@ -26,11 +27,8 @@ impl QuicTransport {
pub async fn new(bind_addr: SocketAddr, local_node_id: String) -> Result<Self> {
let (server_config, client_config) = Self::generate_tls_configs()?;
let endpoint = Endpoint::server(server_config, bind_addr)?;
// Also configure the endpoint for client connections
let mut endpoint_client = endpoint.clone();
endpoint_client.set_default_client_config(client_config);
let mut endpoint = Endpoint::server(server_config, bind_addr)?;
endpoint.set_default_client_config(client_config);
Ok(Self {
endpoint,
@@ -163,7 +161,8 @@ impl QuicTransport {
/// Accept incoming connections and dispatch to the handler.
pub async fn accept_loop(
self: Arc<Self>,
shard_store: Arc<ShardStore>,
shard_stores: Vec<Arc<ShardStore>>,
cluster_state: Option<Arc<ClusterState>>,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
loop {
@@ -172,11 +171,12 @@ impl QuicTransport {
match incoming {
Some(incoming_conn) => {
let transport = self.clone();
let store = shard_store.clone();
let stores = shard_stores.clone();
let state = cluster_state.clone();
tokio::spawn(async move {
match incoming_conn.await {
Ok(conn) => {
transport.handle_connection(conn, store).await;
transport.handle_connection(conn, stores, state).await;
}
Err(e) => {
tracing::error!("Failed to accept QUIC connection: {}", e);
@@ -194,16 +194,19 @@ impl QuicTransport {
/// Handle a single QUIC connection (may have multiple streams).
async fn handle_connection(
&self,
self: Arc<Self>,
conn: quinn::Connection,
shard_store: Arc<ShardStore>,
shard_stores: Vec<Arc<ShardStore>>,
cluster_state: Option<Arc<ClusterState>>,
) {
loop {
match conn.accept_bi().await {
Ok((send, recv)) => {
let store = shard_store.clone();
let stores = shard_stores.clone();
let state = cluster_state.clone();
let transport = self.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_stream(send, recv, store).await {
if let Err(e) = transport.handle_stream(send, recv, stores, state).await {
tracing::error!("Stream handler error: {}", e);
}
});
@@ -219,9 +222,11 @@ impl QuicTransport {
/// Handle a single bidirectional stream (one request-response exchange).
async fn handle_stream(
self: Arc<Self>,
mut send: quinn::SendStream,
mut recv: quinn::RecvStream,
shard_store: Arc<ShardStore>,
shard_stores: Vec<Arc<ShardStore>>,
cluster_state: Option<Arc<ClusterState>>,
) -> Result<()> {
// Read the full request (length-prefixed bincode + optional trailing data)
let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
@@ -231,6 +236,7 @@ impl QuicTransport {
ClusterRequest::ShardWrite(write_req) => {
// Shard data follows the header in the raw buffer
let shard_data = &raw[header_len..];
let drive_index = write_req.drive_index;
let shard_id = ShardId {
bucket: write_req.bucket,
@@ -239,9 +245,10 @@ impl QuicTransport {
shard_index: write_req.shard_index,
};
let result = shard_store
.write_shard(&shard_id, &shard_data, write_req.checksum)
.await;
let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store.write_shard(&shard_id, &shard_data, write_req.checksum).await,
Err(error) => Err(error),
};
let ack = ShardWriteAck {
request_id: write_req.request_id,
@@ -254,6 +261,7 @@ impl QuicTransport {
}
ClusterRequest::ShardRead(read_req) => {
let drive_index = read_req.drive_index;
let shard_id = ShardId {
bucket: read_req.bucket,
key: read_req.key,
@@ -261,7 +269,15 @@ impl QuicTransport {
shard_index: read_req.shard_index,
};
match shard_store.read_shard(&shard_id).await {
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store,
Err(error) => {
Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
return Ok(());
}
};
match store.read_shard(&shard_id).await {
Ok((data, checksum)) => {
let header = ShardReadResponse {
request_id: read_req.request_id,
@@ -293,13 +309,17 @@ impl QuicTransport {
}
ClusterRequest::ShardDelete(del_req) => {
let drive_index = del_req.drive_index;
let shard_id = ShardId {
bucket: del_req.bucket,
key: del_req.key,
chunk_index: del_req.chunk_index,
shard_index: del_req.shard_index,
};
let result = shard_store.delete_shard(&shard_id).await;
let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store.delete_shard(&shard_id).await,
Err(error) => Err(error),
};
let ack = protocol::ClusterResponse::ShardDeleteAck(protocol::ShardDeleteAck {
request_id: del_req.request_id,
success: result.is_ok(),
@@ -310,13 +330,22 @@ impl QuicTransport {
}
ClusterRequest::ShardHead(head_req) => {
let drive_index = head_req.drive_index;
let shard_id = ShardId {
bucket: head_req.bucket,
key: head_req.key,
chunk_index: head_req.chunk_index,
shard_index: head_req.shard_index,
};
let resp = match shard_store.head_shard(&shard_id).await {
let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
Ok(store) => store,
Err(error) => {
Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
return Ok(());
}
};
let resp = match store.head_shard(&shard_id).await {
Ok(Some(meta)) => protocol::ShardHeadResponse {
request_id: head_req.request_id,
found: true,
@@ -336,9 +365,103 @@ impl QuicTransport {
send.finish()?;
}
// Heartbeat, Join, TopologySync, Heal, and Manifest operations
// will be handled by the membership and coordinator modules.
// For now, send a generic ack.
ClusterRequest::JoinRequest(join_req) => {
let Some(state) = cluster_state else {
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "ClusterDisabled".to_string(),
message: "Cluster state is not available".to_string(),
};
let response = protocol::encode_response(&ClusterResponse::Error(err))?;
send.write_all(&response).await?;
send.finish()?;
return Ok(());
};
let joining_node_id = join_req.node_info.node_id.clone();
state.add_node(join_req.node_info).await;
let topology = state.to_topology().await;
let node_drives: Vec<(String, u32)> = topology
.nodes
.iter()
.map(|node| (node.node_id.clone(), node.drive_count))
.collect();
let erasure_sets = super::placement::form_erasure_sets(
&node_drives,
topology.data_shards + topology.parity_shards,
);
state.set_erasure_sets(erasure_sets).await;
let response_topology = state.to_topology().await;
let response = protocol::encode_response(&ClusterResponse::JoinResponse(
protocol::JoinResponseMessage {
accepted: true,
topology: Some(response_topology.clone()),
error: None,
},
))?;
send.write_all(&response).await?;
send.finish()?;
self.broadcast_topology(&state, Some(response_topology), None, Some(&joining_node_id)).await;
}
ClusterRequest::Heartbeat(heartbeat) => {
let Some(state) = cluster_state else {
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "ClusterDisabled".to_string(),
message: "Cluster state is not available".to_string(),
};
let response = protocol::encode_response(&ClusterResponse::Error(err))?;
send.write_all(&response).await?;
send.finish()?;
return Ok(());
};
let peer_node_id = heartbeat.node_id.clone();
let peer_topology_version = heartbeat.topology_version;
state.record_heartbeat(&heartbeat.node_id).await;
let local_topology_version = state.version().await;
let response = protocol::encode_response(&ClusterResponse::HeartbeatAck(
protocol::HeartbeatAckMessage {
node_id: state.local_node_id().to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
topology_version: local_topology_version,
},
))?;
send.write_all(&response).await?;
send.finish()?;
if local_topology_version > peer_topology_version {
self.broadcast_topology(&state, None, Some(&peer_node_id), None).await;
}
}
ClusterRequest::TopologySync(sync) => {
let Some(state) = cluster_state else {
let err = protocol::ErrorResponse {
request_id: String::new(),
code: "ClusterDisabled".to_string(),
message: "Cluster state is not available".to_string(),
};
let response = protocol::encode_response(&ClusterResponse::Error(err))?;
send.write_all(&response).await?;
send.finish()?;
return Ok(());
};
state.apply_topology(&sync.topology).await;
let response = protocol::encode_response(&ClusterResponse::TopologySyncAck(
protocol::TopologySyncAckMessage {
accepted: true,
current_version: state.version().await,
},
))?;
send.write_all(&response).await?;
send.finish()?;
}
_ => {
let err = protocol::ErrorResponse {
request_id: String::new(),
@@ -354,6 +477,83 @@ impl QuicTransport {
Ok(())
}
/// Selects the shard store for `drive_index` by using the index as a position in
/// `shard_stores`.
///
/// Returns a cloned `Arc` handle, or a "Drive N not found" error when the index is out
/// of range.
fn shard_store_for_drive(
    shard_stores: &[Arc<ShardStore>],
    drive_index: u32,
) -> Result<Arc<ShardStore>> {
    match shard_stores.get(drive_index as usize) {
        Some(store) => Ok(Arc::clone(store)),
        None => Err(anyhow::anyhow!("Drive {} not found", drive_index)),
    }
}
async fn send_error_response(
send: &mut quinn::SendStream,
code: &str,
message: String,
) -> Result<()> {
let err = protocol::ErrorResponse {
request_id: String::new(),
code: code.to_string(),
message,
};
let response = protocol::encode_response(&ClusterResponse::Error(err))?;
send.write_all(&response).await?;
send.finish()?;
Ok(())
}
async fn broadcast_topology(
&self,
state: &Arc<ClusterState>,
topology: Option<protocol::ClusterTopology>,
target_node_id: Option<&str>,
skip_node_id: Option<&str>,
) {
let topology = match topology {
Some(topology) => topology,
None => state.to_topology().await,
};
let nodes = state.all_nodes().await;
for node in nodes {
if node.info.node_id == state.local_node_id() {
continue;
}
if let Some(target_node_id) = target_node_id {
if node.info.node_id != target_node_id {
continue;
}
}
if matches!(skip_node_id, Some(skip_node_id) if node.info.node_id == skip_node_id) {
continue;
}
if node.status != NodeStatus::Online {
continue;
}
let addr = match node.info.quic_addr.parse() {
Ok(addr) => addr,
Err(error) => {
tracing::warn!(node = %node.info.node_id, error = %error, "Skipping topology sync for invalid peer address");
continue;
}
};
let conn = match self.get_connection(&node.info.node_id, addr).await {
Ok(conn) => conn,
Err(error) => {
tracing::warn!(node = %node.info.node_id, error = %error, "Failed to connect for topology sync");
continue;
}
};
let request = ClusterRequest::TopologySync(protocol::TopologySyncMessage {
topology: topology.clone(),
});
if let Err(error) = self.send_request(&conn, &request).await {
tracing::warn!(node = %node.info.node_id, error = %error, "Failed to send topology sync");
}
}
}
/// Generate self-signed TLS certificates for cluster-internal communication.
fn generate_tls_configs() -> Result<(QuinnServerConfig, ClientConfig)> {
// Generate self-signed certificate
+102 -50
View File
@@ -1,8 +1,10 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use super::placement::{DriveLocation, ErasureSet};
use super::persistence;
use super::protocol::{ClusterTopology, ErasureSetInfo, DriveLocationInfo, NodeInfo};
/// Node status for tracking liveness.
@@ -26,6 +28,7 @@ pub struct NodeState {
pub struct ClusterState {
inner: Arc<RwLock<ClusterStateInner>>,
local_node_id: String,
topology_path: Option<PathBuf>,
}
struct ClusterStateInner {
@@ -43,6 +46,7 @@ impl ClusterState {
cluster_id: String,
data_shards: usize,
parity_shards: usize,
topology_path: Option<PathBuf>,
) -> Self {
Self {
inner: Arc::new(RwLock::new(ClusterStateInner {
@@ -54,6 +58,7 @@ impl ClusterState {
parity_shards,
})),
local_node_id,
topology_path,
}
}
@@ -61,27 +66,37 @@ impl ClusterState {
&self.local_node_id
}
/// Returns a copy of the cluster id currently held in the shared state.
pub async fn cluster_id(&self) -> String {
    let guard = self.inner.read().await;
    guard.cluster_id.clone()
}
/// Register a node in the cluster.
pub async fn add_node(&self, info: NodeInfo) {
let mut inner = self.inner.write().await;
let node_id = info.node_id.clone();
inner.nodes.insert(
node_id,
NodeState {
info,
status: NodeStatus::Online,
missed_heartbeats: 0,
last_heartbeat: chrono::Utc::now(),
},
);
inner.version += 1;
{
let mut inner = self.inner.write().await;
let node_id = info.node_id.clone();
inner.nodes.insert(
node_id,
NodeState {
info,
status: NodeStatus::Online,
missed_heartbeats: 0,
last_heartbeat: chrono::Utc::now(),
},
);
inner.version += 1;
}
self.persist_topology_snapshot().await;
}
/// Remove a node from the cluster.
pub async fn remove_node(&self, node_id: &str) {
let mut inner = self.inner.write().await;
inner.nodes.remove(node_id);
inner.version += 1;
{
let mut inner = self.inner.write().await;
inner.nodes.remove(node_id);
inner.version += 1;
}
self.persist_topology_snapshot().await;
}
/// Update heartbeat for a node (reset missed count).
@@ -133,9 +148,12 @@ impl ClusterState {
/// Set erasure sets (typically done once during cluster formation).
pub async fn set_erasure_sets(&self, sets: Vec<ErasureSet>) {
let mut inner = self.inner.write().await;
inner.erasure_sets = sets;
inner.version += 1;
{
let mut inner = self.inner.write().await;
inner.erasure_sets = sets;
inner.version += 1;
}
self.persist_topology_snapshot().await;
}
/// Get the erasure set for a given object based on consistent hashing.
@@ -244,48 +262,82 @@ impl ClusterState {
/// Import topology from a protocol message (e.g., received from a peer during join).
pub async fn apply_topology(&self, topology: &ClusterTopology) {
let mut inner = self.inner.write().await;
let applied = {
let mut inner = self.inner.write().await;
// Only apply if newer
if topology.version <= inner.version {
return;
}
// Only apply if newer and from the same cluster lineage. A node that has not yet
// joined any topology may adopt the seed cluster ID during its first join.
if topology.version <= inner.version {
return;
}
if topology.cluster_id != inner.cluster_id {
if inner.nodes.is_empty() {
inner.cluster_id = topology.cluster_id.clone();
} else {
return;
}
}
inner.cluster_id = topology.cluster_id.clone();
inner.version = topology.version;
inner.data_shards = topology.data_shards;
inner.parity_shards = topology.parity_shards;
inner.version = topology.version;
inner.data_shards = topology.data_shards;
inner.parity_shards = topology.parity_shards;
// Update nodes
for node_info in &topology.nodes {
if !inner.nodes.contains_key(&node_info.node_id) {
let now = chrono::Utc::now();
for node_info in &topology.nodes {
let existing_status = inner.nodes.get(&node_info.node_id).map(|node| node.status.clone());
let existing_missed_heartbeats = inner
.nodes
.get(&node_info.node_id)
.map(|node| node.missed_heartbeats);
let existing_last_heartbeat = inner
.nodes
.get(&node_info.node_id)
.map(|node| node.last_heartbeat);
inner.nodes.insert(
node_info.node_id.clone(),
NodeState {
info: node_info.clone(),
status: NodeStatus::Online,
missed_heartbeats: 0,
last_heartbeat: chrono::Utc::now(),
status: existing_status.unwrap_or(NodeStatus::Online),
missed_heartbeats: existing_missed_heartbeats.unwrap_or(0),
last_heartbeat: existing_last_heartbeat.unwrap_or(now),
},
);
}
}
// Update erasure sets
inner.erasure_sets = topology
.erasure_sets
.iter()
.map(|set| ErasureSet {
set_id: set.set_id,
drives: set
.drives
.iter()
.map(|d| DriveLocation {
node_id: d.node_id.clone(),
drive_index: d.drive_index,
})
.collect(),
})
.collect();
inner.nodes.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));
// Update erasure sets
inner.erasure_sets = topology
.erasure_sets
.iter()
.map(|set| ErasureSet {
set_id: set.set_id,
drives: set
.drives
.iter()
.map(|d| DriveLocation {
node_id: d.node_id.clone(),
drive_index: d.drive_index,
})
.collect(),
})
.collect();
true
};
if applied {
self.persist_topology_snapshot().await;
}
}
/// Writes the current topology to `topology_path`, if one is configured.
///
/// Does nothing when no path is set. A persistence failure is logged at warn level and
/// otherwise ignored, so callers never see an error from this method.
async fn persist_topology_snapshot(&self) {
    if let Some(destination) = &self.topology_path {
        let snapshot = self.to_topology().await;
        let outcome = persistence::persist_topology(destination, &snapshot).await;
        if let Err(error) = outcome {
            tracing::warn!(error = %error, "Failed to persist cluster topology snapshot");
        }
    }
}
}
+5 -2
View File
@@ -45,11 +45,14 @@ pub struct AuthConfig {
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Credential {
#[serde(rename = "accessKeyId")]
pub access_key_id: String,
#[serde(rename = "secretAccessKey")]
pub secret_access_key: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub bucket_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub region: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
+249 -18
View File
@@ -7,6 +7,7 @@ use tokio::io::{AsyncBufReadExt, BufReader};
use crate::config::Credential;
use crate::config::SmartStorageConfig;
use crate::server::StorageServer;
use crate::storage::BucketExport;
#[derive(Deserialize)]
struct IpcRequest {
@@ -91,17 +92,15 @@ pub async fn management_loop() -> Result<()> {
config: SmartStorageConfig,
}
match serde_json::from_value::<StartParams>(req.params) {
Ok(params) => {
match StorageServer::start(params.config).await {
Ok(s) => {
server = Some(s);
send_response(id, serde_json::json!({}));
}
Err(e) => {
send_error(id, format!("Failed to start server: {}", e));
}
Ok(params) => match StorageServer::start(params.config).await {
Ok(s) => {
server = Some(s);
send_response(id, serde_json::json!({}));
}
}
Err(e) => {
send_error(id, format!("Failed to start server: {}", e));
}
},
Err(e) => {
send_error(id, format!("Invalid start params: {}", e));
}
@@ -126,10 +125,7 @@ pub async fn management_loop() -> Result<()> {
send_response(id, serde_json::json!({}));
}
Err(e) => {
send_error(
id,
format!("Failed to create bucket: {}", e),
);
send_error(id, format!("Failed to create bucket: {}", e));
}
}
} else {
@@ -141,6 +137,244 @@ pub async fn management_loop() -> Result<()> {
}
}
}
"createBucketTenant" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateBucketTenantParams {
bucket_name: String,
access_key_id: String,
secret_access_key: String,
region: Option<String>,
}
match serde_json::from_value::<CreateBucketTenantParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
let credential = Credential {
access_key_id: params.access_key_id,
secret_access_key: params.secret_access_key,
bucket_name: Some(params.bucket_name.clone()),
region: params.region,
};
match s
.create_bucket_tenant(&params.bucket_name, credential)
.await
{
Ok(credential) => match serde_json::to_value(credential) {
Ok(value) => send_response(id, value),
Err(error) => send_error(
id,
format!("Failed to serialize bucket tenant: {}", error),
),
},
Err(error) => send_error(
id,
format!("Failed to create bucket tenant: {}", error),
),
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(id, format!("Invalid createBucketTenant params: {}", error));
}
}
}
"deleteBucketTenant" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct DeleteBucketTenantParams {
bucket_name: String,
access_key_id: Option<String>,
}
match serde_json::from_value::<DeleteBucketTenantParams>(req.params) {
Ok(params) => {
if let Some(ref s) = server {
match s
.delete_bucket_tenant(
&params.bucket_name,
params.access_key_id.as_deref(),
)
.await
{
Ok(()) => send_response(id, serde_json::json!({})),
Err(error) => send_error(
id,
format!("Failed to delete bucket tenant: {}", error),
),
}
} else {
send_error(id, "Server not started".to_string());
}
}
Err(error) => {
send_error(id, format!("Invalid deleteBucketTenant params: {}", error));
}
}
}
"rotateBucketTenantCredentials" => {
    /// Payload of a `rotateBucketTenantCredentials` request (camelCase on the wire).
    #[derive(Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct RotateBucketTenantCredentialsParams {
        bucket_name: String,
        access_key_id: String,
        secret_access_key: String,
        region: Option<String>,
    }

    // Reduce the request to either a JSON payload or an error message,
    // then answer exactly once at the end of the arm.
    let outcome: Result<serde_json::Value, String> =
        match serde_json::from_value::<RotateBucketTenantCredentialsParams>(req.params) {
            Err(error) => Err(format!(
                "Invalid rotateBucketTenantCredentials params: {}",
                error
            )),
            Ok(params) => match server {
                None => Err("Server not started".to_string()),
                Some(ref s) => {
                    let RotateBucketTenantCredentialsParams {
                        bucket_name,
                        access_key_id,
                        secret_access_key,
                        region,
                    } = params;
                    // The replacement credential is scoped to the bucket named in the request.
                    let replacement = Credential {
                        access_key_id,
                        secret_access_key,
                        bucket_name: Some(bucket_name.clone()),
                        region,
                    };
                    match s
                        .rotate_bucket_tenant_credentials(&bucket_name, replacement)
                        .await
                    {
                        Ok(rotated) => serde_json::to_value(rotated).map_err(|error| {
                            format!("Failed to serialize bucket tenant: {}", error)
                        }),
                        Err(error) => Err(format!(
                            "Failed to rotate bucket tenant credentials: {}",
                            error
                        )),
                    }
                }
            },
        };

    match outcome {
        Ok(value) => send_response(id, value),
        Err(message) => send_error(id, message),
    }
}
"listBucketTenants" => {
    // Replies with the serialized result of `list_bucket_tenants`, or an
    // error when no server is running.
    match server {
        None => send_error(id, "Server not started".to_string()),
        Some(ref s) => {
            let tenants = s.list_bucket_tenants().await;
            match serde_json::to_value(tenants) {
                Ok(value) => send_response(id, value),
                Err(error) => {
                    send_error(id, format!("Failed to serialize bucket tenants: {}", error))
                }
            }
        }
    }
}
"getBucketTenantCredential" => {
    /// Payload of a `getBucketTenantCredential` request (camelCase on the wire).
    #[derive(Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct GetBucketTenantCredentialParams {
        bucket_name: String,
    }

    // Reduce the request to either a JSON payload or an error message,
    // then answer exactly once at the end of the arm.
    let outcome: Result<serde_json::Value, String> =
        match serde_json::from_value::<GetBucketTenantCredentialParams>(req.params) {
            Err(error) => Err(format!(
                "Invalid getBucketTenantCredential params: {}",
                error
            )),
            Ok(params) => match server {
                None => Err("Server not started".to_string()),
                Some(ref s) => match s.get_bucket_tenant_credential(&params.bucket_name).await {
                    Some(found) => serde_json::to_value(found).map_err(|error| {
                        format!("Failed to serialize bucket tenant: {}", error)
                    }),
                    // A missing credential is reported as an error, not as an empty result.
                    None => Err(format!(
                        "No bucket tenant credential exists for bucket {}",
                        params.bucket_name
                    )),
                },
            },
        };

    match outcome {
        Ok(value) => send_response(id, value),
        Err(message) => send_error(id, message),
    }
}
"exportBucket" => {
    /// Payload of an `exportBucket` request (camelCase on the wire).
    #[derive(Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct ExportBucketParams {
        bucket_name: String,
    }

    // Reduce the request to either a JSON payload or an error message,
    // then answer exactly once at the end of the arm.
    let outcome: Result<serde_json::Value, String> =
        match serde_json::from_value::<ExportBucketParams>(req.params) {
            Err(error) => Err(format!("Invalid exportBucket params: {}", error)),
            Ok(params) => match server {
                None => Err("Server not started".to_string()),
                Some(ref s) => match s.store().export_bucket(&params.bucket_name).await {
                    Ok(snapshot) => serde_json::to_value(snapshot).map_err(|error| {
                        format!("Failed to serialize bucket export: {}", error)
                    }),
                    Err(error) => Err(format!("Failed to export bucket: {}", error)),
                },
            },
        };

    match outcome {
        Ok(value) => send_response(id, value),
        Err(message) => send_error(id, message),
    }
}
"importBucket" => {
    /// Payload of an `importBucket` request (camelCase on the wire).
    #[derive(Deserialize)]
    #[serde(rename_all = "camelCase")]
    struct ImportBucketParams {
        bucket_name: String,
        source: BucketExport,
    }

    // Reduce the request to either a JSON payload or an error message,
    // then answer exactly once at the end of the arm.
    let outcome: Result<serde_json::Value, String> =
        match serde_json::from_value::<ImportBucketParams>(req.params) {
            Err(error) => Err(format!("Invalid importBucket params: {}", error)),
            Ok(params) => match server {
                None => Err("Server not started".to_string()),
                Some(ref s) => s
                    .store()
                    .import_bucket(&params.bucket_name, params.source)
                    .await
                    .map(|()| serde_json::json!({}))
                    .map_err(|error| format!("Failed to import bucket: {}", error)),
            },
        };

    match outcome {
        Ok(value) => send_response(id, value),
        Err(message) => send_error(id, message),
    }
}
"getStorageStats" => {
if let Some(ref s) = server {
match s.store().get_storage_stats().await {
@@ -186,10 +420,7 @@ pub async fn management_loop() -> Result<()> {
match serde_json::to_value(s.list_credentials().await) {
Ok(value) => send_response(id, value),
Err(error) => {
send_error(
id,
format!("Failed to serialize credentials: {}", error),
);
send_error(id, format!("Failed to serialize credentials: {}", error));
}
}
} else {
+495 -74
View File
@@ -10,6 +10,7 @@ use hyper_util::rt::TokioIo;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::AsyncReadExt;
@@ -20,34 +21,61 @@ use uuid::Uuid;
use crate::action::{self, RequestContext, StorageAction};
use crate::auth::{self, AuthenticatedIdentity};
use crate::config::SmartStorageConfig;
use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::error::StorageError;
use crate::cluster::coordinator::DistributedStore;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager;
use crate::cluster::persistence::{self, ClusterIdentity};
use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo;
use crate::cluster::quic_transport::QuicTransport;
use crate::cluster::shard_store::ShardStore;
use crate::cluster::state::ClusterState;
use crate::config::{Credential, SmartStorageConfig};
use crate::error::StorageError;
use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::storage::{FileStore, StorageBackend};
use crate::xml_response;
/// Request counters and the creation timestamp of the metrics object.
struct ServerMetrics {
    /// UTC time captured when `ServerMetrics::new` ran.
    started_at: chrono::DateTime<chrono::Utc>,
    /// Number of responses passed to `record_response`.
    total_requests: AtomicU64,
    /// Number of recorded responses whose status code was 400 or above.
    error_responses: AtomicU64,
}

impl ServerMetrics {
    /// Creates a metrics object with both counters at zero and `started_at` set to now.
    fn new() -> Self {
        let started_at = chrono::Utc::now();
        Self {
            started_at,
            total_requests: AtomicU64::default(),
            error_responses: AtomicU64::default(),
        }
    }

    /// Counts one response; statuses of 400 and above also count as errors.
    fn record_response(&self, status: StatusCode) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);

        let is_error = status.as_u16() >= 400;
        if is_error {
            self.error_responses.fetch_add(1, Ordering::Relaxed);
        }
    }
}
pub struct StorageServer {
store: Arc<StorageBackend>,
auth_runtime: Arc<auth::RuntimeCredentialStore>,
shutdown_tx: watch::Sender<bool>,
cluster_shutdown_txs: Vec<watch::Sender<bool>>,
server_handle: tokio::task::JoinHandle<()>,
}
impl StorageServer {
pub async fn start(config: SmartStorageConfig) -> Result<Self> {
let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth));
let mut cluster_shutdown_txs = Vec::new();
let store: Arc<StorageBackend> = if let Some(ref cluster_config) = config.cluster {
if cluster_config.enabled {
Self::start_clustered(&config, cluster_config).await?
let (store, shutdown_txs) = Self::start_clustered(&config, cluster_config).await?;
cluster_shutdown_txs = shutdown_txs;
store
} else {
Self::start_standalone(&config).await?
}
@@ -59,8 +87,12 @@ impl StorageServer {
let policy_store = Arc::new(PolicyStore::new(store.policies_dir()));
policy_store.load_from_disk().await?;
let addr: SocketAddr = format!("{}:{}", config.address(), config.server.port)
.parse()?;
let auth_runtime = Arc::new(
auth::RuntimeCredentialStore::new(&config.auth, Some(Self::credentials_path(&config)))
.await?,
);
let addr: SocketAddr = format!("{}:{}", config.address(), config.server.port).parse()?;
let listener = TcpListener::bind(addr).await?;
let (shutdown_tx, shutdown_rx) = watch::channel(false);
@@ -69,6 +101,7 @@ impl StorageServer {
let server_config = config.clone();
let server_auth_runtime = auth_runtime.clone();
let server_policy_store = policy_store.clone();
let server_metrics = Arc::new(ServerMetrics::new());
let server_handle = tokio::spawn(async move {
loop {
@@ -83,6 +116,7 @@ impl StorageServer {
let cfg = server_config.clone();
let auth_runtime = server_auth_runtime.clone();
let ps = server_policy_store.clone();
let metrics = server_metrics.clone();
tokio::spawn(async move {
let svc = service_fn(move |req: Request<Incoming>| {
@@ -90,8 +124,9 @@ impl StorageServer {
let cfg = cfg.clone();
let auth_runtime = auth_runtime.clone();
let ps = ps.clone();
let metrics = metrics.clone();
async move {
handle_request(req, store, cfg, auth_runtime, ps).await
handle_request(req, store, cfg, auth_runtime, ps, metrics).await
}
});
@@ -126,11 +161,15 @@ impl StorageServer {
store,
auth_runtime,
shutdown_tx,
cluster_shutdown_txs,
server_handle,
})
}
pub async fn stop(self) {
for shutdown_tx in &self.cluster_shutdown_txs {
let _ = shutdown_tx.send(true);
}
let _ = self.shutdown_tx.send(true);
let _ = self.server_handle.await;
}
@@ -139,21 +178,87 @@ impl StorageServer {
&self.store
}
pub async fn list_credentials(&self) -> Vec<crate::config::Credential> {
pub async fn list_credentials(&self) -> Vec<crate::auth::CredentialMetadata> {
self.auth_runtime.list_credentials().await
}
pub async fn replace_credentials(
&self,
credentials: Vec<crate::config::Credential>,
credentials: Vec<Credential>,
) -> Result<(), StorageError> {
self.auth_runtime.replace_credentials(credentials).await
}
/// Creates `bucket_name` in the store and installs `credential` as its tenant credential.
///
/// Fails if tenant auth is disabled, if bucket creation fails, or if the
/// credential store rejects the credential. Returns the credential handed
/// back by the credential store.
pub async fn create_bucket_tenant(
    &self,
    bucket_name: &str,
    credential: Credential,
) -> Result<Credential> {
    self.ensure_tenant_auth_enabled()?;
    self.store.create_bucket(bucket_name).await?;

    let stored = self
        .auth_runtime
        .replace_bucket_tenant_credential(bucket_name, credential)
        .await?;
    Ok(stored)
}
/// Replaces the tenant credential of an existing bucket.
///
/// Fails if tenant auth is disabled, with a "no such bucket" storage error
/// if `bucket_name` does not exist, or if the credential store rejects the
/// credential. Returns the credential handed back by the credential store.
pub async fn rotate_bucket_tenant_credentials(
    &self,
    bucket_name: &str,
    credential: Credential,
) -> Result<Credential> {
    self.ensure_tenant_auth_enabled()?;

    let bucket_present = self.store.bucket_exists(bucket_name).await;
    if !bucket_present {
        return Err(StorageError::no_such_bucket().into());
    }

    let rotated = self
        .auth_runtime
        .replace_bucket_tenant_credential(bucket_name, credential)
        .await?;
    Ok(rotated)
}
/// Removes tenant credentials for `bucket_name`, and the bucket itself when no key is named.
///
/// `access_key_id` is forwarded to the credential store as a filter. Only
/// when it is `None` is the bucket (if it exists) deleted recursively.
/// Fails if tenant auth is disabled or if either removal step fails.
pub async fn delete_bucket_tenant(
    &self,
    bucket_name: &str,
    access_key_id: Option<&str>,
) -> Result<()> {
    self.ensure_tenant_auth_enabled()?;

    // Credentials are removed before any bucket data is touched.
    self.auth_runtime
        .remove_bucket_tenant_credentials(bucket_name, access_key_id)
        .await?;

    // A specific key was targeted: leave the bucket alone.
    if access_key_id.is_some() {
        return Ok(());
    }

    if self.store.bucket_exists(bucket_name).await {
        self.store.delete_bucket_recursive(bucket_name).await?;
    }
    Ok(())
}
/// Returns the bucket tenant metadata reported by the runtime credential store.
pub async fn list_bucket_tenants(&self) -> Vec<crate::auth::BucketTenantMetadata> {
    let tenants = self.auth_runtime.list_bucket_tenants().await;
    tenants
}
/// Looks up the tenant credential for `bucket_name` in the runtime credential store.
///
/// Returns `None` when the store has no credential for that bucket.
pub async fn get_bucket_tenant_credential(&self, bucket_name: &str) -> Option<Credential> {
    let lookup = self.auth_runtime.get_bucket_tenant_credential(bucket_name);
    lookup.await
}
/// Guards tenant operations: succeeds only when the runtime credential store reports auth as enabled.
fn ensure_tenant_auth_enabled(&self) -> Result<()> {
    if self.auth_runtime.enabled() {
        return Ok(());
    }
    anyhow::bail!("Bucket tenants require auth.enabled=true")
}
/// Builds `<storage directory>/.smartstorage/credentials.json` from the configured storage directory.
fn credentials_path(config: &SmartStorageConfig) -> std::path::PathBuf {
    let mut path = std::path::PathBuf::from(&config.storage.directory);
    path.push(".smartstorage");
    path.push("credentials.json");
    path
}
async fn start_standalone(config: &SmartStorageConfig) -> Result<Arc<StorageBackend>> {
let store = Arc::new(StorageBackend::Standalone(
FileStore::new(config.storage.directory.clone().into()),
));
let store = Arc::new(StorageBackend::Standalone(FileStore::new(
config.storage.directory.clone().into(),
)));
if config.storage.clean_slate {
store.reset().await?;
} else {
@@ -165,12 +270,43 @@ impl StorageServer {
async fn start_clustered(
config: &SmartStorageConfig,
cluster_config: &crate::cluster::config::ClusterConfig,
) -> Result<Arc<StorageBackend>> {
) -> Result<(Arc<StorageBackend>, Vec<watch::Sender<bool>>)> {
let erasure_config = cluster_config.erasure.clone();
let cluster_metadata_dir = persistence::cluster_metadata_dir(&config.storage.directory);
let identity_path = persistence::identity_path(&cluster_metadata_dir);
let topology_path = persistence::topology_path(&cluster_metadata_dir);
let persisted_identity = persistence::load_identity(&identity_path).await?;
if let (Some(configured_node_id), Some(identity)) =
(&cluster_config.node_id, &persisted_identity)
{
if configured_node_id != &identity.node_id {
anyhow::bail!(
"Configured cluster node ID '{}' conflicts with persisted node ID '{}'",
configured_node_id,
identity.node_id
);
}
}
let node_id = cluster_config
.node_id
.clone()
.or_else(|| {
persisted_identity
.as_ref()
.map(|identity| identity.node_id.clone())
})
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let cluster_id = persisted_identity
.as_ref()
.map(|identity| identity.cluster_id.clone())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
persistence::persist_identity(
&identity_path,
&ClusterIdentity::new(node_id.clone(), cluster_id.clone()),
)
.await?;
// Determine drive paths
let drive_paths: Vec<std::path::PathBuf> = if cluster_config.drives.paths.is_empty() {
@@ -202,28 +338,39 @@ impl StorageServer {
// Initialize cluster state
let cluster_state = Arc::new(ClusterState::new(
node_id.clone(),
uuid::Uuid::new_v4().to_string(),
cluster_id.clone(),
erasure_config.data_shards,
erasure_config.parity_shards,
Some(topology_path.clone()),
));
// Form erasure sets from local drives (single-node for now)
let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
let erasure_sets =
placement::form_erasure_sets(&nodes, erasure_config.total_shards());
let persisted_topology = persistence::load_topology(&topology_path).await?;
let has_persisted_topology = persisted_topology.is_some();
if let Some(topology) = persisted_topology {
if topology.cluster_id != cluster_id {
anyhow::bail!(
"Persisted topology cluster ID does not match persisted node identity"
);
}
cluster_state.apply_topology(&topology).await;
} else if cluster_config.seed_nodes.is_empty() {
// Form erasure sets from local drives for a first node bootstrap.
let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
let erasure_sets = placement::form_erasure_sets(&nodes, erasure_config.total_shards());
if erasure_sets.is_empty() {
tracing::warn!(
"Not enough drives ({}) for erasure set size ({}). \
Need at least {} drives.",
drive_paths.len(),
erasure_config.total_shards(),
erasure_config.total_shards(),
);
if erasure_sets.is_empty() {
tracing::warn!(
"Not enough drives ({}) for erasure set size ({}). \
Need at least {} drives.",
drive_paths.len(),
erasure_config.total_shards(),
erasure_config.total_shards(),
);
}
cluster_state.set_erasure_sets(erasure_sets).await;
}
cluster_state.set_erasure_sets(erasure_sets).await;
// Register self as a node
let local_node_info = NodeInfo {
node_id: node_id.clone(),
@@ -233,8 +380,6 @@ impl StorageServer {
status: "online".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
};
cluster_state.add_node(local_node_info.clone()).await;
// Initialize drive manager for health monitoring
let drive_manager = Arc::new(tokio::sync::Mutex::new(
DriveManager::from_paths(&drive_paths).await?,
@@ -246,13 +391,25 @@ impl StorageServer {
cluster_state.clone(),
transport.clone(),
cluster_config.heartbeat_interval_ms,
cluster_config.heartbeat_timeout_ms,
local_node_info,
)
.with_drive_manager(drive_manager.clone()),
);
membership
.join_cluster(&cluster_config.seed_nodes)
.join_cluster(
&cluster_config.seed_nodes,
cluster_config.seed_nodes.is_empty() && !has_persisted_topology,
)
.await?;
let final_cluster_id = cluster_state.cluster_id().await;
if final_cluster_id != cluster_id {
persistence::persist_identity(
&identity_path,
&ClusterIdentity::new(node_id.clone(), final_cluster_id),
)
.await?;
}
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
@@ -261,18 +418,23 @@ impl StorageServer {
.collect();
// Start QUIC accept loop for incoming connections
let shard_store_for_accept = local_shard_stores[0].clone();
let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone();
let cluster_state_for_accept = cluster_state.clone();
let shard_stores_for_accept = local_shard_stores.clone();
tokio::spawn(async move {
transport_clone
.accept_loop(shard_store_for_accept, quic_shutdown_rx)
.accept_loop(
shard_stores_for_accept,
Some(cluster_state_for_accept),
quic_shutdown_rx,
)
.await;
});
// Start heartbeat loop
let membership_clone = membership.clone();
let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
});
@@ -289,7 +451,7 @@ impl StorageServer {
24, // scan every 24 hours
healing_runtime.clone(),
)?;
let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
let (heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
@@ -319,7 +481,10 @@ impl StorageServer {
);
}
Ok(store)
Ok((
store,
vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx],
))
}
}
@@ -333,17 +498,26 @@ impl SmartStorageConfig {
// Request handling
// ============================
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
type BoxBody =
http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
fn full_body(data: impl Into<Bytes>) -> BoxBody {
http_body_util::Full::new(data.into())
.map_err(|never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> { match never {} })
.map_err(
|never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> {
match never {}
},
)
.boxed()
}
fn empty_body() -> BoxBody {
http_body_util::Empty::new()
.map_err(|never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> { match never {} })
.map_err(
|never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> {
match never {}
},
)
.boxed()
}
@@ -364,10 +538,10 @@ impl Stream for FrameStream {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
match inner.poll_next(cx) {
Poll::Ready(Some(Ok(bytes))) => {
Poll::Ready(Some(Ok(hyper::body::Frame::data(bytes))))
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>))),
Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(hyper::body::Frame::data(bytes)))),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
Box::new(e) as Box<dyn std::error::Error + Send + Sync>
))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
@@ -401,12 +575,40 @@ fn storage_error_response(err: &StorageError, request_id: &str) -> Response<BoxB
.unwrap()
}
/// Builds a response with the given status whose body is `value` rendered as JSON text.
///
/// Sets `content-type: application/json` and echoes `request_id` in
/// `x-amz-request-id`. Panics if the response builder rejects a header value.
fn json_response(
    status: StatusCode,
    value: serde_json::Value,
    request_id: &str,
) -> Response<BoxBody> {
    let payload = full_body(value.to_string());
    let builder = Response::builder()
        .status(status)
        .header("content-type", "application/json")
        .header("x-amz-request-id", request_id);
    builder.body(payload).unwrap()
}
/// Builds a response with the given status, caller-supplied content type and text body.
///
/// Echoes `request_id` in `x-amz-request-id`. Panics if the response
/// builder rejects a header value.
fn text_response(
    status: StatusCode,
    content_type: &str,
    body: String,
    request_id: &str,
) -> Response<BoxBody> {
    let payload = full_body(body);
    let builder = Response::builder()
        .status(status)
        .header("content-type", content_type)
        .header("x-amz-request-id", request_id);
    builder.body(payload).unwrap()
}
async fn handle_request(
req: Request<Incoming>,
store: Arc<StorageBackend>,
config: SmartStorageConfig,
auth_runtime: Arc<auth::RuntimeCredentialStore>,
policy_store: Arc<PolicyStore>,
metrics: Arc<ServerMetrics>,
) -> Result<Response<BoxBody>, std::convert::Infallible> {
let request_id = Uuid::new_v4().to_string();
let method = req.method().clone();
@@ -416,6 +618,26 @@ async fn handle_request(
// Handle CORS preflight
if config.cors.enabled && method == Method::OPTIONS {
let resp = build_cors_preflight(&config, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
if method == Method::GET && uri.path().starts_with("/-/") {
let resp =
match handle_operational_request(uri.path(), store, &config, &metrics, &request_id)
.await
{
Ok(resp) => resp,
Err(error) => {
tracing::error!(error = %error, "Operational endpoint failed");
json_response(
StatusCode::INTERNAL_SERVER_ERROR,
serde_json::json!({ "ok": false, "error": error.to_string() }),
&request_id,
)
}
};
metrics.record_response(resp.status());
return Ok(resp);
}
@@ -439,7 +661,9 @@ async fn handle_request(
Ok(id) => Some(id),
Err(e) => {
tracing::warn!("Auth failed: {}", e.message);
return Ok(storage_error_response(&e, &request_id));
let resp = storage_error_response(&e, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
}
} else {
@@ -449,7 +673,9 @@ async fn handle_request(
// Step 3: Authorization (policy evaluation)
if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await {
return Ok(storage_error_response(&e, &request_id));
let resp = storage_error_response(&e, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
}
@@ -481,15 +707,134 @@ async fn handle_request(
"request"
);
metrics.record_response(response.status());
Ok(response)
}
/// Builds the response for an operational endpoint path (`/-/…`).
///
/// Supported paths:
/// - `/-/live`, `/-/livez`: always 200 with the metrics start timestamp.
/// - `/-/ready`, `/-/readyz`: 200 when the cluster is disabled or reports
///   both majority and quorum as healthy, otherwise 503.
/// - `/-/health`, `/-/healthz`: 200 with version, server, storage, cluster
///   and request-counter details.
/// - `/-/metrics`: 200 with a plain-text metrics exposition.
/// - anything else: 404 JSON error.
///
/// Returns an error when the cluster health or storage stats lookup fails.
async fn handle_operational_request(
    path: &str,
    store: Arc<StorageBackend>,
    config: &SmartStorageConfig,
    metrics: &ServerMetrics,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    match path {
        "/-/live" | "/-/livez" => Ok(json_response(
            StatusCode::OK,
            serde_json::json!({
                "ok": true,
                "status": "alive",
                "startedAt": metrics.started_at.timestamp_millis(),
            }),
            request_id,
        )),
        "/-/ready" | "/-/readyz" => {
            let cluster_health = store.get_cluster_health().await?;
            // On an enabled cluster, an unknown (None) health flag counts as not ready.
            let cluster_ready = !cluster_health.enabled
                || (cluster_health.majority_healthy.unwrap_or(false)
                    && cluster_health.quorum_healthy.unwrap_or(false));
            let status = if cluster_ready {
                StatusCode::OK
            } else {
                StatusCode::SERVICE_UNAVAILABLE
            };
            Ok(json_response(
                status,
                serde_json::json!({
                    "ok": cluster_ready,
                    "status": if cluster_ready { "ready" } else { "degraded" },
                    "cluster": cluster_health,
                }),
                request_id,
            ))
        }
        "/-/health" | "/-/healthz" => {
            let cluster_health = store.get_cluster_health().await?;
            let stats = store.get_storage_stats().await?;
            Ok(json_response(
                StatusCode::OK,
                serde_json::json!({
                    "ok": true,
                    "status": "healthy",
                    "version": env!("CARGO_PKG_VERSION"),
                    "server": {
                        "address": config.server.address,
                        "port": config.server.port,
                        "startedAt": metrics.started_at.timestamp_millis(),
                    },
                    "storage": stats,
                    "cluster": cluster_health,
                    "metrics": {
                        "totalRequests": metrics.total_requests.load(Ordering::Relaxed),
                        "errorResponses": metrics.error_responses.load(Ordering::Relaxed),
                    },
                }),
                request_id,
            ))
        }
        "/-/metrics" => {
            let cluster_health = store.get_cluster_health().await?;
            let stats = store.get_storage_stats().await?;
            let cluster_enabled = u8::from(cluster_health.enabled);
            // Use the same rule as the readiness probe above: with clustering
            // disabled the gauge stays 1, but on an enabled cluster an unknown
            // quorum state is reported as unhealthy (0) instead of healthy, so
            // the gauge never contradicts `/-/ready`.
            let quorum_healthy = u8::from(
                !cluster_health.enabled || cluster_health.quorum_healthy.unwrap_or(false),
            );
            let body = format!(
                "# HELP smartstorage_requests_total Total HTTP requests observed by smartstorage.\n\
                 # TYPE smartstorage_requests_total counter\n\
                 smartstorage_requests_total {}\n\
                 # HELP smartstorage_error_responses_total HTTP responses with status >= 400.\n\
                 # TYPE smartstorage_error_responses_total counter\n\
                 smartstorage_error_responses_total {}\n\
                 # HELP smartstorage_buckets_total Runtime bucket count.\n\
                 # TYPE smartstorage_buckets_total gauge\n\
                 smartstorage_buckets_total {}\n\
                 # HELP smartstorage_objects_total Runtime object count.\n\
                 # TYPE smartstorage_objects_total gauge\n\
                 smartstorage_objects_total {}\n\
                 # HELP smartstorage_cluster_enabled Cluster mode enabled.\n\
                 # TYPE smartstorage_cluster_enabled gauge\n\
                 smartstorage_cluster_enabled {}\n\
                 # HELP smartstorage_cluster_quorum_healthy Cluster quorum health.\n\
                 # TYPE smartstorage_cluster_quorum_healthy gauge\n\
                 smartstorage_cluster_quorum_healthy {}\n",
                metrics.total_requests.load(Ordering::Relaxed),
                metrics.error_responses.load(Ordering::Relaxed),
                stats.bucket_count,
                stats.total_object_count,
                cluster_enabled,
                quorum_healthy,
            );
            Ok(text_response(
                StatusCode::OK,
                "text/plain; version=0.0.4",
                body,
                request_id,
            ))
        }
        _ => Ok(json_response(
            StatusCode::NOT_FOUND,
            serde_json::json!({ "ok": false, "error": "Unknown operational endpoint" }),
            request_id,
        )),
    }
}
/// Authorize a request based on bucket policies and authentication state.
async fn authorize_request(
ctx: &RequestContext,
identity: Option<&AuthenticatedIdentity>,
policy_store: &PolicyStore,
) -> Result<(), StorageError> {
if let Some(identity) = identity {
if let Some(bucket_name) = identity.bucket_name.as_deref() {
authorize_scoped_credential(ctx, bucket_name)?;
}
}
// ListAllMyBuckets requires authentication (no bucket to apply policy to)
if ctx.action == StorageAction::ListAllMyBuckets {
if identity.is_none() {
@@ -520,6 +865,46 @@ async fn authorize_request(
Ok(())
}
/// Restricts a bucket-scoped credential to object-level work inside its own bucket.
///
/// Access is denied when the request names no bucket, names a different
/// bucket, has a source bucket other than `bucket_name`, or performs a
/// bucket-management / policy / list-all-buckets action.
fn authorize_scoped_credential(
    ctx: &RequestContext,
    bucket_name: &str,
) -> Result<(), StorageError> {
    // The request must target exactly the bucket the credential is scoped to.
    if ctx.bucket.as_deref() != Some(bucket_name) {
        return Err(StorageError::access_denied());
    }

    // A source bucket, when present, must be the same bucket as well.
    let foreign_source = matches!(
        ctx.source_bucket.as_deref(),
        Some(source) if source != bucket_name
    );
    if foreign_source {
        return Err(StorageError::access_denied());
    }

    // Kept exhaustive on purpose so a new action must be classified explicitly.
    let permitted = match ctx.action {
        StorageAction::HeadBucket
        | StorageAction::ListBucket
        | StorageAction::GetObject
        | StorageAction::HeadObject
        | StorageAction::PutObject
        | StorageAction::DeleteObject
        | StorageAction::CopyObject
        | StorageAction::ListBucketMultipartUploads
        | StorageAction::AbortMultipartUpload
        | StorageAction::InitiateMultipartUpload
        | StorageAction::UploadPart
        | StorageAction::CompleteMultipartUpload => true,
        StorageAction::CreateBucket
        | StorageAction::DeleteBucket
        | StorageAction::GetBucketPolicy
        | StorageAction::PutBucketPolicy
        | StorageAction::DeleteBucketPolicy
        | StorageAction::ListAllMyBuckets => false,
    };

    if permitted {
        Ok(())
    } else {
        Err(StorageError::access_denied())
    }
}
// ============================
// Routing
// ============================
@@ -558,9 +943,16 @@ async fn route_request(
// Check for ?policy query parameter
if query.contains_key("policy") {
return match method {
Method::GET => handle_get_bucket_policy(policy_store, &bucket, request_id).await,
Method::PUT => handle_put_bucket_policy(req, &store, policy_store, &bucket, request_id).await,
Method::DELETE => handle_delete_bucket_policy(policy_store, &bucket, request_id).await,
Method::GET => {
handle_get_bucket_policy(policy_store, &bucket, request_id).await
}
Method::PUT => {
handle_put_bucket_policy(req, &store, policy_store, &bucket, request_id)
.await
}
Method::DELETE => {
handle_delete_bucket_policy(policy_store, &bucket, request_id).await
}
_ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
};
}
@@ -574,7 +966,9 @@ async fn route_request(
}
}
Method::PUT => handle_create_bucket(store, &bucket, request_id).await,
Method::DELETE => handle_delete_bucket(store, &bucket, request_id, policy_store).await,
Method::DELETE => {
handle_delete_bucket(store, &bucket, request_id, policy_store).await
}
Method::HEAD => handle_head_bucket(store, &bucket, request_id).await,
_ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
}
@@ -594,12 +988,8 @@ async fn route_request(
handle_put_object(req, store, &bucket, &key, request_id).await
}
}
Method::GET => {
handle_get_object(req, store, &bucket, &key, request_id).await
}
Method::HEAD => {
handle_head_object(store, &bucket, &key, request_id).await
}
Method::GET => handle_get_object(req, store, &bucket, &key, request_id).await,
Method::HEAD => handle_head_object(store, &bucket, &key, request_id).await,
Method::DELETE => {
if query.contains_key("uploadId") {
let upload_id = query.get("uploadId").unwrap();
@@ -613,7 +1003,8 @@ async fn route_request(
handle_initiate_multipart(req, store, &bucket, &key, request_id).await
} else if query.contains_key("uploadId") {
let upload_id = query.get("uploadId").unwrap().clone();
handle_complete_multipart(req, store, &bucket, &key, &upload_id, request_id).await
handle_complete_multipart(req, store, &bucket, &key, &upload_id, request_id)
.await
} else {
let err = StorageError::invalid_request("Invalid POST request");
Ok(storage_error_response(&err, request_id))
@@ -742,7 +1133,13 @@ async fn handle_get_object(
let mut builder = Response::builder()
.header("ETag", format!("\"{}\"", result.md5))
.header("Last-Modified", result.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string())
.header(
"Last-Modified",
result
.last_modified
.format("%a, %d %b %Y %H:%M:%S GMT")
.to_string(),
)
.header("Content-Type", &content_type)
.header("Accept-Ranges", "bytes")
.header("x-amz-request-id", request_id);
@@ -793,7 +1190,13 @@ async fn handle_head_object(
let mut builder = Response::builder()
.status(StatusCode::OK)
.header("ETag", format!("\"{}\"", result.md5))
.header("Last-Modified", result.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string())
.header(
"Last-Modified",
result
.last_modified
.format("%a, %d %b %Y %H:%M:%S GMT")
.to_string(),
)
.header("Content-Type", &content_type)
.header("Content-Length", result.size.to_string())
.header("Accept-Ranges", "bytes")
@@ -856,7 +1259,14 @@ async fn handle_copy_object(
};
let result = store
.copy_object(&src_bucket, &src_key, dest_bucket, dest_key, &metadata_directive, new_metadata)
.copy_object(
&src_bucket,
&src_key,
dest_bucket,
dest_key,
&metadata_directive,
new_metadata,
)
.await?;
let xml = xml_response::copy_object_result_xml(&result.md5, &result.last_modified.to_rfc3339());
@@ -900,7 +1310,11 @@ async fn handle_put_bucket_policy(
}
// Read body
let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes();
let body_bytes = req
.collect()
.await
.map_err(|e| anyhow::anyhow!("Body error: {}", e))?
.to_bytes();
let body_str = String::from_utf8_lossy(&body_bytes);
// Validate and parse
@@ -982,7 +1396,11 @@ async fn handle_complete_multipart(
request_id: &str,
) -> Result<Response<BoxBody>> {
// Read request body (XML)
let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes();
let body_bytes = req
.collect()
.await
.map_err(|e| anyhow::anyhow!("Body error: {}", e))?
.to_bytes();
let body_str = String::from_utf8_lossy(&body_bytes);
// Parse parts from XML using regex-like approach
@@ -1046,8 +1464,12 @@ fn extract_metadata(headers: &hyper::HeaderMap) -> HashMap<String, String> {
let name_str = name.as_str().to_lowercase();
if let Ok(val) = value.to_str() {
match name_str.as_str() {
"content-type" | "cache-control" | "content-disposition"
| "content-encoding" | "content-language" | "expires" => {
"content-type"
| "cache-control"
| "content-disposition"
| "content-encoding"
| "content-language"
| "expires" => {
metadata.insert(name_str, val.to_string());
}
_ if name_str.starts_with("x-amz-meta-") => {
@@ -1060,7 +1482,10 @@ fn extract_metadata(headers: &hyper::HeaderMap) -> HashMap<String, String> {
// Default content-type
if !metadata.contains_key("content-type") {
metadata.insert("content-type".to_string(), "binary/octet-stream".to_string());
metadata.insert(
"content-type".to_string(),
"binary/octet-stream".to_string(),
);
}
metadata
@@ -1095,10 +1520,9 @@ fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
if let Some(part_end) = after_part.find("</Part>") {
let part_content = &after_part[..part_end];
let part_number = extract_xml_value(part_content, "PartNumber")
.and_then(|s| s.parse::<u32>().ok());
let etag = extract_xml_value(part_content, "ETag")
.map(|s| s.replace('"', ""));
let part_number =
extract_xml_value(part_content, "PartNumber").and_then(|s| s.parse::<u32>().ok());
let etag = extract_xml_value(part_content, "ETag").map(|s| s.replace('"', ""));
if let (Some(pn), Some(et)) = (part_number, etag) {
parts.push((pn, et));
@@ -1164,9 +1588,6 @@ fn add_cors_headers(headers: &mut hyper::HeaderMap, config: &SmartStorageConfig)
);
}
if config.cors.allow_credentials == Some(true) {
headers.insert(
"access-control-allow-credentials",
"true".parse().unwrap(),
);
headers.insert("access-control-allow-credentials", "true".parse().unwrap());
}
}
+136
View File
@@ -99,6 +99,25 @@ pub struct StorageStats {
pub storage_locations: Vec<StorageLocationSummary>,
}
/// Serializable snapshot of one bucket; field names are camelCase on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketExport {
    /// Format identifier of the export; the accepted values are not visible here.
    pub format: String,
    /// Name of the bucket this export describes.
    pub bucket_name: String,
    /// Export time as an integer; presumably a Unix timestamp, unit not visible here (TODO confirm).
    pub exported_at: i64,
    /// The exported objects, one entry per object.
    pub objects: Vec<BucketExportObject>,
}
/// One object inside a `BucketExport`; field names are camelCase on the wire.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketExportObject {
    /// Object key.
    pub key: String,
    /// Object size; presumably in bytes (TODO confirm against the exporter).
    pub size: u64,
    /// MD5 digest string recorded for the object.
    pub md5: String,
    /// Metadata entries recorded for the object.
    pub metadata: HashMap<String, String>,
    /// Object payload as text; presumably hex-encoded bytes, judging by the name (TODO confirm).
    pub data_hex: String,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClusterPeerHealth {
@@ -593,6 +612,40 @@ impl FileStore {
Ok(PutResult { md5: md5_hex })
}
/// Stores `data` as object `key` in `bucket`, along with its MD5 and metadata sidecar files.
///
/// Writes the object file, then `<object>.md5` (lowercase hex digest), then
/// `<object>.metadata.json` (pretty-printed JSON), and finally updates the
/// size tracking via `track_object_upsert`. Fails with a "no such bucket"
/// storage error when the bucket is missing, or with the underlying error
/// of any failed write. Returns the hex MD5 of `data`.
pub async fn put_object_bytes(
    &self,
    bucket: &str,
    key: &str,
    data: &[u8],
    metadata: HashMap<String, String>,
) -> Result<PutResult> {
    let bucket_present = self.bucket_exists(bucket).await;
    if !bucket_present {
        return Err(StorageError::no_such_bucket().into());
    }

    // Captured before the write so the tracker receives the previous size.
    let previous_size = self.object_size_if_exists(bucket, key).await;

    let target = self.object_path(bucket, key);
    if let Some(directory) = target.parent() {
        fs::create_dir_all(directory).await?;
    }
    fs::write(&target, data).await?;

    let digest = format!("{:x}", Md5::digest(data));
    let digest_path = format!("{}.md5", target.display());
    fs::write(digest_path, &digest).await?;

    let metadata_body = serde_json::to_string_pretty(&metadata)?;
    let metadata_path = format!("{}.metadata.json", target.display());
    fs::write(metadata_path, metadata_body).await?;

    let new_size = data.len() as u64;
    self.track_object_upsert(bucket, previous_size, new_size)
        .await;

    Ok(PutResult { md5: digest })
}
pub async fn get_object(
&self,
bucket: &str,
@@ -1311,6 +1364,25 @@ impl StorageBackend {
}
}
/// Deletes every object in `bucket` and then the bucket itself.
///
/// Objects are listed in pages of at most 1000 keys; each page is deleted
/// before the next listing, and the loop ends once a listing comes back empty.
/// Fails with a no-such-bucket error when the bucket does not exist, and stops
/// at the first listing or deletion error.
pub async fn delete_bucket_recursive(&self, bucket: &str) -> Result<()> {
    if !self.bucket_exists(bucket).await {
        return Err(StorageError::no_such_bucket().into());
    }

    let mut page = self
        .list_objects(bucket, "", "", 1000, None)
        .await?
        .contents;
    while !page.is_empty() {
        for entry in page {
            self.delete_object(bucket, &entry.key).await?;
        }
        // Always list again from the start: the keys just deleted are gone.
        page = self
            .list_objects(bucket, "", "", 1000, None)
            .await?
            .contents;
    }

    self.delete_bucket(bucket).await
}
pub async fn put_object(
&self,
bucket: &str,
@@ -1324,6 +1396,21 @@ impl StorageBackend {
}
}
/// Stores an in-memory payload as object `key` in `bucket`, forwarding the
/// call unchanged to whichever backend variant is active.
pub async fn put_object_bytes(
&self,
bucket: &str,
key: &str,
data: &[u8],
metadata: HashMap<String, String>,
) -> Result<PutResult> {
match self {
// Standalone mode: delegate to the wrapped store.
StorageBackend::Standalone(fs) => {
fs.put_object_bytes(bucket, key, data, metadata).await
}
// Clustered mode: delegate to the wrapped store.
StorageBackend::Clustered(ds) => ds.put_object_bytes(bucket, key, data, metadata).await,
}
}
pub async fn get_object(
&self,
bucket: &str,
@@ -1453,6 +1540,55 @@ impl StorageBackend {
StorageBackend::Clustered(ds) => ds.list_multipart_uploads(bucket).await,
}
}
/// Builds a `BucketExport` snapshot containing every object of `bucket`.
///
/// Each object is fetched in full and embedded as a hex string, so the whole
/// bucket is held in memory while the export is assembled. Fails with a
/// no-such-bucket error when the bucket does not exist, and aborts on the
/// first listing or read error.
///
/// NOTE(review): the listing is requested with `usize::MAX` as the key limit in
/// a single call — confirm `list_objects` handles that value without
/// truncating or overflowing.
pub async fn export_bucket(&self, bucket: &str) -> Result<BucketExport> {
    if !self.bucket_exists(bucket).await {
        return Err(StorageError::no_such_bucket().into());
    }

    let listing = self.list_objects(bucket, "", "", usize::MAX, None).await?;
    let mut exported = Vec::with_capacity(listing.contents.len());

    for entry in listing.contents {
        let fetched = self.get_object(bucket, &entry.key, None).await?;

        // Read the complete body; the reported size is only a capacity hint.
        let mut body = fetched.body;
        let mut bytes = Vec::with_capacity(fetched.size as usize);
        body.read_to_end(&mut bytes).await?;

        let exported_object = BucketExportObject {
            key: entry.key,
            size: fetched.size,
            md5: fetched.md5,
            metadata: fetched.metadata,
            data_hex: hex::encode(bytes),
        };
        exported.push(exported_object);
    }

    Ok(BucketExport {
        format: "smartstorage.bucket.v1".to_string(),
        bucket_name: bucket.to_string(),
        exported_at: Utc::now().timestamp_millis(),
        objects: exported,
    })
}
pub async fn import_bucket(&self, bucket: &str, source: BucketExport) -> Result<()> {
if source.format != "smartstorage.bucket.v1" {
return Err(StorageError::invalid_request("Unsupported bucket export format.").into());
}
if !self.bucket_exists(bucket).await {
self.create_bucket(bucket).await?;
}
for object in source.objects {
let data = hex::decode(&object.data_hex)
.map_err(|error| StorageError::invalid_request(&error.to_string()))?;
self.put_object_bytes(bucket, &object.key, &data, object.metadata)
.await?;
}
Ok(())
}
}
// ============================
+335
View File
@@ -0,0 +1,335 @@
/// <reference types="node" />
import { expect, tap } from '@git.zone/tstest/tapbundle';
import {
CopyObjectCommand,
GetBucketPolicyCommand,
GetObjectCommand,
HeadBucketCommand,
ListBucketsCommand,
ListObjectsV2Command,
PutBucketPolicyCommand,
PutObjectCommand,
DeleteObjectCommand,
S3Client,
} from '@aws-sdk/client-s3';
import { rm } from 'fs/promises';
import { fileURLToPath } from 'url';
import { Readable } from 'stream';
import * as smartstorage from '../ts/index.js';
// Port the storage server under test listens on.
const TEST_PORT = 3361;
// On-disk storage directory; wiped in the setup test and reused across the restart test.
const STORAGE_DIR = fileURLToPath(new URL('../.nogit/bucket-tenant-tests', import.meta.url));
// Buckets owned by the two tenants.
const WORKAPP_A_BUCKET = 'workapp-a-bucket';
const WORKAPP_B_BUCKET = 'workapp-b-bucket';
// Target bucket for the export/import round trip.
const RESTORE_BUCKET = 'workapp-a-restore-bucket';
// Bucket used to check that bucket policies persist across a restart.
const POLICY_BUCKET = 'workapp-policy-bucket';
// Startup credential passed in the auth config; used for the admin client.
const ADMIN_CREDENTIAL: smartstorage.IStorageCredential = {
accessKeyId: 'TENANTADMIN',
secretAccessKey: 'TENANTADMINSECRET123',
};
// Shared state, assigned by the setup test and updated by later tests.
let testSmartStorageInstance: smartstorage.SmartStorage;
let adminClient: S3Client;
let tenantA: smartstorage.IBucketTenantDescriptor;
let tenantB: smartstorage.IBucketTenantDescriptor;
let tenantAClient: S3Client;
let tenantBClient: S3Client;
// Client built from tenant A's credential before rotation.
let oldTenantAClient: S3Client;
/**
 * Creates a path-style S3 client that talks to the local test server.
 *
 * @param credential access key pair used to sign requests
 * @param region signing region, defaults to `us-east-1`
 */
function createS3Client(
  credential: smartstorage.IStorageCredential,
  region = 'us-east-1',
): S3Client {
  const { accessKeyId, secretAccessKey } = credential;
  const endpoint = `http://localhost:${TEST_PORT}`;
  return new S3Client({
    endpoint,
    region,
    credentials: { accessKeyId, secretAccessKey },
    forcePathStyle: true,
  });
}
/**
 * Creates a path-style S3 client purely from the connection data in a tenant
 * descriptor (endpoint, port, region and key pair).
 *
 * @param descriptor tenant descriptor returned by the storage instance
 */
function createS3ClientFromDescriptor(
  descriptor: smartstorage.IBucketTenantDescriptor,
): S3Client {
  const { region, accessKeyId, secretAccessKey } = descriptor;
  const endpoint = `http://${descriptor.endpoint}:${descriptor.port}`;
  return new S3Client({
    endpoint,
    region,
    credentials: { accessKeyId, secretAccessKey },
    forcePathStyle: true,
  });
}
/**
 * Collects a readable stream into a single UTF-8 string.
 *
 * @param stream stream to drain; chunks may be strings, Buffers or Uint8Arrays
 * @returns the concatenated content; rejects if the stream emits `error`
 */
async function streamToString(stream: Readable): Promise<string> {
  const collected: Buffer[] = [];
  const collect = (chunk: string | Buffer | Uint8Array): void => {
    collected.push(Buffer.from(chunk));
  };
  return new Promise<string>((resolve, reject) => {
    stream.on('data', collect);
    stream.on('error', reject);
    stream.on('end', () => {
      resolve(Buffer.concat(collected).toString('utf8'));
    });
  });
}
/**
 * Starts the storage server with auth enabled and the admin credential, then
 * rebuilds the admin client. `cleanSlate` is false so data written before a
 * restart stays on disk.
 */
async function startStorage() {
  const config = {
    server: { port: TEST_PORT, silent: true, region: 'us-east-1' },
    storage: { directory: STORAGE_DIR, cleanSlate: false },
    auth: { enabled: true, credentials: [ADMIN_CREDENTIAL] },
  };
  testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart(config);
  adminClient = createS3Client(ADMIN_CREDENTIAL);
}
// Wipes the storage directory, starts the server and provisions one tenant per
// work-app bucket; the resulting descriptors and clients are shared by later tests.
tap.test('setup: start storage and provision bucket tenants', async () => {
  await rm(STORAGE_DIR, { recursive: true, force: true });
  await startStorage();

  const provisionTenant = (bucketName: string) =>
    testSmartStorageInstance.createBucketTenant({ bucketName });

  tenantA = await provisionTenant(WORKAPP_A_BUCKET);
  tenantB = await provisionTenant(WORKAPP_B_BUCKET);

  tenantAClient = createS3ClientFromDescriptor(tenantA);
  tenantBClient = createS3ClientFromDescriptor(tenantB);
});
// Checks the descriptor returned for tenant A: connection fields, bucket
// aliases, generated key pair and the environment-variable map.
tap.test('tenant descriptors expose app-ready S3 connection data', async () => {
expect(tenantA.endpoint).toEqual('localhost');
expect(tenantA.port).toEqual(TEST_PORT);
expect(tenantA.region).toEqual('us-east-1');
// `bucket` and `bucketName` both carry the tenant's bucket.
expect(tenantA.bucket).toEqual(WORKAPP_A_BUCKET);
expect(tenantA.bucketName).toEqual(WORKAPP_A_BUCKET);
expect(tenantA.accessKeyId).toBeTypeofString();
expect(tenantA.secretAccessKey).toBeTypeofString();
expect(tenantA.useSsl).toEqual(false);
expect(tenantA.env.S3_BUCKET).toEqual(WORKAPP_A_BUCKET);
expect(tenantA.env.AWS_ACCESS_KEY_ID).toEqual(tenantA.accessKeyId);
});
// Tenant listings must describe both tenants but never carry a secret key.
tap.test('listBucketTenants returns scoped credential metadata without secrets', async () => {
  const tenants = await testSmartStorageInstance.listBucketTenants();
  expect(tenants.length).toEqual(2);
  expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_A_BUCKET)).toEqual(true);
  expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_B_BUCKET)).toEqual(true);
  // Check every entry, not just the first one: a secret leaking on any tenant
  // would break the "without secrets" guarantee this test is named after.
  for (const tenant of tenants) {
    expect((tenant as any).secretAccessKey).toEqual(undefined);
  }
});
// Each tenant writes one text object into its own bucket; tenant A then reads
// its object back and finds it in a listing.
tap.test('tenant credentials work with AWS SDK v3 for their assigned bucket', async () => {
  const uploads = [
    { client: tenantAClient, Bucket: WORKAPP_A_BUCKET, Key: 'hello.txt', Body: 'hello from tenant a' },
    { client: tenantBClient, Bucket: WORKAPP_B_BUCKET, Key: 'other.txt', Body: 'hello from tenant b' },
  ];
  // Uploads run one after another, tenant A first.
  for (const { client, ...objectInput } of uploads) {
    const putResponse = await client.send(new PutObjectCommand({
      ...objectInput,
      ContentType: 'text/plain',
    }));
    expect(putResponse.$metadata.httpStatusCode).toEqual(200);
  }

  const getA = await tenantAClient.send(new GetObjectCommand({
    Bucket: WORKAPP_A_BUCKET,
    Key: 'hello.txt',
  }));
  const bodyA = await streamToString(getA.Body as Readable);
  expect(bodyA).toEqual('hello from tenant a');

  const listA = await tenantAClient.send(new ListObjectsV2Command({
    Bucket: WORKAPP_A_BUCKET,
  }));
  expect(listA.Contents?.some((object) => object.Key === 'hello.txt')).toEqual(true);
});
// Every request below crosses a tenant boundary (or is account-wide) and must
// be rejected. Requests are issued one at a time, in the listed order.
tap.test('tenant credentials cannot access unrelated buckets', async () => {
  const deniedRequests: Array<() => Promise<unknown>> = [
    () => tenantAClient.send(new ListBucketsCommand({})),
    () => tenantAClient.send(new HeadBucketCommand({
      Bucket: WORKAPP_B_BUCKET,
    })),
    () => tenantAClient.send(new PutObjectCommand({
      Bucket: WORKAPP_B_BUCKET,
      Key: 'blocked-write.txt',
      Body: 'blocked',
    })),
    () => tenantAClient.send(new GetObjectCommand({
      Bucket: WORKAPP_B_BUCKET,
      Key: 'other.txt',
    })),
    () => tenantAClient.send(new DeleteObjectCommand({
      Bucket: WORKAPP_B_BUCKET,
      Key: 'other.txt',
    })),
    // Copy into the tenant's own bucket, but sourced from the other tenant's bucket.
    () => tenantAClient.send(new CopyObjectCommand({
      Bucket: WORKAPP_A_BUCKET,
      Key: 'copy-from-other-bucket.txt',
      CopySource: `/${WORKAPP_B_BUCKET}/other.txt`,
    })),
    () => tenantBClient.send(new GetObjectCommand({
      Bucket: WORKAPP_A_BUCKET,
      Key: 'hello.txt',
    })),
  ];

  for (const attempt of deniedRequests) {
    await expect(attempt()).rejects.toThrow();
  }
});
// Checks the aggregated health and metrics views after two tenants have each
// uploaded one object.
tap.test('health and metrics expose running storage state', async () => {
const health = await testSmartStorageInstance.getHealth();
expect(health.running).toEqual(true);
expect(health.ok).toEqual(true);
expect(health.storageDirectory).toEqual(STORAGE_DIR);
expect(health.auth.enabled).toEqual(true);
expect(health.auth.tenantCredentialCount).toEqual(2);
// Lower bounds only: at least the two tenant buckets with one object each.
expect(health.bucketCount >= 2).toEqual(true);
expect(health.objectCount >= 2).toEqual(true);
expect(health.totalBytes > 0).toEqual(true);
const metrics = await testSmartStorageInstance.getMetrics();
expect(metrics.tenantCredentialCount).toEqual(2);
expect(metrics.prometheusText).toMatch(/smartstorage_tenant_credentials_total 2/);
});
// Exports tenant A's bucket, imports it into a fresh bucket and verifies that
// only tenant A's object travelled along.
tap.test('export/import targets one bucket without unrelated tenant data', async () => {
  const bucketExport = await testSmartStorageInstance.exportBucket({
    bucketName: WORKAPP_A_BUCKET,
  });
  expect(bucketExport.format).toEqual('smartstorage.bucket.v1');
  expect(bucketExport.bucketName).toEqual(WORKAPP_A_BUCKET);

  const exportedKeys = bucketExport.objects.map((object) => object.key);
  expect(exportedKeys.includes('hello.txt')).toEqual(true);
  // 'other.txt' belongs to tenant B and must not leak into the export.
  expect(exportedKeys.includes('other.txt')).toEqual(false);

  await testSmartStorageInstance.importBucket({
    bucketName: RESTORE_BUCKET,
    source: bucketExport,
  });

  const restoredObject = await adminClient.send(new GetObjectCommand({
    Bucket: RESTORE_BUCKET,
    Key: 'hello.txt',
  }));
  const restoredBody = await streamToString(restoredObject.Body as Readable);
  expect(restoredBody).toEqual('hello from tenant a');

  const restoredObjects = await adminClient.send(new ListObjectsV2Command({
    Bucket: RESTORE_BUCKET,
  }));
  expect(restoredObjects.Contents?.some((object) => object.Key === 'other.txt')).toEqual(false);
});
// Stores a bucket policy on POLICY_BUCKET. Only the write happens here; the
// read-back after a restart is done in the 'runtime credentials survive
// restart' test.
tap.test('bucket policies persist across restart', async () => {
await testSmartStorageInstance.createBucket(POLICY_BUCKET);
const policy = JSON.stringify({
Version: '2012-10-17',
Statement: [{
// The Sid is the marker the later test looks for in the stored policy.
Sid: 'TenantPolicyPersistence',
Effect: 'Allow',
Principal: { AWS: ADMIN_CREDENTIAL.accessKeyId },
Action: ['s3:GetBucketPolicy', 's3:PutBucketPolicy', 's3:ListBucket'],
Resource: `arn:aws:s3:::${POLICY_BUCKET}`,
}],
});
const response = await adminClient.send(new PutBucketPolicyCommand({
Bucket: POLICY_BUCKET,
Policy: policy,
}));
expect(response.$metadata.httpStatusCode).toEqual(204);
});
// Rotates tenant A's credential: the old client must stop working, the new
// one must work, and the stored descriptor must match the rotated key pair.
tap.test('credential rotation replaces the active tenant credential', async () => {
// Keep the pre-rotation client so its rejection can be checked below.
oldTenantAClient = tenantAClient;
tenantA = await testSmartStorageInstance.rotateBucketTenantCredentials({
bucketName: WORKAPP_A_BUCKET,
});
tenantAClient = createS3ClientFromDescriptor(tenantA);
await expect(oldTenantAClient.send(new GetObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
}))).rejects.toThrow();
const getA = await tenantAClient.send(new GetObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
}));
expect(await streamToString(getA.Body as Readable)).toEqual('hello from tenant a');
const descriptor = await testSmartStorageInstance.getBucketTenantDescriptor({
bucketName: WORKAPP_A_BUCKET,
});
expect(descriptor.accessKeyId).toEqual(tenantA.accessKeyId);
expect(descriptor.secretAccessKey).toEqual(tenantA.secretAccessKey);
});
// Restarts the server on the same storage directory and checks that the
// rotated tenant credential, the tenant list and the bucket policy are all
// still present.
tap.test('runtime credentials survive restart', async () => {
await testSmartStorageInstance.stop();
await startStorage();
const persistedTenantA = await testSmartStorageInstance.getBucketTenantDescriptor({
bucketName: WORKAPP_A_BUCKET,
});
expect(persistedTenantA.accessKeyId).toEqual(tenantA.accessKeyId);
expect(persistedTenantA.secretAccessKey).toEqual(tenantA.secretAccessKey);
// Build the client from the persisted descriptor to prove it is usable as-is.
tenantAClient = createS3ClientFromDescriptor(persistedTenantA);
const getA = await tenantAClient.send(new GetObjectCommand({
Bucket: WORKAPP_A_BUCKET,
Key: 'hello.txt',
}));
expect(await streamToString(getA.Body as Readable)).toEqual('hello from tenant a');
const tenants = await testSmartStorageInstance.listBucketTenants();
expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_A_BUCKET)).toEqual(true);
expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_B_BUCKET)).toEqual(true);
// Policy written by the 'bucket policies persist across restart' test.
const policyResponse = await adminClient.send(new GetBucketPolicyCommand({
Bucket: POLICY_BUCKET,
}));
expect(policyResponse.Policy?.includes('TenantPolicyPersistence')).toEqual(true);
});
// Exercises both forms of deleteBucketTenant on tenant B: first with an access
// key id, then with the bucket name only.
tap.test('deleteBucketTenant can revoke credentials and delete tenant buckets', async () => {
// With an access key id: the tenant client is rejected afterwards, but the
// bucket is still reachable for the admin.
await testSmartStorageInstance.deleteBucketTenant({
bucketName: WORKAPP_B_BUCKET,
accessKeyId: tenantB.accessKeyId,
});
await expect(tenantBClient.send(new GetObjectCommand({
Bucket: WORKAPP_B_BUCKET,
Key: 'other.txt',
}))).rejects.toThrow();
const headAfterRevoke = await adminClient.send(new HeadBucketCommand({
Bucket: WORKAPP_B_BUCKET,
}));
expect(headAfterRevoke.$metadata.httpStatusCode).toEqual(200);
// With the bucket name only: the bucket is gone afterwards and the tenant
// no longer shows up in the listing.
await testSmartStorageInstance.deleteBucketTenant({
bucketName: WORKAPP_B_BUCKET,
});
await expect(adminClient.send(new HeadBucketCommand({
Bucket: WORKAPP_B_BUCKET,
}))).rejects.toThrow();
const tenants = await testSmartStorageInstance.listBucketTenants();
expect(tenants.some((tenant) => tenant.bucketName === WORKAPP_B_BUCKET)).toEqual(false);
});
// Stops the server started by the setup / restart tests.
tap.test('teardown: stop storage server', async () => {
await testSmartStorageInstance.stop();
});
export default tap.start();
+317
View File
@@ -0,0 +1,317 @@
/// <reference types="node" />
import { readFile, readdir, rm } from 'fs/promises';
import { join } from 'path';
import { expect, tap } from '@git.zone/tstest/tapbundle';
import { CreateBucketCommand, GetObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { Readable } from 'stream';
import * as smartstorage from '../ts/index.js';
// Per-run scratch directory (timestamped) holding every node's storage and drives.
const baseDir = join(process.cwd(), '.nogit', `cluster-multinode-${Date.now()}`);
// Started nodes in start order; index 0 is the seed node.
const nodes: smartstorage.SmartStorage[] = [];
/**
 * Returns the two drive directories of a node, `drive-1` and `drive-2`, below
 * that node's folder in the scratch directory.
 */
const makeDrivePaths = (nodeId: string) => {
  const drivePaths: string[] = [];
  for (const driveIndex of [1, 2]) {
    drivePaths.push(join(baseDir, nodeId, `drive-${driveIndex}`));
  }
  return drivePaths;
};
/**
 * Drains a readable stream and returns its content decoded as UTF-8.
 * Rejects if the stream emits `error`.
 */
const streamToString = async (stream: Readable): Promise<string> => {
  const collected: Buffer[] = [];
  const collect = (chunk: string | Buffer | Uint8Array): void => {
    collected.push(Buffer.from(chunk));
  };
  return new Promise<string>((resolve, reject) => {
    stream.on('data', collect);
    stream.on('error', reject);
    stream.on('end', () => {
      resolve(Buffer.concat(collected).toString('utf8'));
    });
  });
};
/**
 * Reports whether a regular file named `fileName` exists anywhere below
 * `directory` (searched recursively). A directory that cannot be read counts
 * as "not found" rather than an error.
 */
const fileExistsBelow = async (directory: string, fileName: string): Promise<boolean> => {
  const entries = await readdir(directory, { withFileTypes: true }).catch(() => undefined);
  if (entries === undefined) {
    return false;
  }

  for (const entry of entries) {
    if (entry.isFile()) {
      if (entry.name === fileName) {
        return true;
      }
    } else if (entry.isDirectory()) {
      const foundDeeper = await fileExistsBelow(join(directory, entry.name), fileName);
      if (foundDeeper) {
        return true;
      }
    }
  }
  return false;
};
/**
 * Polls `check` every 250 ms until it resolves to a truthy value or
 * `timeoutMs` has elapsed. Errors thrown by `check` are treated as "not yet";
 * the most recent message is appended to the timeout error.
 */
const waitFor = async (check: () => Promise<boolean>, timeoutMs = 10000) => {
  const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
  const deadline = Date.now() + timeoutMs;
  let lastError = '';

  for (; Date.now() < deadline; await pause(250)) {
    try {
      const satisfied = await check();
      if (satisfied) {
        return;
      }
    } catch (error) {
      lastError = error instanceof Error ? error.message : String(error);
    }
  }

  throw new Error(`Timed out waiting for cluster condition${lastError ? `: ${lastError}` : ''}`);
};
// Starts node-1 as the seed and node-2 / node-3 as joiners pointing at the
// seed's QUIC port. HTTP ports are 3350-3352 and QUIC ports 4350-4352; all
// nodes share the same erasure and heartbeat settings.
tap.test('setup: start three clustered storage nodes', async () => {
  await rm(baseDir, { recursive: true, force: true });

  const nodeIds = ['node-1', 'node-2', 'node-3'];
  for (const [index, nodeId] of nodeIds.entries()) {
    const isSeed = index === 0;
    const node = await smartstorage.SmartStorage.createAndStart({
      server: {
        address: '127.0.0.1',
        port: 3350 + index,
        silent: true,
      },
      storage: {
        directory: join(baseDir, nodeId, 'storage'),
      },
      cluster: {
        enabled: true,
        nodeId,
        quicPort: 4350 + index,
        seedNodes: isSeed ? [] : ['127.0.0.1:4350'],
        erasure: {
          dataShards: 4,
          parityShards: 2,
          chunkSizeBytes: 1024 * 1024,
        },
        drives: {
          paths: makeDrivePaths(nodeId),
        },
        heartbeatIntervalMs: 500,
        heartbeatTimeoutMs: 3000,
      },
    });
    nodes.push(node);

    // Pause between starts, but not after the last node.
    const isLast = index === nodeIds.length - 1;
    if (!isLast) {
      await new Promise((resolve) => setTimeout(resolve, 500));
    }
  }
});
// Waits until the seed sees both peers and a single erasure set, then checks
// the reported cluster health in detail.
tap.test('seed node should report joined peers and multi-node erasure topology', async () => {
  const seed = nodes[0];

  await waitFor(async () => {
    const snapshot = await seed.getClusterHealth();
    const converged = snapshot.peers?.length === 2 && snapshot.erasure?.erasureSetCount === 1;
    if (!converged) {
      // Throwing makes the health snapshot part of the timeout message.
      throw new Error(JSON.stringify(snapshot));
    }
    return converged;
  });

  const health = await seed.getClusterHealth();
  const peerIds = health.peers!.map((peer) => peer.nodeId).sort();

  expect(health.enabled).toEqual(true);
  expect(health.nodeId).toEqual('node-1');
  expect(health.quorumHealthy).toEqual(true);
  expect(health.majorityHealthy).toEqual(true);
  expect(peerIds).toEqual(['node-2', 'node-3']);
  expect(health.erasure?.totalShards).toEqual(6);
  expect(health.erasure?.erasureSetCount).toEqual(1);
});
// Every node, not just the seed, must end up seeing two peers and one erasure set.
tap.test('all nodes should converge to the same multi-node topology', async () => {
for (const node of nodes) {
await waitFor(async () => {
const health = await node.getClusterHealth();
if (health.peers?.length !== 2 || health.erasure?.erasureSetCount !== 1) {
// Throwing makes the health snapshot part of the timeout message.
throw new Error(JSON.stringify(health));
}
return true;
});
}
});
// Writes one object through the seed node, reads it back, then inspects the
// object's manifest on the seed and checks that each placed shard exists as a
// file on the drive the manifest names.
tap.test('seed node should write shards to the declared remote drives', async () => {
const seed = nodes[0];
const descriptor = await seed.getStorageDescriptor();
const client = new S3Client({
endpoint: `http://${descriptor.endpoint}:${descriptor.port}`,
region: 'us-east-1',
credentials: {
accessKeyId: descriptor.accessKey,
secretAccessKey: descriptor.accessSecret,
},
forcePathStyle: true,
});
const bucket = 'multinode-bucket';
const key = 'distributed.txt';
const body = 'hello distributed shards';
await client.send(new CreateBucketCommand({ Bucket: bucket }));
await client.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body: body }));
const getResponse = await client.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
expect(await streamToString(getResponse.Body as Readable)).toEqual(body);
// Manifest file the test expects below node-1's storage directory.
const manifestPath = join(
baseDir,
'node-1',
'storage',
'.manifests',
bucket,
`${key}.manifest.json`,
);
const manifest = JSON.parse(await readFile(manifestPath, 'utf8')) as {
chunks: Array<{
shardPlacements: Array<{ shardIndex: number; nodeId: string; driveId: string }>;
}>;
};
const placements = manifest.chunks[0].shardPlacements;
expect(placements.length).toEqual(6);
// At least one shard must have landed on drive '1' of each remote node.
expect(placements.some((placement) => placement.nodeId === 'node-2' && placement.driveId === '1'))
.toEqual(true);
expect(placements.some((placement) => placement.nodeId === 'node-3' && placement.driveId === '1'))
.toEqual(true);
for (const placement of placements) {
// driveId is used as an index into the node's drive path list.
const drivePath = makeDrivePaths(placement.nodeId)[Number(placement.driveId)];
const shardFile = `shard-${placement.shardIndex}.dat`;
expect(await fileExistsBelow(join(drivePath, '.smartstorage', 'data'), shardFile)).toEqual(true);
}
});
// Stops node-2, starts it again with the same configuration and directories,
// waits for it to see both peers, then checks the identity and topology files
// it keeps on disk.
tap.test('restarted peer should keep durable identity and rejoin topology', async () => {
await nodes[1].stop();
await new Promise((resolve) => setTimeout(resolve, 500));
// Same settings as the original node-2 start in the setup test.
nodes[1] = await smartstorage.SmartStorage.createAndStart({
server: {
address: '127.0.0.1',
port: 3351,
silent: true,
},
storage: {
directory: join(baseDir, 'node-2', 'storage'),
},
cluster: {
enabled: true,
nodeId: 'node-2',
quicPort: 4351,
seedNodes: ['127.0.0.1:4350'],
erasure: {
dataShards: 4,
parityShards: 2,
chunkSizeBytes: 1024 * 1024,
},
drives: {
paths: makeDrivePaths('node-2'),
},
heartbeatIntervalMs: 500,
heartbeatTimeoutMs: 3000,
},
});
await waitFor(async () => {
const health = await nodes[1].getClusterHealth();
if (health.nodeId !== 'node-2' || health.peers?.length !== 2) {
// Throwing makes the health snapshot part of the timeout message.
throw new Error(JSON.stringify(health));
}
return true;
});
const identityPath = join(
baseDir,
'node-2',
'storage',
'.smartstorage',
'cluster',
'identity.json',
);
const topologyPath = join(
baseDir,
'node-2',
'storage',
'.smartstorage',
'cluster',
'topology.json',
);
// Note the differing key styles: identity.json is read with camelCase keys,
// topology.json with snake_case keys.
const identity = JSON.parse(await readFile(identityPath, 'utf8')) as {
nodeId: string;
clusterId: string;
};
const topology = JSON.parse(await readFile(topologyPath, 'utf8')) as {
cluster_id: string;
nodes: Array<{ node_id: string }>;
};
expect(identity.nodeId).toEqual('node-2');
expect(identity.clusterId).toEqual(topology.cluster_id);
expect(topology.nodes.some((node) => node.node_id === 'node-1')).toEqual(true);
expect(topology.nodes.some((node) => node.node_id === 'node-3')).toEqual(true);
});
// Stops the nodes in reverse start order (seed last) and removes the scratch
// directory.
tap.test('teardown: stop clustered nodes and clean files', async () => {
  try {
    // Iterate over a reversed copy so the shared `nodes` array is not mutated.
    for (const node of [...nodes].reverse()) {
      await node.stop();
    }
  } finally {
    // Remove the timestamped scratch directory even if a node fails to stop;
    // otherwise every failed run leaves another directory behind.
    await rm(baseDir, { recursive: true, force: true });
  }
});
export default tap.start();
+2 -2
View File
@@ -65,11 +65,11 @@ tap.test('startup credentials authenticate successfully', async () => {
expect(response.$metadata.httpStatusCode).toEqual(200);
});
tap.test('listCredentials returns the active startup credential set', async () => {
tap.test('listCredentials returns active credential metadata without secrets', async () => {
const credentials = await testSmartStorageInstance.listCredentials();
expect(credentials.length).toEqual(1);
expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId);
expect(credentials[0].secretAccessKey).toEqual(INITIAL_CREDENTIAL.secretAccessKey);
expect((credentials[0] as any).secretAccessKey).toEqual(undefined);
});
tap.test('invalid replacement input fails cleanly and leaves old credentials active', async () => {
+50
View File
@@ -0,0 +1,50 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartstorage from '../ts/index.js';
// Port the storage server under test listens on.
const TEST_PORT = 3353;
// Server instance shared by the tests in this file.
let testSmartStorageInstance: smartstorage.SmartStorage;
// Starts a server with auth disabled and cleanSlate enabled; the endpoint
// checks below are sent without credentials.
tap.test('setup: start storage server for operational endpoint checks', async () => {
testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart({
server: {
port: TEST_PORT,
silent: true,
region: 'us-east-1',
},
storage: {
cleanSlate: true,
},
auth: {
enabled: false,
credentials: [],
},
});
});
// Hits the four operational HTTP endpoints under `/-/` and checks status code
// and key fields of each response.
tap.test('operational endpoints expose live ready health and metrics', async () => {
const live = await fetch(`http://localhost:${TEST_PORT}/-/live`);
expect(live.status).toEqual(200);
expect((await live.json()).status).toEqual('alive');
const ready = await fetch(`http://localhost:${TEST_PORT}/-/ready`);
expect(ready.status).toEqual(200);
expect((await ready.json()).status).toEqual('ready');
const health = await fetch(`http://localhost:${TEST_PORT}/-/health`);
expect(health.status).toEqual(200);
const healthBody = await health.json();
expect(healthBody.ok).toEqual(true);
// No cluster config was passed in setup, so the cluster is reported as disabled.
expect(healthBody.cluster.enabled).toEqual(false);
// The metrics endpoint returns plain text rather than JSON.
const metrics = await fetch(`http://localhost:${TEST_PORT}/-/metrics`);
expect(metrics.status).toEqual(200);
const metricsBody = await metrics.text();
expect(metricsBody.includes('smartstorage_requests_total')).toEqual(true);
expect(metricsBody.includes('smartstorage_cluster_enabled 0')).toEqual(true);
});
// Stops the server started in the setup test.
tap.test('teardown: stop storage server', async () => {
await testSmartStorageInstance.stop();
});
export default tap.start();
+1 -1
View File
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartstorage',
version: '6.3.3',
version: '6.5.0',
description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
}
+301 -2
View File
@@ -7,6 +7,14 @@ import * as paths from './paths.js';
/**
 * An access key pair accepted by the storage server, including its secret.
 */
export interface IStorageCredential {
accessKeyId: string;
secretAccessKey: string;
/** Bucket the credential belongs to, when it is a bucket-tenant credential. */
bucketName?: string;
/** Region associated with the credential, if any. */
region?: string;
}
/**
 * Credential description without the secret key; this is what
 * `listCredentials()` returns.
 */
export interface IStorageCredentialMetadata {
accessKeyId: string;
bucketName?: string;
region?: string;
}
/**
@@ -151,6 +159,88 @@ export interface IStorageStats {
storageLocations?: IStorageLocationSummary[];
}
/**
 * Input for `createBucketTenant()` and `rotateBucketTenantCredentials()`.
 */
export interface IBucketTenantInput {
bucketName: string;
/** Optional explicit access key id; a random one is generated when omitted or empty. */
accessKeyId?: string;
/** Optional explicit secret; a random one is generated when omitted or empty. */
secretAccessKey?: string;
/** Optional region; falls back to the configured server region. */
region?: string;
}
/**
 * Input for `deleteBucketTenant()`.
 */
export interface IDeleteBucketTenantInput {
bucketName: string;
/**
 * Optional access key id. NOTE(review): the bucket-tenant test passes it to
 * revoke one credential while keeping the bucket, and omits it to remove the
 * bucket as well — confirm against the native implementation.
 */
accessKeyId?: string;
}
/**
 * Tenant description without the secret key; element type of the
 * `listBucketTenants()` result.
 */
export interface IBucketTenantMetadata {
bucketName: string;
accessKeyId: string;
region?: string;
}
/**
 * Connection data for one bucket tenant, as built by
 * `buildBucketTenantDescriptor()`. Several values appear under two names.
 */
export interface IBucketTenantDescriptor extends plugins.tsclass.storage.IS3Descriptor {
endpoint: string;
port: number;
region: string;
/** Same value as `bucketName`. */
bucket: string;
bucketName: string;
accessKeyId: string;
secretAccessKey: string;
/** Same value as `accessKeyId`. */
accessKey: string;
/** Same value as `secretAccessKey`. */
accessSecret: string;
useSsl: boolean;
/** Same value as `useSsl`. */
ssl: boolean;
/** The same connection data as environment variables (`S3_*` and `AWS_*` keys). */
env: Record<string, string>;
}
/**
 * One object inside a bucket export, payload included.
 */
export interface IBucketExportObject {
key: string;
size: number;
md5: string;
metadata: Record<string, string>;
/** Object payload encoded as a hex string. */
dataHex: string;
}
/**
 * Snapshot of one bucket, returned by `exportBucket()` and accepted by
 * `importBucket()`.
 */
export interface IBucketExport {
/** Format tag; only this one value is defined. */
format: 'smartstorage.bucket.v1';
/** Bucket the snapshot was taken from. */
bucketName: string;
/** Export timestamp in epoch milliseconds. */
exportedAt: number;
objects: IBucketExportObject[];
}
/**
 * Input for `exportBucket()`.
 */
export interface IExportBucketInput {
/** Bucket to export. */
bucketName: string;
}
/**
 * Input for `importBucket()`.
 */
export interface IImportBucketInput {
/** Bucket to import into; it may differ from `source.bucketName`. */
bucketName: string;
/** Export to read the objects from. */
source: IBucketExport;
}
/**
 * Aggregated health view returned by `getHealth()`.
 */
export interface ISmartStorageHealth {
/** False when the instance is not running. */
ok: boolean;
running: boolean;
storageDirectory: string;
auth: {
enabled: boolean;
/** Number of credentials (from `listCredentials()` while running). */
credentialCount: number;
/** Number of bucket tenants (from `listBucketTenants()` while running). */
tenantCredentialCount: number;
};
bucketCount: number;
objectCount: number;
totalBytes: number;
cluster: IClusterHealth;
}
/**
 * Metrics view returned by `getMetrics()`.
 */
export interface ISmartStorageMetrics {
bucketCount: number;
objectCount: number;
totalBytes: number;
authCredentialCount: number;
tenantCredentialCount: number;
clusterEnabled: boolean;
/** The metrics rendered as text, e.g. a `smartstorage_tenant_credentials_total` line. */
prometheusText: string;
}
/**
* Known peer status from the local node's current cluster view.
*/
@@ -302,6 +392,14 @@ function mergeConfig(userConfig: ISmartStorageConfig): Required<ISmartStorageCon
} as Required<ISmartStorageConfig>;
}
/**
 * Generates a random access key id: the prefix `SS` followed by 20 upper-case
 * hex characters (10 random bytes).
 */
function createAccessKeyId(): string {
  const randomHex = plugins.crypto.randomBytes(10).toString('hex');
  return `SS${randomHex.toUpperCase()}`;
}
/**
 * Generates a random secret access key: 64 lower-case hex characters
 * (32 random bytes).
 */
function createSecretAccessKey(): string {
  const secretBytes = plugins.crypto.randomBytes(32);
  return secretBytes.toString('hex');
}
/**
* IPC command type map for RustBridge
*/
@@ -309,9 +407,38 @@ type TRustStorageCommands = {
start: { params: { config: Required<ISmartStorageConfig> }; result: {} };
stop: { params: {}; result: {} };
createBucket: { params: { name: string }; result: {} };
createBucketTenant: {
params: {
bucketName: string;
accessKeyId: string;
secretAccessKey: string;
region?: string;
};
result: IStorageCredential;
};
deleteBucketTenant: {
params: { bucketName: string; accessKeyId?: string };
result: {};
};
rotateBucketTenantCredentials: {
params: {
bucketName: string;
accessKeyId: string;
secretAccessKey: string;
region?: string;
};
result: IStorageCredential;
};
listBucketTenants: { params: {}; result: IBucketTenantMetadata[] };
getBucketTenantCredential: {
params: { bucketName: string };
result: IStorageCredential;
};
exportBucket: { params: { bucketName: string }; result: IBucketExport };
importBucket: { params: { bucketName: string; source: IBucketExport }; result: {} };
getStorageStats: { params: {}; result: IStorageStats };
listBucketSummaries: { params: {}; result: IBucketSummary[] };
listCredentials: { params: {}; result: IStorageCredential[] };
listCredentials: { params: {}; result: IStorageCredentialMetadata[] };
replaceCredentials: { params: { credentials: IStorageCredential[] }; result: {} };
getClusterHealth: { params: {}; result: IClusterHealth };
};
@@ -330,6 +457,7 @@ export class SmartStorage {
// INSTANCE
public config: Required<ISmartStorageConfig>;
private bridge: InstanceType<typeof plugins.RustBridge<TRustStorageCommands>>;
private running = false;
constructor(configArg: ISmartStorageConfig = {}) {
this.config = mergeConfig(configArg);
@@ -349,6 +477,7 @@ export class SmartStorage {
throw new Error('Failed to spawn ruststorage binary. Make sure it is compiled (pnpm build).');
}
await this.bridge.sendCommand('start', { config: this.config });
this.running = true;
if (!this.config.server.silent) {
console.log('storage server is running');
@@ -378,11 +507,110 @@ export class SmartStorage {
};
}
/**
 * Resolves the host name clients should use to reach this server.
 *
 * A wildcard bind address is something the server listens on, not something a
 * client can connect to, so the IPv4 wildcard (`0.0.0.0`), the IPv6 wildcard
 * (`::`, also accepted in bracketed form) and a missing or empty address all
 * map to `localhost`. Any other configured address is returned unchanged.
 */
private getEndpoint(): string {
  const address = this.config.server.address;
  const wildcardAddresses = ['0.0.0.0', '::', '[::]'];
  if (!address || wildcardAddresses.includes(address)) {
    return 'localhost';
  }
  return address;
}
/**
 * Turns a tenant credential into a full connection descriptor.
 *
 * The bucket name comes from the credential when present, otherwise from
 * `bucketNameArg`. The region comes from the credential, then the server
 * config, then `us-east-1`. SSL is always reported as disabled.
 *
 * @param credential credential as returned by the native bridge
 * @param bucketNameArg bucket name to use when the credential carries none
 */
private buildBucketTenantDescriptor(
  credential: IStorageCredential,
  bucketNameArg: string,
): IBucketTenantDescriptor {
  const { accessKeyId, secretAccessKey } = credential;
  const bucketName = credential.bucketName || bucketNameArg;
  const region = credential.region || this.config.server.region || 'us-east-1';
  const endpoint = this.getEndpoint();
  const port = this.config.server.port!;
  const useSsl = false;

  // Same connection data, flattened to strings for use as environment variables.
  const env: Record<string, string> = {
    S3_ENDPOINT: endpoint,
    S3_PORT: String(port),
    S3_REGION: region,
    S3_BUCKET: bucketName,
    S3_ACCESS_KEY_ID: accessKeyId,
    S3_SECRET_ACCESS_KEY: secretAccessKey,
    S3_USE_SSL: String(useSsl),
    AWS_ACCESS_KEY_ID: accessKeyId,
    AWS_SECRET_ACCESS_KEY: secretAccessKey,
    AWS_REGION: region,
  };

  return {
    endpoint,
    port,
    region,
    bucket: bucketName,
    bucketName,
    accessKeyId,
    secretAccessKey,
    accessKey: accessKeyId,
    accessSecret: secretAccessKey,
    useSsl,
    ssl: useSsl,
    env,
  };
}
/**
 * Guard for the tenant-mutating APIs.
 *
 * @throws Error when `auth.enabled` is false in the instance config
 */
private assertTenantAuthEnabled(): void {
if (!this.config.auth.enabled) {
throw new Error('Bucket tenant APIs require auth.enabled=true.');
}
}
/**
 * Creates a bucket through the native bridge.
 *
 * @param bucketNameArg name of the bucket to create
 * @returns an object carrying the requested name
 */
public async createBucket(bucketNameArg: string) {
await this.bridge.sendCommand('createBucket', { name: bucketNameArg });
return { name: bucketNameArg };
}
/**
 * Provisions a tenant credential for a bucket and returns its connection
 * descriptor.
 *
 * Missing or empty key material is replaced by randomly generated values; a
 * missing region falls back to the configured server region.
 *
 * @param tenantArg bucket name plus optional key pair and region
 * @throws Error when auth is disabled in the instance config
 */
public async createBucketTenant(
  tenantArg: IBucketTenantInput,
): Promise<IBucketTenantDescriptor> {
  this.assertTenantAuthEnabled();

  const { bucketName } = tenantArg;
  const params = {
    bucketName,
    accessKeyId: tenantArg.accessKeyId || createAccessKeyId(),
    secretAccessKey: tenantArg.secretAccessKey || createSecretAccessKey(),
    region: tenantArg.region || this.config.server.region,
  };
  const credential = await this.bridge.sendCommand('createBucketTenant', params);

  return this.buildBucketTenantDescriptor(credential, bucketName);
}
/**
 * Forwards a tenant deletion request to the native bridge.
 *
 * @param tenantArg bucket name and optional access key id; passed through as-is
 * @throws Error when auth is disabled in the instance config
 */
public async deleteBucketTenant(tenantArg: IDeleteBucketTenantInput): Promise<void> {
this.assertTenantAuthEnabled();
await this.bridge.sendCommand('deleteBucketTenant', tenantArg);
}
/**
 * Rotates the credential of a bucket tenant and returns the descriptor for the
 * new key pair.
 *
 * Missing or empty key material is replaced by randomly generated values; a
 * missing region falls back to the configured server region.
 *
 * @param tenantArg bucket name plus optional replacement key pair and region
 * @throws Error when auth is disabled in the instance config
 */
public async rotateBucketTenantCredentials(
  tenantArg: IBucketTenantInput,
): Promise<IBucketTenantDescriptor> {
  this.assertTenantAuthEnabled();

  const { bucketName } = tenantArg;
  const params = {
    bucketName,
    accessKeyId: tenantArg.accessKeyId || createAccessKeyId(),
    secretAccessKey: tenantArg.secretAccessKey || createSecretAccessKey(),
    region: tenantArg.region || this.config.server.region,
  };
  const credential = await this.bridge.sendCommand('rotateBucketTenantCredentials', params);

  return this.buildBucketTenantDescriptor(credential, bucketName);
}
/**
 * Runs the `listBucketTenants` bridge command with an empty payload.
 *
 * @returns The bridge's response, typed as bucket tenant metadata entries.
 */
public async listBucketTenants(): Promise<IBucketTenantMetadata[]> {
  const tenants = await this.bridge.sendCommand('listBucketTenants', {});
  return tenants;
}
/**
 * Fetches the tenant credential for a bucket via the
 * `getBucketTenantCredential` bridge command and wraps it in a descriptor.
 *
 * NOTE(review): unlike create/delete/rotate, this method does not call
 * `assertTenantAuthEnabled()` first — confirm that is intentional.
 *
 * @param optionsArg - Object carrying the `bucketName` to look up.
 * @returns The descriptor built from the credential returned by the bridge.
 */
public async getBucketTenantDescriptor(optionsArg: {
  bucketName: string;
}): Promise<IBucketTenantDescriptor> {
  const lookup = { bucketName: optionsArg.bucketName };
  const storedCredential = await this.bridge.sendCommand('getBucketTenantCredential', lookup);
  return this.buildBucketTenantDescriptor(storedCredential, optionsArg.bucketName);
}
/**
 * Runs the `exportBucket` bridge command for a single bucket.
 *
 * Only `bucketName` is forwarded; any other fields on `optionsArg` are not sent.
 *
 * @param optionsArg - Input whose `bucketName` selects the bucket.
 * @returns The bridge's response, typed as a bucket export.
 */
public async exportBucket(optionsArg: IExportBucketInput): Promise<IBucketExport> {
  const { bucketName } = optionsArg;
  return this.bridge.sendCommand('exportBucket', { bucketName });
}
/**
 * Forwards the given input unchanged to the `importBucket` bridge command.
 *
 * @param optionsArg - Payload passed as-is to the bridge.
 */
public async importBucket(optionsArg: IImportBucketInput): Promise<void> {
  const command = 'importBucket';
  await this.bridge.sendCommand(command, optionsArg);
}
/**
 * Runs the `getStorageStats` bridge command with an empty payload.
 *
 * @returns The bridge's response, typed as storage statistics.
 */
public async getStorageStats(): Promise<IStorageStats> {
  const stats = await this.bridge.sendCommand('getStorageStats', {});
  return stats;
}
@@ -391,7 +619,7 @@ export class SmartStorage {
return this.bridge.sendCommand('listBucketSummaries', {});
}
public async listCredentials(): Promise<IStorageCredential[]> {
/**
 * Runs the `listCredentials` bridge command with an empty payload.
 *
 * @returns The bridge's response, typed as credential metadata entries.
 */
public async listCredentials(): Promise<IStorageCredentialMetadata[]> {
return this.bridge.sendCommand('listCredentials', {});
}
@@ -404,8 +632,79 @@ export class SmartStorage {
return this.bridge.sendCommand('getClusterHealth', {});
}
/**
 * Builds a health snapshot of the storage instance.
 *
 * While `running` is true, storage stats, credentials, bucket tenants and
 * cluster health are requested concurrently and combined into an `ok: true`
 * report. Otherwise an `ok: false` report is assembled from local config
 * only, with zeroed counters and `cluster.enabled` set to false.
 *
 * @returns The assembled health report.
 */
public async getHealth(): Promise<ISmartStorageHealth> {
  if (this.running) {
    // The four lookups are independent, so they are issued in parallel.
    const [storage, credentialList, tenantList, clusterHealth] = await Promise.all([
      this.getStorageStats(),
      this.listCredentials(),
      this.listBucketTenants(),
      this.getClusterHealth(),
    ]);
    const auth = {
      enabled: this.config.auth.enabled,
      credentialCount: credentialList.length,
      tenantCredentialCount: tenantList.length,
    };
    return {
      ok: true,
      running: true,
      storageDirectory: storage.storageDirectory,
      auth,
      bucketCount: storage.bucketCount,
      objectCount: storage.totalObjectCount,
      totalBytes: storage.totalStorageBytes,
      cluster: clusterHealth,
    };
  }

  // Not running: report only what the local configuration can tell us.
  const configuredAuth = this.config.auth;
  return {
    ok: false,
    running: false,
    storageDirectory: this.config.storage.directory || paths.bucketsDir,
    auth: {
      enabled: configuredAuth.enabled,
      credentialCount: configuredAuth.credentials.length,
      tenantCredentialCount: 0,
    },
    bucketCount: 0,
    objectCount: 0,
    totalBytes: 0,
    cluster: { enabled: false },
  };
}
/**
 * Derives runtime metrics from `getHealth()` and renders them in the
 * Prometheus text exposition format.
 *
 * Each gauge is emitted as a `# HELP` line, a `# TYPE ... gauge` line and a
 * sample line. Every line — including the last — is terminated with `\n`,
 * because the exposition format requires the final line to end with a line
 * feed.
 *
 * @returns Numeric metric fields plus the rendered `prometheusText`.
 */
public async getMetrics(): Promise<ISmartStorageMetrics> {
  const health = await this.getHealth();
  const clusterEnabled = health.cluster.enabled;

  // Order matters only for readability of the scrape output; it is kept stable.
  const gauges = [
    {
      name: 'smartstorage_buckets_total',
      help: 'Runtime bucket count.',
      value: health.bucketCount,
    },
    {
      name: 'smartstorage_objects_total',
      help: 'Runtime object count.',
      value: health.objectCount,
    },
    {
      name: 'smartstorage_storage_bytes_total',
      help: 'Runtime storage bytes.',
      value: health.totalBytes,
    },
    {
      name: 'smartstorage_tenant_credentials_total',
      help: 'Scoped bucket tenant credential count.',
      value: health.auth.tenantCredentialCount,
    },
    {
      name: 'smartstorage_cluster_enabled',
      help: 'Cluster mode enabled.',
      value: clusterEnabled ? 1 : 0,
    },
  ];

  const prometheusText = gauges
    .map(
      (gauge) =>
        `# HELP ${gauge.name} ${gauge.help}\n` +
        `# TYPE ${gauge.name} gauge\n` +
        `${gauge.name} ${gauge.value}\n`,
    )
    .join('');

  return {
    bucketCount: health.bucketCount,
    objectCount: health.objectCount,
    totalBytes: health.totalBytes,
    authCredentialCount: health.auth.credentialCount,
    tenantCredentialCount: health.auth.tenantCredentialCount,
    clusterEnabled,
    prometheusText,
  };
}
/**
 * Stops the storage server: sends the `stop` command to the bridge, then
 * kills the bridge and clears the `running` flag.
 *
 * The kill and flag reset run in a `finally` block so that a rejected `stop`
 * command cannot leave the bridge alive with `running` still true. Any
 * error from the command is still propagated to the caller.
 */
public async stop() {
  try {
    await this.bridge.sendCommand('stop', {});
  } finally {
    this.bridge.kill();
    this.running = false;
  }
}
}
+2 -1
View File
@@ -1,7 +1,8 @@
// node native
import * as crypto from 'crypto';
import * as path from 'path';
export { path };
export { crypto, path };
// @push.rocks scope
import * as smartpath from '@push.rocks/smartpath';
+1
View File
@@ -6,6 +6,7 @@
"esModuleInterop": true,
"verbatimModuleSyntax": true,
"types": ["node"],
"noImplicitAny": true,
"ignoreDeprecations": "6.0",
"baseUrl": ".",
"paths": {}