Compare commits

4 commits:

- 53d663597a
- 440197ccf3
- c8d3ed79aa
- a31e477359
+1 -1
@@ -44,7 +44,7 @@
 }
 },
 "@git.zone/tsdoc": {
-"legal": "\n## License and Legal Information\n\nThis repository contains open-source code that is licensed under the MIT License. A copy of the MIT License can be found in the [license](license) file within this repository. \n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH and are not included within the scope of the MIT license granted herein. Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines, and any usage must be approved in writing by Task Venture Capital GmbH.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or if you require further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
+"legal": "\n## License and Legal Information\n\nThis repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.\n\n**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.\n\n### Trademarks\n\nThis project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.\n\nUse of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.\n\n### Company Information\n\nTask Venture Capital GmbH \nRegistered at District Court Bremen HRB 35230 HB, Germany\n\nFor any legal inquiries or further information, please contact us via email at hello@task.vc.\n\nBy using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.\n"
 },
 "@ship.zone/szci": {
 "npmGlobalTools": []

@@ -1,5 +1,22 @@
 # Changelog
 
+## 2026-04-30 - 6.4.1 - fix(build)
+tighten TypeScript compiler settings and refresh package metadata
+
+- enable noImplicitAny in tsconfig and align the build script with strict compilation
+- update package metadata including author, repository URL, and pnpm version
+- bump dependency versions for @aws-sdk/client-s3 and @tsclass/tsclass
+- refresh README hints and legal text to match the current package setup
+
+## 2026-04-30 - 6.4.0 - feat(cluster,server,auth)
+add operational health endpoints, persist cluster topology, and hide credential secrets from runtime listings
+
+- persist cluster identity and topology snapshots under .smartstorage/cluster to support safer clustered restarts and seed-node joins
+- add unauthenticated /-/live, /-/ready, /-/health, and /-/metrics endpoints with basic request and storage metrics
+- route clustered shard read/write/delete/head operations by drive index and handle join, heartbeat, and topology sync over QUIC
+- change runtime credential listing to return access-key metadata only, excluding secretAccessKey values
+- add tests for operational endpoints and multi-node cluster persistence and recovery behavior
+
 ## 2026-04-19 - 6.3.3 - fix(build)
 rename npmextra config to .smartconfig and refresh build metadata
 
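The 6.4.0 entry above introduces unauthenticated operational endpoints. As a rough illustration of how a deployment might probe them, here is a minimal TypeScript sketch; the port (3333) and response handling are assumptions, only the paths `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics` come from the changelog.

```ts
// Hypothetical health probe for the endpoints named in the 6.4.0 changelog entry.
// Assumption: a smartstorage server is listening on localhost:3333; adjust as needed.
const base = 'http://localhost:3333';

async function probe(path: string): Promise<void> {
  const response = await fetch(`${base}${path}`);
  // /-/health returns JSON per the changelog; the others are checked as plain text.
  const body = path === '/-/health' ? await response.json() : await response.text();
  console.log(path, response.status, body);
}

for (const path of ['/-/live', '/-/ready', '/-/health', '/-/metrics']) {
  await probe(path); // top-level await: run with a modern Node.js ESM runtime
}
```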

@@ -1,4 +1,6 @@
-Copyright (c) 2021 Task Venture Capital GmbH (hello@task.vc)
+MIT License
+
+Copyright (c) Task Venture Capital GmbH
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal

+7 -10
@@ -1,21 +1,21 @@
 {
 "name": "@push.rocks/smartstorage",
-"version": "6.3.3",
+"version": "6.4.1",
 "private": false,
 "description": "A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.",
 "main": "dist_ts/index.js",
 "typings": "dist_ts/index.d.ts",
 "type": "module",
-"author": "Lossless GmbH",
+"author": "Task Venture Capital GmbH",
 "license": "MIT",
 "scripts": {
 "test:before": "(tsrust)",
 "test": "(tstest test/ --web --verbose --logfile --timeout 60)",
-"build": "(tsrust && tsbuild tsfolders --allowimplicitany)",
+"build": "(tsrust && tsbuild tsfolders)",
 "buildDocs": "tsdoc"
 },
 "devDependencies": {
-"@aws-sdk/client-s3": "^3.1032.0",
+"@aws-sdk/client-s3": "^3.1039.0",
 "@git.zone/tsbuild": "^4.4.0",
 "@git.zone/tsbundle": "^2.10.0",
 "@git.zone/tsrun": "^2.0.2",
@@ -43,7 +43,7 @@
 "dependencies": {
 "@push.rocks/smartpath": "^6.0.0",
 "@push.rocks/smartrust": "^1.3.2",
-"@tsclass/tsclass": "^9.5.0"
+"@tsclass/tsclass": "^9.5.1"
 },
 "keywords": [
 "smartstorage",
@@ -67,13 +67,10 @@
 "homepage": "https://code.foss.global/push.rocks/smartstorage#readme",
 "repository": {
 "type": "git",
-"url": "ssh://git@code.foss.global:29419/push.rocks/smartstorage.git"
+"url": "https://code.foss.global/push.rocks/smartstorage.git"
 },
 "bugs": {
 "url": "https://code.foss.global/push.rocks/smartstorage/issues"
 },
-"packageManager": "pnpm@10.14.0+sha512.ad27a79641b49c3e481a16a805baa71817a04bbe06a38d17e60e2eaee83f6a146c6a688125f5792e48dd5ba30e7da52a5cda4c3992b9ccf333f9ce223af84748",
-"pnpm": {
-"overrides": {}
-}
+"packageManager": "pnpm@10.28.2"
 }

Generated file (+273 -259): diff suppressed because it is too large.

+15 -2
@@ -1,6 +1,6 @@
 # Project Hints for smartstorage
 
-## Current State (v6.0.0)
+## Current State (v6.4.0)
 
 - **Rust-powered S3-compatible storage server** via `@push.rocks/smartrust` IPC bridge
 - High-performance: streaming I/O, zero-copy, backpressure, range seek
@@ -14,6 +14,9 @@
 - Runtime bucket summaries and storage stats via the Rust bridge (no S3 list scans)
 - Cluster health introspection via the Rust bridge (node membership, local drive probes, quorum, healing state)
 - Runtime credential listing and atomic replacement via the Rust bridge
+- Cluster identity and topology snapshots persist under `{storage}/.smartstorage/cluster/`
+- S3-side operational endpoints are available at `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics`
+- Runtime credential listing returns access-key metadata only; secrets are write-only
 
 ## Architecture
 
@@ -47,6 +50,15 @@
 | `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set |
 | `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode |
 
+### Operational HTTP Endpoints
+
+| Endpoint | Purpose |
+|----------|---------|
+| `GET /-/live` | Process liveness |
+| `GET /-/ready` | S3 readiness and cluster quorum readiness |
+| `GET /-/health` | JSON storage, cluster, and runtime health |
+| `GET /-/metrics` | Prometheus text metrics |
+
 ### Storage Layout
 - Objects: `{root}/{bucket}/{key}._storage_object`
 - Metadata: `{root}/{bucket}/{key}._storage_object.metadata.json`
@@ -56,7 +68,7 @@
 
 ## Build
 
-- `pnpm build` runs `tsrust && tsbuild tsfolders --allowimplicitany`
+- `pnpm build` runs `tsrust && tsbuild tsfolders`
 - `tsrust` compiles Rust to `dist_rust/ruststorage`
 - Targets: linux_amd64, linux_arm64 (configured in .smartconfig.json)
 
@@ -71,6 +83,7 @@
 
 - `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337)
 - `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349)
+- `test/test.health-http.node.ts` - unauthenticated operational endpoint coverage (3 tests, port 3353)
 - `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348)
 - `test/test.auth.node.ts` - Auth + bucket policy integration (20 tests, auth enabled, port 3344)
 - `test/test.policy-crud.node.ts` - Policy API CRUD + validation edge cases (17 tests, port 3345)

@@ -1,6 +1,6 @@
 # @push.rocks/smartstorage
 
-A high-performance, S3-compatible storage server powered by a **Rust core** with a clean TypeScript API. Runs standalone for dev/test — or scales out as a **distributed, erasure-coded cluster** with QUIC-based inter-node communication. No cloud, no Docker. Just `npm install` and go. 🚀
+A high-performance, S3-compatible storage server powered by a **Rust core** with a clean TypeScript API. Runs standalone for dev/test — or scales out as a **distributed, erasure-coded cluster** with QUIC-based inter-node communication. No cloud, no Docker. Just install the package and go. 🚀
 
 ## Issue Reporting and Security
 

+15 -2
@@ -176,6 +176,12 @@ pub struct RuntimeCredentialStore {
 credentials: RwLock<Vec<Credential>>,
 }
 
+#[derive(Debug, Clone, serde::Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CredentialMetadata {
+pub access_key_id: String,
+}
+
 impl RuntimeCredentialStore {
 pub fn new(config: &AuthConfig) -> Self {
 Self {
@@ -188,8 +194,15 @@ impl RuntimeCredentialStore {
 self.enabled
 }
 
-pub async fn list_credentials(&self) -> Vec<Credential> {
-self.credentials.read().await.clone()
+pub async fn list_credentials(&self) -> Vec<CredentialMetadata> {
+self.credentials
+.read()
+.await
+.iter()
+.map(|credential| CredentialMetadata {
+access_key_id: credential.access_key_id.clone(),
+})
+.collect()
 }
 
 pub async fn snapshot_credentials(&self) -> Vec<Credential> {
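For context, the change above stops `list_credentials` from returning full credentials. With `#[serde(rename_all = "camelCase")]`, the serialized metadata carries only an `accessKeyId` field. Below is a hedged TypeScript sketch of the consumer-side shape; the `IStorageCredential` field names are assumed, only the metadata-only behavior comes from the diff.

```ts
// Shape implied by the Rust CredentialMetadata struct (access_key_id serialized camelCase).
interface CredentialMetadata {
  accessKeyId: string;
}

// Illustrative full credential as held on the write path; field names are assumptions.
interface IStorageCredential {
  accessKeyId: string;
  secretAccessKey: string;
}

// The same redaction the Rust side performs: map full credentials to metadata-only records,
// so secretAccessKey values never appear in runtime listings.
function toMetadata(credentials: IStorageCredential[]): CredentialMetadata[] {
  return credentials.map(({ accessKeyId }) => ({ accessKeyId }));
}
```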

@@ -408,6 +408,7 @@ impl DistributedStore {
 key,
 chunk.chunk_index,
 placement.shard_index,
+placement.drive_id.parse::<u32>().unwrap_or(0),
 )
 .await
 {
@@ -930,6 +931,7 @@ impl DistributedStore {
 &part_info.part_key,
 chunk.chunk_index,
 placement.shard_index,
+placement.drive_id.parse::<u32>().unwrap_or(0),
 )
 .await;
 }
@@ -1271,6 +1273,7 @@ impl DistributedStore {
 key,
 chunk_index,
 shard_idx as u32,
+drive.drive_index,
 shard_data,
 checksum,
 )
@@ -1330,6 +1333,7 @@ impl DistributedStore {
 key: &str,
 chunk_index: u32,
 shard_index: u32,
+drive_index: u32,
 data: &[u8],
 checksum: u32,
 ) -> Result<()> {
@@ -1348,6 +1352,7 @@ impl DistributedStore {
 key: key.to_string(),
 chunk_index,
 shard_index,
+drive_index,
 shard_data_length: data.len() as u64,
 checksum,
 object_metadata: HashMap::new(),
@@ -1417,6 +1422,7 @@ impl DistributedStore {
 key,
 chunk.chunk_index,
 placement.shard_index,
+placement.drive_id.parse::<u32>().unwrap_or(0),
 )
 .await
 .ok()
@@ -1448,6 +1454,7 @@ impl DistributedStore {
 key: &str,
 chunk_index: u32,
 shard_index: u32,
+drive_index: u32,
 ) -> Result<(Vec<u8>, u32)> {
 let node_info = self
 .state
@@ -1464,6 +1471,7 @@ impl DistributedStore {
 key: key.to_string(),
 chunk_index,
 shard_index,
+drive_index,
 });
 
 match self.transport.send_shard_read(&conn, &request).await? {
@@ -1479,6 +1487,7 @@ impl DistributedStore {
 key: &str,
 chunk_index: u32,
 shard_index: u32,
+drive_index: u32,
 ) -> Result<()> {
 let node_info = self
 .state
@@ -1495,6 +1504,7 @@ impl DistributedStore {
 key: key.to_string(),
 chunk_index,
 shard_index,
+drive_index,
 });
 
 let _response = self.transport.send_request(&conn, &request).await?;

@@ -18,6 +18,7 @@ pub struct MembershipManager {
 state: Arc<ClusterState>,
 transport: Arc<QuicTransport>,
 heartbeat_interval: Duration,
+heartbeat_timeout: Duration,
 local_node_info: NodeInfo,
 drive_manager: Option<Arc<Mutex<DriveManager>>>,
 }
@@ -27,12 +28,14 @@ impl MembershipManager {
 state: Arc<ClusterState>,
 transport: Arc<QuicTransport>,
 heartbeat_interval_ms: u64,
+heartbeat_timeout_ms: u64,
 local_node_info: NodeInfo,
 ) -> Self {
 Self {
 state,
 transport,
 heartbeat_interval: Duration::from_millis(heartbeat_interval_ms),
+heartbeat_timeout: Duration::from_millis(heartbeat_timeout_ms),
 local_node_info,
 drive_manager: None,
 }
@@ -46,7 +49,7 @@
 
 /// Join the cluster by contacting seed nodes.
 /// Sends a JoinRequest to each seed node until one accepts.
-pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> {
+pub async fn join_cluster(&self, seed_nodes: &[String], allow_bootstrap_on_failure: bool) -> Result<()> {
 if seed_nodes.is_empty() {
 tracing::info!("No seed nodes configured, starting as initial cluster node");
 self.state.add_node(self.local_node_info.clone()).await;
@@ -75,10 +78,13 @@
 }
 }
 
-// If no seed responded, start as a new cluster
-tracing::info!("Could not reach any seed nodes, starting as initial cluster node");
-self.state.add_node(self.local_node_info.clone()).await;
-Ok(())
+if allow_bootstrap_on_failure {
+tracing::warn!("Could not reach any seed nodes, bootstrapping a new cluster because no persisted topology exists");
+self.state.add_node(self.local_node_info.clone()).await;
+return Ok(());
+}
+
+anyhow::bail!("Could not reach any configured seed nodes; refusing unsafe cluster bootstrap")
 }
 
 async fn try_join(&self, addr: SocketAddr) -> Result<()> {
@@ -97,9 +103,14 @@
 ClusterResponse::JoinResponse(join_resp) => {
 if join_resp.accepted {
 if let Some(topology) = &join_resp.topology {
+let topology_contains_self = topology
+.nodes
+.iter()
+.any(|node| node.node_id == self.local_node_info.node_id);
 self.state.apply_topology(topology).await;
-// Also register self
+if !topology_contains_self {
 self.state.add_node(self.local_node_info.clone()).await;
+}
 tracing::info!(
 "Applied cluster topology (version {}, {} nodes, {} erasure sets)",
 topology.version,
@@ -137,7 +148,13 @@
 }
 
 async fn send_heartbeats(&self) {
-let peers = self.state.online_peers().await;
+let peers = self
+.state
+.all_nodes()
+.await
+.into_iter()
+.filter(|node| node.info.node_id != self.local_node_info.node_id)
+.collect::<Vec<_>>();
 let topology_version = self.state.version().await;
 let mut responded = Vec::new();
 
@@ -145,7 +162,7 @@
 let drive_states = self.collect_drive_states().await;
 
 for peer in &peers {
-let addr: SocketAddr = match peer.quic_addr.parse() {
+let addr: SocketAddr = match peer.info.quic_addr.parse() {
 Ok(a) => a,
 Err(_) => continue,
 };
@@ -158,23 +175,23 @@
 });
 
 match tokio::time::timeout(
-Duration::from_secs(5),
-self.send_heartbeat_to_peer(&peer.node_id, addr, &heartbeat),
+self.heartbeat_timeout,
+self.send_heartbeat_to_peer(&peer.info.node_id, addr, &heartbeat),
 )
 .await
 {
 Ok(Ok(())) => {
-responded.push(peer.node_id.clone());
+responded.push(peer.info.node_id.clone());
 }
 Ok(Err(e)) => {
 tracing::debug!(
-peer = %peer.node_id,
+peer = %peer.info.node_id,
 error = %e,
 "Heartbeat failed"
 );
 }
 Err(_) => {
-tracing::debug!(peer = %peer.node_id, "Heartbeat timed out");
+tracing::debug!(peer = %peer.info.node_id, "Heartbeat timed out");
 }
 }
 }
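The `join_cluster` change above makes cluster bootstrap explicit: a node only falls back to starting a fresh cluster when the caller allows it (per the changelog, when no persisted topology exists), instead of silently forking a new cluster whenever seeds are unreachable. A small TypeScript sketch of that decision logic follows; function and parameter names are illustrative, not from the source.

```ts
// Illustrative mirror of the join_cluster gating: try each seed in order, then
// either bootstrap (if explicitly allowed) or fail loudly.
async function joinCluster(
  seedNodes: string[],
  allowBootstrapOnFailure: boolean,
  tryJoin: (addr: string) => Promise<boolean>,
): Promise<'joined' | 'bootstrapped'> {
  if (seedNodes.length === 0) return 'bootstrapped'; // first node of a new cluster
  for (const addr of seedNodes) {
    if (await tryJoin(addr)) return 'joined';
  }
  if (allowBootstrapOnFailure) return 'bootstrapped';
  throw new Error('Could not reach any configured seed nodes; refusing unsafe cluster bootstrap');
}
```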

@@ -9,6 +9,7 @@ pub mod erasure;
 pub mod healing;
 pub mod membership;
 pub mod metadata;
+pub mod persistence;
 pub mod placement;
 pub mod protocol;
 pub mod quic_transport;

@@ -0,0 +1,77 @@
+use anyhow::Result;
+use serde::{Deserialize, Serialize};
+use std::path::{Path, PathBuf};
+use tokio::fs;
+
+use super::protocol::ClusterTopology;
+
+const CLUSTER_METADATA_DIR: &str = ".smartstorage/cluster";
+const IDENTITY_FILE: &str = "identity.json";
+const TOPOLOGY_FILE: &str = "topology.json";
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ClusterIdentity {
+pub schema_version: u32,
+pub node_id: String,
+pub cluster_id: String,
+}
+
+impl ClusterIdentity {
+pub fn new(node_id: String, cluster_id: String) -> Self {
+Self {
+schema_version: 1,
+node_id,
+cluster_id,
+}
+}
+}
+
+pub fn cluster_metadata_dir(storage_directory: &str) -> PathBuf {
+PathBuf::from(storage_directory).join(CLUSTER_METADATA_DIR)
+}
+
+pub fn identity_path(metadata_dir: &Path) -> PathBuf {
+metadata_dir.join(IDENTITY_FILE)
+}
+
+pub fn topology_path(metadata_dir: &Path) -> PathBuf {
+metadata_dir.join(TOPOLOGY_FILE)
+}
+
+pub async fn load_identity(path: &Path) -> Result<Option<ClusterIdentity>> {
+match fs::read_to_string(path).await {
+Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
+Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
+Err(error) => Err(error.into()),
+}
+}
+
+pub async fn persist_identity(path: &Path, identity: &ClusterIdentity) -> Result<()> {
+write_json_atomic(path, identity).await
+}
+
+pub async fn load_topology(path: &Path) -> Result<Option<ClusterTopology>> {
+match fs::read_to_string(path).await {
+Ok(content) => Ok(Some(serde_json::from_str(&content)?)),
+Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None),
+Err(error) => Err(error.into()),
+}
+}
+
+pub async fn persist_topology(path: &Path, topology: &ClusterTopology) -> Result<()> {
+write_json_atomic(path, topology).await
+}
+
+async fn write_json_atomic<T: Serialize>(path: &Path, value: &T) -> Result<()> {
+let parent = path
+.parent()
+.ok_or_else(|| anyhow::anyhow!("Cluster metadata path has no parent"))?;
+fs::create_dir_all(parent).await?;
+
+let temp_path = path.with_extension("json.tmp");
+let content = serde_json::to_string_pretty(value)?;
+fs::write(&temp_path, content).await?;
+fs::rename(&temp_path, path).await?;
+Ok(())
+}
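The new persistence module above writes JSON via a temp file followed by a rename, so a crash mid-write never leaves a truncated identity or topology file behind. Here is a minimal TypeScript equivalent of that write-then-rename pattern; the helper name and the `.tmp` suffix choice are illustrative.

```ts
import { mkdir, rename, writeFile } from 'node:fs/promises';
import { dirname } from 'node:path';

// Same idea as write_json_atomic in the Rust module: create the parent directory,
// write to a sibling temp file, then rename over the target. On POSIX filesystems
// the rename is atomic, so readers see either the old file or the new one.
async function writeJsonAtomic(path: string, value: unknown): Promise<void> {
  await mkdir(dirname(path), { recursive: true });
  const tempPath = `${path}.tmp`;
  await writeFile(tempPath, JSON.stringify(value, null, 2));
  await rename(tempPath, path);
}
```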

@@ -102,6 +102,7 @@ pub struct ShardWriteRequest {
 pub key: String,
 pub chunk_index: u32,
 pub shard_index: u32,
+pub drive_index: u32,
 pub shard_data_length: u64,
 pub checksum: u32, // crc32c of shard data
 pub object_metadata: HashMap<String, String>,
@@ -121,6 +122,7 @@ pub struct ShardReadRequest {
 pub key: String,
 pub chunk_index: u32,
 pub shard_index: u32,
+pub drive_index: u32,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -139,6 +141,7 @@ pub struct ShardDeleteRequest {
 pub key: String,
 pub chunk_index: u32,
 pub shard_index: u32,
+pub drive_index: u32,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -154,6 +157,7 @@ pub struct ShardHeadRequest {
 pub key: String,
 pub chunk_index: u32,
 pub shard_index: u32,
+pub drive_index: u32,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]

@@ -8,6 +8,7 @@ use super::protocol::{
 self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest,
 };
 use super::shard_store::{ShardId, ShardStore};
+use super::state::{ClusterState, NodeStatus};
 
 /// QUIC transport layer for inter-node communication.
 ///
@@ -26,11 +27,8 @@ impl QuicTransport {
 pub async fn new(bind_addr: SocketAddr, local_node_id: String) -> Result<Self> {
 let (server_config, client_config) = Self::generate_tls_configs()?;
 
-let endpoint = Endpoint::server(server_config, bind_addr)?;
-// Also configure the endpoint for client connections
-let mut endpoint_client = endpoint.clone();
-endpoint_client.set_default_client_config(client_config);
+let mut endpoint = Endpoint::server(server_config, bind_addr)?;
+endpoint.set_default_client_config(client_config);
 
 Ok(Self {
 endpoint,
@@ -163,7 +161,8 @@
 /// Accept incoming connections and dispatch to the handler.
 pub async fn accept_loop(
 self: Arc<Self>,
-shard_store: Arc<ShardStore>,
+shard_stores: Vec<Arc<ShardStore>>,
+cluster_state: Option<Arc<ClusterState>>,
 mut shutdown: tokio::sync::watch::Receiver<bool>,
 ) {
 loop {
@@ -172,11 +171,12 @@
 match incoming {
 Some(incoming_conn) => {
 let transport = self.clone();
-let store = shard_store.clone();
+let stores = shard_stores.clone();
+let state = cluster_state.clone();
 tokio::spawn(async move {
 match incoming_conn.await {
 Ok(conn) => {
-transport.handle_connection(conn, store).await;
+transport.handle_connection(conn, stores, state).await;
 }
 Err(e) => {
 tracing::error!("Failed to accept QUIC connection: {}", e);
@@ -194,16 +194,19 @@
 
 /// Handle a single QUIC connection (may have multiple streams).
 async fn handle_connection(
-&self,
+self: Arc<Self>,
 conn: quinn::Connection,
-shard_store: Arc<ShardStore>,
+shard_stores: Vec<Arc<ShardStore>>,
+cluster_state: Option<Arc<ClusterState>>,
 ) {
 loop {
 match conn.accept_bi().await {
 Ok((send, recv)) => {
-let store = shard_store.clone();
+let stores = shard_stores.clone();
+let state = cluster_state.clone();
+let transport = self.clone();
 tokio::spawn(async move {
-if let Err(e) = Self::handle_stream(send, recv, store).await {
+if let Err(e) = transport.handle_stream(send, recv, stores, state).await {
 tracing::error!("Stream handler error: {}", e);
 }
 });
@@ -219,9 +222,11 @@
 
 /// Handle a single bidirectional stream (one request-response exchange).
 async fn handle_stream(
+self: Arc<Self>,
 mut send: quinn::SendStream,
 mut recv: quinn::RecvStream,
-shard_store: Arc<ShardStore>,
+shard_stores: Vec<Arc<ShardStore>>,
+cluster_state: Option<Arc<ClusterState>>,
 ) -> Result<()> {
 // Read the full request (length-prefixed bincode + optional trailing data)
 let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max
@@ -231,6 +236,7 @@
 ClusterRequest::ShardWrite(write_req) => {
 // Shard data follows the header in the raw buffer
 let shard_data = &raw[header_len..];
+let drive_index = write_req.drive_index;
 
 let shard_id = ShardId {
 bucket: write_req.bucket,
@@ -239,9 +245,10 @@
 shard_index: write_req.shard_index,
 };
 
-let result = shard_store
-.write_shard(&shard_id, &shard_data, write_req.checksum)
-.await;
+let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
+Ok(store) => store.write_shard(&shard_id, &shard_data, write_req.checksum).await,
+Err(error) => Err(error),
+};
 
 let ack = ShardWriteAck {
 request_id: write_req.request_id,
@@ -254,6 +261,7 @@
 }
 
 ClusterRequest::ShardRead(read_req) => {
+let drive_index = read_req.drive_index;
 let shard_id = ShardId {
 bucket: read_req.bucket,
 key: read_req.key,
@@ -261,7 +269,15 @@
 shard_index: read_req.shard_index,
 };
 
-match shard_store.read_shard(&shard_id).await {
+let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
+Ok(store) => store,
+Err(error) => {
+Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
+return Ok(());
+}
+};
+
+match store.read_shard(&shard_id).await {
 Ok((data, checksum)) => {
 let header = ShardReadResponse {
 request_id: read_req.request_id,
@@ -293,13 +309,17 @@
 }
 
 ClusterRequest::ShardDelete(del_req) => {
+let drive_index = del_req.drive_index;
 let shard_id = ShardId {
 bucket: del_req.bucket,
 key: del_req.key,
 chunk_index: del_req.chunk_index,
 shard_index: del_req.shard_index,
 };
-let result = shard_store.delete_shard(&shard_id).await;
+let result = match Self::shard_store_for_drive(&shard_stores, drive_index) {
+Ok(store) => store.delete_shard(&shard_id).await,
+Err(error) => Err(error),
+};
 let ack = protocol::ClusterResponse::ShardDeleteAck(protocol::ShardDeleteAck {
 request_id: del_req.request_id,
 success: result.is_ok(),
@@ -310,13 +330,22 @@
 }
 
 ClusterRequest::ShardHead(head_req) => {
+let drive_index = head_req.drive_index;
 let shard_id = ShardId {
 bucket: head_req.bucket,
 key: head_req.key,
 chunk_index: head_req.chunk_index,
 shard_index: head_req.shard_index,
 };
-let resp = match shard_store.head_shard(&shard_id).await {
+let store = match Self::shard_store_for_drive(&shard_stores, drive_index) {
+Ok(store) => store,
+Err(error) => {
+Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?;
+return Ok(());
+}
+};
+
+let resp = match store.head_shard(&shard_id).await {
 Ok(Some(meta)) => protocol::ShardHeadResponse {
 request_id: head_req.request_id,
 found: true,
@@ -336,9 +365,103 @@
 send.finish()?;
 }
 
-// Heartbeat, Join, TopologySync, Heal, and Manifest operations
-// will be handled by the membership and coordinator modules.
-// For now, send a generic ack.
+ClusterRequest::JoinRequest(join_req) => {
+let Some(state) = cluster_state else {
+let err = protocol::ErrorResponse {
+request_id: String::new(),
+code: "ClusterDisabled".to_string(),
+message: "Cluster state is not available".to_string(),
+};
+let response = protocol::encode_response(&ClusterResponse::Error(err))?;
+send.write_all(&response).await?;
+send.finish()?;
+return Ok(());
+};
+
+let joining_node_id = join_req.node_info.node_id.clone();
+state.add_node(join_req.node_info).await;
+let topology = state.to_topology().await;
+let node_drives: Vec<(String, u32)> = topology
+.nodes
+.iter()
+.map(|node| (node.node_id.clone(), node.drive_count))
+.collect();
+let erasure_sets = super::placement::form_erasure_sets(
+&node_drives,
+topology.data_shards + topology.parity_shards,
+);
+state.set_erasure_sets(erasure_sets).await;
+
+let response_topology = state.to_topology().await;
+let response = protocol::encode_response(&ClusterResponse::JoinResponse(
+protocol::JoinResponseMessage {
+accepted: true,
+topology: Some(response_topology.clone()),
+error: None,
+},
+))?;
+send.write_all(&response).await?;
+send.finish()?;
+
+self.broadcast_topology(&state, Some(response_topology), None, Some(&joining_node_id)).await;
+}
+
+ClusterRequest::Heartbeat(heartbeat) => {
+let Some(state) = cluster_state else {
+let err = protocol::ErrorResponse {
+request_id: String::new(),
+code: "ClusterDisabled".to_string(),
+message: "Cluster state is not available".to_string(),
+};
+let response = protocol::encode_response(&ClusterResponse::Error(err))?;
+send.write_all(&response).await?;
+send.finish()?;
+return Ok(());
+};
+
+let peer_node_id = heartbeat.node_id.clone();
+let peer_topology_version = heartbeat.topology_version;
+state.record_heartbeat(&heartbeat.node_id).await;
+let local_topology_version = state.version().await;
+let response = protocol::encode_response(&ClusterResponse::HeartbeatAck(
+protocol::HeartbeatAckMessage {
+node_id: state.local_node_id().to_string(),
+timestamp: chrono::Utc::now().to_rfc3339(),
+topology_version: local_topology_version,
+},
+))?;
+send.write_all(&response).await?;
+send.finish()?;
+
+if local_topology_version > peer_topology_version {
+self.broadcast_topology(&state, None, Some(&peer_node_id), None).await;
+}
+}
+
+ClusterRequest::TopologySync(sync) => {
+let Some(state) = cluster_state else {
+let err = protocol::ErrorResponse {
+request_id: String::new(),
+code: "ClusterDisabled".to_string(),
+message: "Cluster state is not available".to_string(),
+};
+let response = protocol::encode_response(&ClusterResponse::Error(err))?;
+send.write_all(&response).await?;
+send.finish()?;
+return Ok(());
+};
+
+state.apply_topology(&sync.topology).await;
+let response = protocol::encode_response(&ClusterResponse::TopologySyncAck(
+protocol::TopologySyncAckMessage {
+accepted: true,
+current_version: state.version().await,
+},
+))?;
+send.write_all(&response).await?;
+send.finish()?;
+}
+
 _ => {
 let err = protocol::ErrorResponse {
 request_id: String::new(),
@@ -354,6 +477,83 @@
 Ok(())
 }
 
+fn shard_store_for_drive(
+shard_stores: &[Arc<ShardStore>],
+drive_index: u32,
+) -> Result<Arc<ShardStore>> {
+shard_stores
+.get(drive_index as usize)
+.cloned()
+.ok_or_else(|| anyhow::anyhow!("Drive {} not found", drive_index))
+}
+
+async fn send_error_response(
+send: &mut quinn::SendStream,
+code: &str,
+message: String,
+) -> Result<()> {
+let err = protocol::ErrorResponse {
+request_id: String::new(),
+code: code.to_string(),
+message,
+};
+let response = protocol::encode_response(&ClusterResponse::Error(err))?;
+send.write_all(&response).await?;
+send.finish()?;
+Ok(())
+}
+
+async fn broadcast_topology(
+&self,
+state: &Arc<ClusterState>,
+topology: Option<protocol::ClusterTopology>,
+target_node_id: Option<&str>,
+skip_node_id: Option<&str>,
+) {
+let topology = match topology {
+Some(topology) => topology,
+None => state.to_topology().await,
+};
+let nodes = state.all_nodes().await;
+for node in nodes {
+if node.info.node_id == state.local_node_id() {
+continue;
+}
+if let Some(target_node_id) = target_node_id {
+if node.info.node_id != target_node_id {
+continue;
+}
+}
+if matches!(skip_node_id, Some(skip_node_id) if node.info.node_id == skip_node_id) {
+continue;
+}
+if node.status != NodeStatus::Online {
+continue;
+}
+
+let addr = match node.info.quic_addr.parse() {
+Ok(addr) => addr,
+Err(error) => {
+tracing::warn!(node = %node.info.node_id, error = %error, "Skipping topology sync for invalid peer address");
+continue;
+}
+};
+let conn = match self.get_connection(&node.info.node_id, addr).await {
+Ok(conn) => conn,
+Err(error) => {
+tracing::warn!(node = %node.info.node_id, error = %error, "Failed to connect for topology sync");
+continue;
+}
+};
+let request = ClusterRequest::TopologySync(protocol::TopologySyncMessage {
+topology: topology.clone(),
+});
+if let Err(error) = self.send_request(&conn, &request).await {
+tracing::warn!(node = %node.info.node_id, error = %error, "Failed to send topology sync");
+}
+}
+}
+
 /// Generate self-signed TLS certificates for cluster-internal communication.
 fn generate_tls_configs() -> Result<(QuinnServerConfig, ClientConfig)> {
 // Generate self-signed certificate
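Throughout the transport changes above, shard requests now carry a `drive_index`, and the receiving node selects one store from a per-drive vector, rejecting out-of-range indices instead of answering from a single default store. A brief TypeScript sketch of that lookup follows; the names are illustrative mirrors of `shard_store_for_drive`, not the actual API.

```ts
// Mirror of shard_store_for_drive: index into the per-drive store list with a
// bounds check, so a request addressed to a missing drive fails loudly instead
// of silently reading or writing the wrong drive.
interface ShardStore {
  rootPath: string; // illustrative per-drive storage handle
}

function shardStoreForDrive(stores: ShardStore[], driveIndex: number): ShardStore {
  const store = stores[driveIndex];
  if (store === undefined) {
    throw new Error(`Drive ${driveIndex} not found`);
  }
  return store;
}
```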

+102 -50
@@ -1,8 +1,10 @@
 use std::collections::HashMap;
+use std::path::PathBuf;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 
 use super::placement::{DriveLocation, ErasureSet};
+use super::persistence;
 use super::protocol::{ClusterTopology, ErasureSetInfo, DriveLocationInfo, NodeInfo};
 
 /// Node status for tracking liveness.
@@ -26,6 +28,7 @@ pub struct NodeState {
 pub struct ClusterState {
 inner: Arc<RwLock<ClusterStateInner>>,
 local_node_id: String,
+topology_path: Option<PathBuf>,
 }
 
 struct ClusterStateInner {
@@ -43,6 +46,7 @@ impl ClusterState {
 cluster_id: String,
 data_shards: usize,
 parity_shards: usize,
+topology_path: Option<PathBuf>,
 ) -> Self {
 Self {
 inner: Arc::new(RwLock::new(ClusterStateInner {
@@ -54,6 +58,7 @@ impl ClusterState {
 parity_shards,
 })),
 local_node_id,
+topology_path,
 }
 }
 
@@ -61,27 +66,37 @@ impl ClusterState {
 &self.local_node_id
 }
 
+pub async fn cluster_id(&self) -> String {
+self.inner.read().await.cluster_id.clone()
+}
+
 /// Register a node in the cluster.
 pub async fn add_node(&self, info: NodeInfo) {
-let mut inner = self.inner.write().await;
-let node_id = info.node_id.clone();
-inner.nodes.insert(
-node_id,
-NodeState {
-info,
-status: NodeStatus::Online,
-missed_heartbeats: 0,
-last_heartbeat: chrono::Utc::now(),
-},
-);
-inner.version += 1;
+{
+let mut inner = self.inner.write().await;
+let node_id = info.node_id.clone();
+inner.nodes.insert(
+node_id,
+NodeState {
+info,
+status: NodeStatus::Online,
+missed_heartbeats: 0,
+last_heartbeat: chrono::Utc::now(),
+},
+);
+inner.version += 1;
+}
+self.persist_topology_snapshot().await;
 }
 
 /// Remove a node from the cluster.
 pub async fn remove_node(&self, node_id: &str) {
-let mut inner = self.inner.write().await;
-inner.nodes.remove(node_id);
-inner.version += 1;
+{
+let mut inner = self.inner.write().await;
+inner.nodes.remove(node_id);
+inner.version += 1;
+}
+self.persist_topology_snapshot().await;
 }
 
 /// Update heartbeat for a node (reset missed count).
@@ -133,9 +148,12 @@ impl ClusterState {
 
 /// Set erasure sets (typically done once during cluster formation).
 pub async fn set_erasure_sets(&self, sets: Vec<ErasureSet>) {
-let mut inner = self.inner.write().await;
-inner.erasure_sets = sets;
-inner.version += 1;
+{
+let mut inner = self.inner.write().await;
+inner.erasure_sets = sets;
+inner.version += 1;
+}
+self.persist_topology_snapshot().await;
 }
 
 /// Get the erasure set for a given object based on consistent hashing.
@@ -244,48 +262,82 @@ impl ClusterState {
 
 /// Import topology from a protocol message (e.g., received from a peer during join).
 pub async fn apply_topology(&self, topology: &ClusterTopology) {
-let mut inner = self.inner.write().await;
-
-// Only apply if newer
-if topology.version <= inner.version {
-return;
-}
-
-inner.cluster_id = topology.cluster_id.clone();
-inner.version = topology.version;
-inner.data_shards = topology.data_shards;
-inner.parity_shards = topology.parity_shards;
-
-// Update nodes
-for node_info in &topology.nodes {
-if !inner.nodes.contains_key(&node_info.node_id) {
-inner.nodes.insert(
-node_info.node_id.clone(),
-NodeState {
-info: node_info.clone(),
-status: NodeStatus::Online,
-missed_heartbeats: 0,
-last_heartbeat: chrono::Utc::now(),
-},
-);
-}
-}
-
-// Update erasure sets
-inner.erasure_sets = topology
-.erasure_sets
-.iter()
-.map(|set| ErasureSet {
-set_id: set.set_id,
-drives: set
-.drives
-.iter()
-.map(|d| DriveLocation {
-node_id: d.node_id.clone(),
-drive_index: d.drive_index,
-})
-.collect(),
-})
-.collect();
+let applied = {
+let mut inner = self.inner.write().await;
+
+// Only apply if newer and from the same cluster lineage. A node that has not yet
+// joined any topology may adopt the seed cluster ID during its first join.
+if topology.version <= inner.version {
+return;
+}
+if topology.cluster_id != inner.cluster_id {
+if inner.nodes.is_empty() {
+inner.cluster_id = topology.cluster_id.clone();
+} else {
+return;
+}
+}
+
+inner.version = topology.version;
+inner.data_shards = topology.data_shards;
+inner.parity_shards = topology.parity_shards;
+
+let now = chrono::Utc::now();
+for node_info in &topology.nodes {
+let existing_status = inner.nodes.get(&node_info.node_id).map(|node| node.status.clone());
+let existing_missed_heartbeats = inner
+.nodes
+.get(&node_info.node_id)
+.map(|node| node.missed_heartbeats);
+let existing_last_heartbeat = inner
+.nodes
+.get(&node_info.node_id)
+.map(|node| node.last_heartbeat);
+inner.nodes.insert(
+node_info.node_id.clone(),
+NodeState {
+info: node_info.clone(),
+status: existing_status.unwrap_or(NodeStatus::Online),
+missed_heartbeats: existing_missed_heartbeats.unwrap_or(0),
+last_heartbeat: existing_last_heartbeat.unwrap_or(now),
+},
+);
+}
+
+inner.nodes.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id));
+
+// Update erasure sets
+inner.erasure_sets = topology
+.erasure_sets
+.iter()
+.map(|set| ErasureSet {
+set_id: set.set_id,
+drives: set
+.drives
+.iter()
+.map(|d| DriveLocation {
+node_id: d.node_id.clone(),
+drive_index: d.drive_index,
+})
+.collect(),
+})
+.collect();
+true
+};
+
+if applied {
+self.persist_topology_snapshot().await;
+}
+}
+
+async fn persist_topology_snapshot(&self) {
+let Some(path) = &self.topology_path else {
+return;
+};
+
+let topology = self.to_topology().await;
+if let Err(error) = persistence::persist_topology(path, &topology).await {
+tracing::warn!(error = %error, "Failed to persist cluster topology snapshot");
+}
 }
 }
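The `apply_topology` rewrite above only accepts a snapshot that is strictly newer and from the same cluster lineage; a node that has never joined a topology may adopt the seed's cluster ID on its first join. A condensed TypeScript sketch of that acceptance gate follows; the type and field names are illustrative, only the version and lineage rules come from the diff.

```ts
// Condensed version of the apply_topology acceptance gate.
interface TopologyHeader { version: number; clusterId: string; }
interface LocalState { version: number; clusterId: string; nodeCount: number; }

function shouldApplyTopology(local: LocalState, incoming: TopologyHeader): boolean {
  if (incoming.version <= local.version) return false; // only apply strictly newer snapshots
  if (incoming.clusterId !== local.clusterId) {
    // A node that has not joined any topology yet (no known peers) may adopt the
    // seed's cluster ID; an established node refuses a foreign cluster lineage.
    return local.nodeCount === 0;
  }
  return true;
}
```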
+260
-30
@@ -11,6 +11,7 @@ use std::collections::HashMap;
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
use tokio::io::AsyncReadExt;
|
use tokio::io::AsyncReadExt;
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
@@ -27,6 +28,7 @@ use crate::cluster::coordinator::DistributedStore;
|
|||||||
use crate::cluster::drive_manager::DriveManager;
|
use crate::cluster::drive_manager::DriveManager;
|
||||||
use crate::cluster::healing::HealingService;
|
use crate::cluster::healing::HealingService;
|
||||||
use crate::cluster::membership::MembershipManager;
|
use crate::cluster::membership::MembershipManager;
|
||||||
|
use crate::cluster::persistence::{self, ClusterIdentity};
|
||||||
use crate::cluster::placement;
|
use crate::cluster::placement;
|
||||||
use crate::cluster::protocol::NodeInfo;
|
use crate::cluster::protocol::NodeInfo;
|
||||||
use crate::cluster::quic_transport::QuicTransport;
|
use crate::cluster::quic_transport::QuicTransport;
|
||||||
@@ -35,19 +37,46 @@ use crate::cluster::state::ClusterState;
 use crate::storage::{FileStore, StorageBackend};
 use crate::xml_response;

+struct ServerMetrics {
+    started_at: chrono::DateTime<chrono::Utc>,
+    total_requests: AtomicU64,
+    error_responses: AtomicU64,
+}
+
+impl ServerMetrics {
+    fn new() -> Self {
+        Self {
+            started_at: chrono::Utc::now(),
+            total_requests: AtomicU64::new(0),
+            error_responses: AtomicU64::new(0),
+        }
+    }
+
+    fn record_response(&self, status: StatusCode) {
+        self.total_requests.fetch_add(1, Ordering::Relaxed);
+        if status.as_u16() >= 400 {
+            self.error_responses.fetch_add(1, Ordering::Relaxed);
+        }
+    }
+}
+
 pub struct StorageServer {
     store: Arc<StorageBackend>,
     auth_runtime: Arc<auth::RuntimeCredentialStore>,
     shutdown_tx: watch::Sender<bool>,
+    cluster_shutdown_txs: Vec<watch::Sender<bool>>,
     server_handle: tokio::task::JoinHandle<()>,
 }

 impl StorageServer {
     pub async fn start(config: SmartStorageConfig) -> Result<Self> {
         let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth));
+        let mut cluster_shutdown_txs = Vec::new();
         let store: Arc<StorageBackend> = if let Some(ref cluster_config) = config.cluster {
             if cluster_config.enabled {
-                Self::start_clustered(&config, cluster_config).await?
+                let (store, shutdown_txs) = Self::start_clustered(&config, cluster_config).await?;
+                cluster_shutdown_txs = shutdown_txs;
+                store
             } else {
                 Self::start_standalone(&config).await?
             }
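`ServerMetrics` counts with `Ordering::Relaxed`, which is sufficient here: the counters are independent monotone tallies, and no other memory access needs to be ordered against them. A standalone sketch of the same pattern:

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;

fn main() {
    let total = Arc::new(AtomicU64::new(0));
    let errors = Arc::new(AtomicU64::new(0));

    let handles: Vec<_> = (0..4u64)
        .map(|worker| {
            let (total, errors) = (total.clone(), errors.clone());
            thread::spawn(move || {
                for i in 0..1_000u64 {
                    // Relaxed is fine: each increment must land, but nothing
                    // else synchronizes through these counters.
                    total.fetch_add(1, Ordering::Relaxed);
                    if (i + worker) % 10 == 0 {
                        errors.fetch_add(1, Ordering::Relaxed);
                    }
                }
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }
    // No increments are lost even without stronger orderings.
    assert_eq!(total.load(Ordering::Relaxed), 4_000);
    assert_eq!(errors.load(Ordering::Relaxed), 400);
}
```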
@@ -69,6 +98,7 @@ impl StorageServer {
         let server_config = config.clone();
         let server_auth_runtime = auth_runtime.clone();
         let server_policy_store = policy_store.clone();
+        let server_metrics = Arc::new(ServerMetrics::new());

         let server_handle = tokio::spawn(async move {
             loop {
@@ -83,6 +113,7 @@ impl StorageServer {
                 let cfg = server_config.clone();
                 let auth_runtime = server_auth_runtime.clone();
                 let ps = server_policy_store.clone();
+                let metrics = server_metrics.clone();

                 tokio::spawn(async move {
                     let svc = service_fn(move |req: Request<Incoming>| {
@@ -90,8 +121,9 @@ impl StorageServer {
                         let cfg = cfg.clone();
                         let auth_runtime = auth_runtime.clone();
                         let ps = ps.clone();
+                        let metrics = metrics.clone();
                         async move {
-                            handle_request(req, store, cfg, auth_runtime, ps).await
+                            handle_request(req, store, cfg, auth_runtime, ps, metrics).await
                         }
                     });

@@ -126,11 +158,15 @@ impl StorageServer {
             store,
             auth_runtime,
             shutdown_tx,
+            cluster_shutdown_txs,
             server_handle,
         })
     }

     pub async fn stop(self) {
+        for shutdown_tx in &self.cluster_shutdown_txs {
+            let _ = shutdown_tx.send(true);
+        }
         let _ = self.shutdown_tx.send(true);
         let _ = self.server_handle.await;
     }
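`stop()` now fans the shutdown signal out to every cluster background task before stopping the HTTP loop. A minimal sketch of the watch-channel fan-out, independent of the real task bodies and assuming tokio with the `macros` and runtime features:

```rust
use tokio::sync::watch;

// One long-running task per receiver; each exits when it observes `true`.
async fn background_task(name: &'static str, mut shutdown_rx: watch::Receiver<bool>) {
    while shutdown_rx.changed().await.is_ok() {
        if *shutdown_rx.borrow() {
            println!("{name}: shutting down");
            return;
        }
    }
}

#[tokio::main]
async fn main() {
    let mut senders = Vec::new();
    let mut tasks = Vec::new();
    for name in ["quic", "heartbeat", "healing"] {
        let (tx, rx) = watch::channel(false);
        senders.push(tx);
        tasks.push(tokio::spawn(background_task(name, rx)));
    }

    // Broadcast shutdown to every loop, then wait for all of them to finish.
    for tx in &senders {
        let _ = tx.send(true);
    }
    for task in tasks {
        let _ = task.await;
    }
}
```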
@@ -139,7 +175,7 @@ impl StorageServer {
         &self.store
     }

-    pub async fn list_credentials(&self) -> Vec<crate::config::Credential> {
+    pub async fn list_credentials(&self) -> Vec<crate::auth::CredentialMetadata> {
         self.auth_runtime.list_credentials().await
     }

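The signature change above makes the listing API return only non-secret metadata (the real `CredentialMetadata` lives in the crate's `auth` module). A sketch of the redaction pattern with hypothetical simplified types:

```rust
struct Credential {
    access_key_id: String,
    secret_access_key: String,
}

/// Listing returns only non-secret metadata; the secret simply does not
/// exist on this type, so it can never cross the API boundary.
struct CredentialMetadata {
    access_key_id: String,
}

fn to_metadata(all: &[Credential]) -> Vec<CredentialMetadata> {
    all.iter()
        .map(|c| CredentialMetadata { access_key_id: c.access_key_id.clone() })
        .collect()
}

fn main() {
    let creds = vec![Credential {
        access_key_id: "AKIA-EXAMPLE".into(),
        secret_access_key: "s3cr3t".into(),
    }];
    let listed = to_metadata(&creds);
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].access_key_id, "AKIA-EXAMPLE");
    // The secret stays behind in the source type.
    let _ = &creds[0].secret_access_key;
}
```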
@@ -165,12 +201,37 @@ impl StorageServer {
     async fn start_clustered(
         config: &SmartStorageConfig,
         cluster_config: &crate::cluster::config::ClusterConfig,
-    ) -> Result<Arc<StorageBackend>> {
+    ) -> Result<(Arc<StorageBackend>, Vec<watch::Sender<bool>>)> {
         let erasure_config = cluster_config.erasure.clone();
+        let cluster_metadata_dir = persistence::cluster_metadata_dir(&config.storage.directory);
+        let identity_path = persistence::identity_path(&cluster_metadata_dir);
+        let topology_path = persistence::topology_path(&cluster_metadata_dir);
+        let persisted_identity = persistence::load_identity(&identity_path).await?;
+
+        if let (Some(configured_node_id), Some(identity)) = (&cluster_config.node_id, &persisted_identity) {
+            if configured_node_id != &identity.node_id {
+                anyhow::bail!(
+                    "Configured cluster node ID '{}' conflicts with persisted node ID '{}'",
+                    configured_node_id,
+                    identity.node_id
+                );
+            }
+        }
+
         let node_id = cluster_config
             .node_id
             .clone()
+            .or_else(|| persisted_identity.as_ref().map(|identity| identity.node_id.clone()))
             .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
+        let cluster_id = persisted_identity
+            .as_ref()
+            .map(|identity| identity.cluster_id.clone())
+            .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
+        persistence::persist_identity(
+            &identity_path,
+            &ClusterIdentity::new(node_id.clone(), cluster_id.clone()),
+        )
+        .await?;
+
         // Determine drive paths
         let drive_paths: Vec<std::path::PathBuf> = if cluster_config.drives.paths.is_empty() {
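Durable identity follows a load-or-create pattern: reuse what is on disk, refuse a conflicting configured ID, and only mint fresh IDs on a true first start. A self-contained sketch with hypothetical simplified types (the real code goes through `cluster::persistence` and `ClusterIdentity`):

```rust
#[derive(Clone, Debug, PartialEq)]
struct Identity {
    node_id: String,
    cluster_id: String,
}

/// Resolve the node identity: a configured ID must agree with whatever was
/// persisted earlier; otherwise fall back to the persisted value, and only
/// mint fresh IDs when nothing exists yet.
fn resolve_identity(
    configured_node_id: Option<&str>,
    persisted: Option<Identity>,
    mint: impl Fn() -> String,
) -> Result<Identity, String> {
    if let (Some(configured), Some(identity)) = (configured_node_id, &persisted) {
        if configured != identity.node_id {
            return Err(format!(
                "Configured node ID '{}' conflicts with persisted '{}'",
                configured, identity.node_id
            ));
        }
    }
    let node_id = configured_node_id
        .map(str::to_string)
        .or_else(|| persisted.as_ref().map(|i| i.node_id.clone()))
        .unwrap_or_else(&mint);
    let cluster_id = persisted.map(|i| i.cluster_id).unwrap_or_else(&mint);
    Ok(Identity { node_id, cluster_id })
}

fn main() {
    let on_disk = Some(Identity { node_id: "node-2".into(), cluster_id: "c-1".into() });
    // A restart with a matching configured ID keeps the durable identity.
    let id = resolve_identity(Some("node-2"), on_disk.clone(), || "fresh".into()).unwrap();
    assert_eq!(id, Identity { node_id: "node-2".into(), cluster_id: "c-1".into() });
    // A conflicting configured ID is rejected instead of silently forking.
    assert!(resolve_identity(Some("other"), on_disk, || "fresh".into()).is_err());
}
```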
@@ -202,28 +263,37 @@ impl StorageServer {
         // Initialize cluster state
         let cluster_state = Arc::new(ClusterState::new(
             node_id.clone(),
-            uuid::Uuid::new_v4().to_string(),
+            cluster_id.clone(),
             erasure_config.data_shards,
             erasure_config.parity_shards,
+            Some(topology_path.clone()),
         ));

-        // Form erasure sets from local drives (single-node for now)
-        let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
-        let erasure_sets =
-            placement::form_erasure_sets(&nodes, erasure_config.total_shards());
+        let persisted_topology = persistence::load_topology(&topology_path).await?;
+        let has_persisted_topology = persisted_topology.is_some();
+        if let Some(topology) = persisted_topology {
+            if topology.cluster_id != cluster_id {
+                anyhow::bail!("Persisted topology cluster ID does not match persisted node identity");
+            }
+            cluster_state.apply_topology(&topology).await;
+        } else if cluster_config.seed_nodes.is_empty() {
+            // Form erasure sets from local drives for a first node bootstrap.
+            let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
+            let erasure_sets = placement::form_erasure_sets(&nodes, erasure_config.total_shards());

             if erasure_sets.is_empty() {
                 tracing::warn!(
                     "Not enough drives ({}) for erasure set size ({}). \
                     Need at least {} drives.",
                     drive_paths.len(),
                     erasure_config.total_shards(),
                     erasure_config.total_shards(),
                 );
+            }
+
+            cluster_state.set_erasure_sets(erasure_sets).await;
         }

-        cluster_state.set_erasure_sets(erasure_sets).await;
-
         // Register self as a node
         let local_node_info = NodeInfo {
             node_id: node_id.clone(),
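Startup now picks one of three bootstrap paths: replay a persisted topology snapshot, form a fresh single-node layout when there are no seeds, or defer to the seeds and join. A compact sketch of that decision; note that the `join_cluster(..., is_bootstrap)` flag further below corresponds to the `FreshLocal` case:

```rust
#[derive(Debug, PartialEq)]
enum Bootstrap {
    /// A topology snapshot exists on disk: replay it and rejoin as-is.
    FromSnapshot,
    /// First start with no seeds: form erasure sets from local drives only.
    FreshLocal,
    /// No snapshot but seeds are configured: fetch topology from the cluster.
    JoinSeeds,
}

fn decide(has_snapshot: bool, seed_nodes_empty: bool) -> Bootstrap {
    if has_snapshot {
        Bootstrap::FromSnapshot
    } else if seed_nodes_empty {
        Bootstrap::FreshLocal
    } else {
        Bootstrap::JoinSeeds
    }
}

fn main() {
    assert_eq!(decide(true, false), Bootstrap::FromSnapshot);
    assert_eq!(decide(false, true), Bootstrap::FreshLocal);
    assert_eq!(decide(false, false), Bootstrap::JoinSeeds);
}
```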
@@ -233,8 +303,6 @@ impl StorageServer {
             status: "online".to_string(),
             version: env!("CARGO_PKG_VERSION").to_string(),
         };
-        cluster_state.add_node(local_node_info.clone()).await;
-
         // Initialize drive manager for health monitoring
         let drive_manager = Arc::new(tokio::sync::Mutex::new(
             DriveManager::from_paths(&drive_paths).await?,
@@ -246,13 +314,25 @@ impl StorageServer {
             cluster_state.clone(),
             transport.clone(),
             cluster_config.heartbeat_interval_ms,
+            cluster_config.heartbeat_timeout_ms,
             local_node_info,
         )
         .with_drive_manager(drive_manager.clone()),
         );
         membership
-            .join_cluster(&cluster_config.seed_nodes)
+            .join_cluster(
+                &cluster_config.seed_nodes,
+                cluster_config.seed_nodes.is_empty() && !has_persisted_topology,
+            )
             .await?;
+        let final_cluster_id = cluster_state.cluster_id().await;
+        if final_cluster_id != cluster_id {
+            persistence::persist_identity(
+                &identity_path,
+                &ClusterIdentity::new(node_id.clone(), final_cluster_id),
+            )
+            .await?;
+        }

         // Build local shard stores (one per drive) for shared use
         let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
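After joining via seeds, the cluster may hand back a different cluster ID than the locally minted one; the identity file is rewritten only in that case. A sketch of the write-on-change guard, with an in-memory stand-in for the real async file write:

```rust
fn persist_if_changed(
    persisted_cluster_id: &mut String,
    joined_cluster_id: String,
    writes: &mut u32,
) {
    if *persisted_cluster_id != joined_cluster_id {
        *persisted_cluster_id = joined_cluster_id;
        *writes += 1; // stands in for the real persistence call
    }
}

fn main() {
    let mut on_disk = "locally-minted".to_string();
    let mut writes = 0;
    // Joining an existing cluster adopts its ID and rewrites the file once.
    persist_if_changed(&mut on_disk, "seed-cluster".to_string(), &mut writes);
    // A later restart that joins the same cluster writes nothing.
    persist_if_changed(&mut on_disk, "seed-cluster".to_string(), &mut writes);
    assert_eq!((on_disk.as_str(), writes), ("seed-cluster", 1));
}
```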
@@ -261,18 +341,19 @@ impl StorageServer {
             .collect();

         // Start QUIC accept loop for incoming connections
-        let shard_store_for_accept = local_shard_stores[0].clone();
-        let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
+        let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
         let transport_clone = transport.clone();
+        let cluster_state_for_accept = cluster_state.clone();
+        let shard_stores_for_accept = local_shard_stores.clone();
         tokio::spawn(async move {
             transport_clone
-                .accept_loop(shard_store_for_accept, quic_shutdown_rx)
+                .accept_loop(shard_stores_for_accept, Some(cluster_state_for_accept), quic_shutdown_rx)
                 .await;
         });

         // Start heartbeat loop
         let membership_clone = membership.clone();
-        let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
+        let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
         tokio::spawn(async move {
             membership_clone.heartbeat_loop(hb_shutdown_rx).await;
         });
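Renaming `_quic_shutdown_tx` to `quic_shutdown_tx` and returning the senders is not cosmetic: a sender bound to a `_`-prefixed local is dropped at the end of `start_clustered`, and once every `watch::Sender` is gone, `changed()` errors out instead of ever delivering `true`. A small demonstration (assuming tokio with the `macros` feature):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(false);

    // While a sender exists, the receiver can keep waiting for signals.
    tx.send(true).unwrap();
    assert!(rx.changed().await.is_ok());
    assert!(*rx.borrow());

    // Once every sender is dropped -- e.g. a `_tx` local going out of scope
    // at the end of the function that created it -- `changed()` returns Err,
    // and nobody can ever deliver a real shutdown signal again.
    drop(tx);
    assert!(rx.changed().await.is_err());
}
```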
@@ -289,7 +370,7 @@ impl StorageServer {
             24, // scan every 24 hours
             healing_runtime.clone(),
         )?;
-        let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
+        let (heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
         tokio::spawn(async move {
             healing_service.run(heal_shutdown_rx).await;
         });
@@ -319,7 +400,7 @@ impl StorageServer {
         );
         }

-        Ok(store)
+        Ok((store, vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx]))
     }
 }

@@ -401,12 +482,31 @@ fn storage_error_response(err: &StorageError, request_id: &str) -> Response<BoxB
         .unwrap()
 }

+fn json_response(status: StatusCode, value: serde_json::Value, request_id: &str) -> Response<BoxBody> {
+    Response::builder()
+        .status(status)
+        .header("content-type", "application/json")
+        .header("x-amz-request-id", request_id)
+        .body(full_body(value.to_string()))
+        .unwrap()
+}
+
+fn text_response(status: StatusCode, content_type: &str, body: String, request_id: &str) -> Response<BoxBody> {
+    Response::builder()
+        .status(status)
+        .header("content-type", content_type)
+        .header("x-amz-request-id", request_id)
+        .body(full_body(body))
+        .unwrap()
+}
+
 async fn handle_request(
     req: Request<Incoming>,
     store: Arc<StorageBackend>,
     config: SmartStorageConfig,
     auth_runtime: Arc<auth::RuntimeCredentialStore>,
     policy_store: Arc<PolicyStore>,
+    metrics: Arc<ServerMetrics>,
 ) -> Result<Response<BoxBody>, std::convert::Infallible> {
     let request_id = Uuid::new_v4().to_string();
     let method = req.method().clone();
@@ -416,6 +516,23 @@ async fn handle_request(
     // Handle CORS preflight
     if config.cors.enabled && method == Method::OPTIONS {
         let resp = build_cors_preflight(&config, &request_id);
+        metrics.record_response(resp.status());
+        return Ok(resp);
+    }
+
+    if method == Method::GET && uri.path().starts_with("/-/") {
+        let resp = match handle_operational_request(uri.path(), store, &config, &metrics, &request_id).await {
+            Ok(resp) => resp,
+            Err(error) => {
+                tracing::error!(error = %error, "Operational endpoint failed");
+                json_response(
+                    StatusCode::INTERNAL_SERVER_ERROR,
+                    serde_json::json!({ "ok": false, "error": error.to_string() }),
+                    &request_id,
+                )
+            }
+        };
+        metrics.record_response(resp.status());
         return Ok(resp);
     }

@@ -439,7 +556,9 @@ async fn handle_request(
             Ok(id) => Some(id),
             Err(e) => {
                 tracing::warn!("Auth failed: {}", e.message);
-                return Ok(storage_error_response(&e, &request_id));
+                let resp = storage_error_response(&e, &request_id);
+                metrics.record_response(resp.status());
+                return Ok(resp);
             }
         }
     } else {
@@ -449,7 +568,9 @@ async fn handle_request(

     // Step 3: Authorization (policy evaluation)
     if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await {
-        return Ok(storage_error_response(&e, &request_id));
+        let resp = storage_error_response(&e, &request_id);
+        metrics.record_response(resp.status());
+        return Ok(resp);
     }
 }

@@ -481,9 +602,118 @@ async fn handle_request(
         "request"
     );

+    metrics.record_response(response.status());
+
     Ok(response)
 }

+async fn handle_operational_request(
+    path: &str,
+    store: Arc<StorageBackend>,
+    config: &SmartStorageConfig,
+    metrics: &ServerMetrics,
+    request_id: &str,
+) -> Result<Response<BoxBody>> {
+    match path {
+        "/-/live" | "/-/livez" => Ok(json_response(
+            StatusCode::OK,
+            serde_json::json!({
+                "ok": true,
+                "status": "alive",
+                "startedAt": metrics.started_at.timestamp_millis(),
+            }),
+            request_id,
+        )),
+        "/-/ready" | "/-/readyz" => {
+            let cluster_health = store.get_cluster_health().await?;
+            let cluster_ready = !cluster_health.enabled
+                || (cluster_health.majority_healthy.unwrap_or(false)
+                    && cluster_health.quorum_healthy.unwrap_or(false));
+            let status = if cluster_ready {
+                StatusCode::OK
+            } else {
+                StatusCode::SERVICE_UNAVAILABLE
+            };
+            Ok(json_response(
+                status,
+                serde_json::json!({
+                    "ok": cluster_ready,
+                    "status": if cluster_ready { "ready" } else { "degraded" },
+                    "cluster": cluster_health,
+                }),
+                request_id,
+            ))
+        }
+        "/-/health" | "/-/healthz" => {
+            let cluster_health = store.get_cluster_health().await?;
+            let stats = store.get_storage_stats().await?;
+            Ok(json_response(
+                StatusCode::OK,
+                serde_json::json!({
+                    "ok": true,
+                    "status": "healthy",
+                    "version": env!("CARGO_PKG_VERSION"),
+                    "server": {
+                        "address": config.server.address,
+                        "port": config.server.port,
+                        "startedAt": metrics.started_at.timestamp_millis(),
+                    },
+                    "storage": stats,
+                    "cluster": cluster_health,
+                    "metrics": {
+                        "totalRequests": metrics.total_requests.load(Ordering::Relaxed),
+                        "errorResponses": metrics.error_responses.load(Ordering::Relaxed),
+                    },
+                }),
+                request_id,
+            ))
+        }
+        "/-/metrics" => {
+            let cluster_health = store.get_cluster_health().await?;
+            let stats = store.get_storage_stats().await?;
+            let cluster_enabled = if cluster_health.enabled { 1 } else { 0 };
+            let quorum_healthy = if cluster_health.quorum_healthy.unwrap_or(true) { 1 } else { 0 };
+            let body = format!(
+                "# HELP smartstorage_requests_total Total HTTP requests observed by smartstorage.\n\
+                 # TYPE smartstorage_requests_total counter\n\
+                 smartstorage_requests_total {}\n\
+                 # HELP smartstorage_error_responses_total HTTP responses with status >= 400.\n\
+                 # TYPE smartstorage_error_responses_total counter\n\
+                 smartstorage_error_responses_total {}\n\
+                 # HELP smartstorage_buckets_total Runtime bucket count.\n\
+                 # TYPE smartstorage_buckets_total gauge\n\
+                 smartstorage_buckets_total {}\n\
+                 # HELP smartstorage_objects_total Runtime object count.\n\
+                 # TYPE smartstorage_objects_total gauge\n\
+                 smartstorage_objects_total {}\n\
+                 # HELP smartstorage_cluster_enabled Cluster mode enabled.\n\
+                 # TYPE smartstorage_cluster_enabled gauge\n\
+                 smartstorage_cluster_enabled {}\n\
+                 # HELP smartstorage_cluster_quorum_healthy Cluster quorum health.\n\
+                 # TYPE smartstorage_cluster_quorum_healthy gauge\n\
+                 smartstorage_cluster_quorum_healthy {}\n",
+                metrics.total_requests.load(Ordering::Relaxed),
+                metrics.error_responses.load(Ordering::Relaxed),
+                stats.bucket_count,
+                stats.total_object_count,
+                cluster_enabled,
+                quorum_healthy,
+            );
+            Ok(text_response(
+                StatusCode::OK,
+                "text/plain; version=0.0.4",
+                body,
+                request_id,
+            ))
+        }
+        _ => Ok(json_response(
+            StatusCode::NOT_FOUND,
+            serde_json::json!({ "ok": false, "error": "Unknown operational endpoint" }),
+            request_id,
+        )),
+    }
+}
+
 /// Authorize a request based on bucket policies and authentication state.
 async fn authorize_request(
     ctx: &RequestContext,
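The readiness rule above treats a standalone server as always ready and a clustered one as ready only when both the majority and quorum checks pass, with a missing value counting as unhealthy. Isolated as a function for clarity:

```rust
/// Readiness as implemented in the handler: standalone servers are always
/// ready; clustered servers need both majority and quorum health, and an
/// unknown (`None`) check is treated as failing.
fn is_ready(cluster_enabled: bool, majority: Option<bool>, quorum: Option<bool>) -> bool {
    !cluster_enabled || (majority.unwrap_or(false) && quorum.unwrap_or(false))
}

fn main() {
    assert!(is_ready(false, None, None));              // standalone: always ready
    assert!(is_ready(true, Some(true), Some(true)));   // healthy cluster
    assert!(!is_ready(true, Some(true), None));        // unknown quorum => degraded
    assert!(!is_ready(true, Some(false), Some(true))); // lost majority => degraded
}
```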
@@ -0,0 +1,317 @@
+/// <reference types="node" />
+
+import { readFile, readdir, rm } from 'fs/promises';
+import { join } from 'path';
+import { expect, tap } from '@git.zone/tstest/tapbundle';
+import { CreateBucketCommand, GetObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
+import { Readable } from 'stream';
+import * as smartstorage from '../ts/index.js';
+
+const baseDir = join(process.cwd(), '.nogit', `cluster-multinode-${Date.now()}`);
+const nodes: smartstorage.SmartStorage[] = [];
+
+const makeDrivePaths = (nodeId: string) => {
+  return [1, 2].map((driveIndex) => join(baseDir, nodeId, `drive-${driveIndex}`));
+};
+
+const streamToString = async (stream: Readable): Promise<string> => {
+  const chunks: Buffer[] = [];
+  return new Promise((resolve, reject) => {
+    stream.on('data', (chunk: string | Buffer | Uint8Array) => chunks.push(Buffer.from(chunk)));
+    stream.on('error', reject);
+    stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
+  });
+};
+
+const fileExistsBelow = async (directory: string, fileName: string): Promise<boolean> => {
+  let entries;
+  try {
+    entries = await readdir(directory, { withFileTypes: true });
+  } catch {
+    return false;
+  }
+
+  for (const entry of entries) {
+    const entryPath = join(directory, entry.name);
+    if (entry.isFile() && entry.name === fileName) {
+      return true;
+    }
+    if (entry.isDirectory() && await fileExistsBelow(entryPath, fileName)) {
+      return true;
+    }
+  }
+
+  return false;
+};
+
+const waitFor = async (check: () => Promise<boolean>, timeoutMs = 10000) => {
+  const deadline = Date.now() + timeoutMs;
+  let lastError = '';
+  while (Date.now() < deadline) {
+    try {
+      if (await check()) {
+        return;
+      }
+    } catch (error) {
+      lastError = error instanceof Error ? error.message : String(error);
+    }
+    await new Promise((resolve) => setTimeout(resolve, 250));
+  }
+  throw new Error(`Timed out waiting for cluster condition${lastError ? `: ${lastError}` : ''}`);
+};
+
+tap.test('setup: start three clustered storage nodes', async () => {
+  await rm(baseDir, { recursive: true, force: true });
+
+  const node1 = await smartstorage.SmartStorage.createAndStart({
+    server: {
+      address: '127.0.0.1',
+      port: 3350,
+      silent: true,
+    },
+    storage: {
+      directory: join(baseDir, 'node-1', 'storage'),
+    },
+    cluster: {
+      enabled: true,
+      nodeId: 'node-1',
+      quicPort: 4350,
+      seedNodes: [],
+      erasure: {
+        dataShards: 4,
+        parityShards: 2,
+        chunkSizeBytes: 1024 * 1024,
+      },
+      drives: {
+        paths: makeDrivePaths('node-1'),
+      },
+      heartbeatIntervalMs: 500,
+      heartbeatTimeoutMs: 3000,
+    },
+  });
+  nodes.push(node1);
+
+  await new Promise((resolve) => setTimeout(resolve, 500));
+
+  const node2 = await smartstorage.SmartStorage.createAndStart({
+    server: {
+      address: '127.0.0.1',
+      port: 3351,
+      silent: true,
+    },
+    storage: {
+      directory: join(baseDir, 'node-2', 'storage'),
+    },
+    cluster: {
+      enabled: true,
+      nodeId: 'node-2',
+      quicPort: 4351,
+      seedNodes: ['127.0.0.1:4350'],
+      erasure: {
+        dataShards: 4,
+        parityShards: 2,
+        chunkSizeBytes: 1024 * 1024,
+      },
+      drives: {
+        paths: makeDrivePaths('node-2'),
+      },
+      heartbeatIntervalMs: 500,
+      heartbeatTimeoutMs: 3000,
+    },
+  });
+  nodes.push(node2);
+
+  await new Promise((resolve) => setTimeout(resolve, 500));
+
+  const node3 = await smartstorage.SmartStorage.createAndStart({
+    server: {
+      address: '127.0.0.1',
+      port: 3352,
+      silent: true,
+    },
+    storage: {
+      directory: join(baseDir, 'node-3', 'storage'),
+    },
+    cluster: {
+      enabled: true,
+      nodeId: 'node-3',
+      quicPort: 4352,
+      seedNodes: ['127.0.0.1:4350'],
+      erasure: {
+        dataShards: 4,
+        parityShards: 2,
+        chunkSizeBytes: 1024 * 1024,
+      },
+      drives: {
+        paths: makeDrivePaths('node-3'),
+      },
+      heartbeatIntervalMs: 500,
+      heartbeatTimeoutMs: 3000,
+    },
+  });
+  nodes.push(node3);
+});
+
+tap.test('seed node should report joined peers and multi-node erasure topology', async () => {
+  const seed = nodes[0];
+
+  await waitFor(async () => {
+    const health = await seed.getClusterHealth();
+    if (health.peers?.length !== 2 || health.erasure?.erasureSetCount !== 1) {
+      throw new Error(JSON.stringify(health));
+    }
+    return health.peers?.length === 2 && health.erasure?.erasureSetCount === 1;
+  });
+
+  const health = await seed.getClusterHealth();
+  const peerIds = health.peers!.map((peer) => peer.nodeId).sort();
+
+  expect(health.enabled).toEqual(true);
+  expect(health.nodeId).toEqual('node-1');
+  expect(health.quorumHealthy).toEqual(true);
+  expect(health.majorityHealthy).toEqual(true);
+  expect(peerIds).toEqual(['node-2', 'node-3']);
+  expect(health.erasure?.totalShards).toEqual(6);
+  expect(health.erasure?.erasureSetCount).toEqual(1);
+});
+
+tap.test('all nodes should converge to the same multi-node topology', async () => {
+  for (const node of nodes) {
+    await waitFor(async () => {
+      const health = await node.getClusterHealth();
+      if (health.peers?.length !== 2 || health.erasure?.erasureSetCount !== 1) {
+        throw new Error(JSON.stringify(health));
+      }
+      return true;
+    });
+  }
+});
+
+tap.test('seed node should write shards to the declared remote drives', async () => {
+  const seed = nodes[0];
+  const descriptor = await seed.getStorageDescriptor();
+  const client = new S3Client({
+    endpoint: `http://${descriptor.endpoint}:${descriptor.port}`,
+    region: 'us-east-1',
+    credentials: {
+      accessKeyId: descriptor.accessKey,
+      secretAccessKey: descriptor.accessSecret,
+    },
+    forcePathStyle: true,
+  });
+  const bucket = 'multinode-bucket';
+  const key = 'distributed.txt';
+  const body = 'hello distributed shards';
+
+  await client.send(new CreateBucketCommand({ Bucket: bucket }));
+  await client.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body: body }));
+
+  const getResponse = await client.send(new GetObjectCommand({ Bucket: bucket, Key: key }));
+  expect(await streamToString(getResponse.Body as Readable)).toEqual(body);
+
+  const manifestPath = join(
+    baseDir,
+    'node-1',
+    'storage',
+    '.manifests',
+    bucket,
+    `${key}.manifest.json`,
+  );
+  const manifest = JSON.parse(await readFile(manifestPath, 'utf8')) as {
+    chunks: Array<{
+      shardPlacements: Array<{ shardIndex: number; nodeId: string; driveId: string }>;
+    }>;
+  };
+  const placements = manifest.chunks[0].shardPlacements;
+
+  expect(placements.length).toEqual(6);
+  expect(placements.some((placement) => placement.nodeId === 'node-2' && placement.driveId === '1'))
+    .toEqual(true);
+  expect(placements.some((placement) => placement.nodeId === 'node-3' && placement.driveId === '1'))
+    .toEqual(true);
+
+  for (const placement of placements) {
+    const drivePath = makeDrivePaths(placement.nodeId)[Number(placement.driveId)];
+    const shardFile = `shard-${placement.shardIndex}.dat`;
+    expect(await fileExistsBelow(join(drivePath, '.smartstorage', 'data'), shardFile)).toEqual(true);
+  }
+});
+
+tap.test('restarted peer should keep durable identity and rejoin topology', async () => {
+  await nodes[1].stop();
+  await new Promise((resolve) => setTimeout(resolve, 500));
+
+  nodes[1] = await smartstorage.SmartStorage.createAndStart({
+    server: {
+      address: '127.0.0.1',
+      port: 3351,
+      silent: true,
+    },
+    storage: {
+      directory: join(baseDir, 'node-2', 'storage'),
+    },
+    cluster: {
+      enabled: true,
+      nodeId: 'node-2',
+      quicPort: 4351,
+      seedNodes: ['127.0.0.1:4350'],
+      erasure: {
+        dataShards: 4,
+        parityShards: 2,
+        chunkSizeBytes: 1024 * 1024,
+      },
+      drives: {
+        paths: makeDrivePaths('node-2'),
+      },
+      heartbeatIntervalMs: 500,
+      heartbeatTimeoutMs: 3000,
+    },
+  });
+
+  await waitFor(async () => {
+    const health = await nodes[1].getClusterHealth();
+    if (health.nodeId !== 'node-2' || health.peers?.length !== 2) {
+      throw new Error(JSON.stringify(health));
+    }
+    return true;
+  });
+
+  const identityPath = join(
+    baseDir,
+    'node-2',
+    'storage',
+    '.smartstorage',
+    'cluster',
+    'identity.json',
+  );
+  const topologyPath = join(
+    baseDir,
+    'node-2',
+    'storage',
+    '.smartstorage',
+    'cluster',
+    'topology.json',
+  );
+  const identity = JSON.parse(await readFile(identityPath, 'utf8')) as {
+    nodeId: string;
+    clusterId: string;
+  };
+  const topology = JSON.parse(await readFile(topologyPath, 'utf8')) as {
+    cluster_id: string;
+    nodes: Array<{ node_id: string }>;
+  };
+
+  expect(identity.nodeId).toEqual('node-2');
+  expect(identity.clusterId).toEqual(topology.cluster_id);
+  expect(topology.nodes.some((node) => node.node_id === 'node-1')).toEqual(true);
+  expect(topology.nodes.some((node) => node.node_id === 'node-3')).toEqual(true);
+});
+
+tap.test('teardown: stop clustered nodes and clean files', async () => {
+  for (const node of nodes.reverse()) {
+    await node.stop();
+  }
+  await rm(baseDir, { recursive: true, force: true });
+});
+
+export default tap.start();
@@ -65,11 +65,11 @@ tap.test('startup credentials authenticate successfully', async () => {
   expect(response.$metadata.httpStatusCode).toEqual(200);
 });

-tap.test('listCredentials returns the active startup credential set', async () => {
+tap.test('listCredentials returns active credential metadata without secrets', async () => {
   const credentials = await testSmartStorageInstance.listCredentials();
   expect(credentials.length).toEqual(1);
   expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId);
-  expect(credentials[0].secretAccessKey).toEqual(INITIAL_CREDENTIAL.secretAccessKey);
+  expect((credentials[0] as any).secretAccessKey).toEqual(undefined);
 });

 tap.test('invalid replacement input fails cleanly and leaves old credentials active', async () => {
@@ -0,0 +1,50 @@
+import { expect, tap } from '@git.zone/tstest/tapbundle';
+import * as smartstorage from '../ts/index.js';
+
+const TEST_PORT = 3353;
+let testSmartStorageInstance: smartstorage.SmartStorage;
+
+tap.test('setup: start storage server for operational endpoint checks', async () => {
+  testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart({
+    server: {
+      port: TEST_PORT,
+      silent: true,
+      region: 'us-east-1',
+    },
+    storage: {
+      cleanSlate: true,
+    },
+    auth: {
+      enabled: false,
+      credentials: [],
+    },
+  });
+});
+
+tap.test('operational endpoints expose live ready health and metrics', async () => {
+  const live = await fetch(`http://localhost:${TEST_PORT}/-/live`);
+  expect(live.status).toEqual(200);
+  expect((await live.json()).status).toEqual('alive');
+
+  const ready = await fetch(`http://localhost:${TEST_PORT}/-/ready`);
+  expect(ready.status).toEqual(200);
+  expect((await ready.json()).status).toEqual('ready');
+
+  const health = await fetch(`http://localhost:${TEST_PORT}/-/health`);
+  expect(health.status).toEqual(200);
+  const healthBody = await health.json();
+  expect(healthBody.ok).toEqual(true);
+  expect(healthBody.cluster.enabled).toEqual(false);
+
+  const metrics = await fetch(`http://localhost:${TEST_PORT}/-/metrics`);
+  expect(metrics.status).toEqual(200);
+  const metricsBody = await metrics.text();
+  expect(metricsBody.includes('smartstorage_requests_total')).toEqual(true);
+  expect(metricsBody.includes('smartstorage_cluster_enabled 0')).toEqual(true);
+});
+
+tap.test('teardown: stop storage server', async () => {
+  await testSmartStorageInstance.stop();
+});
+
+export default tap.start();
@@ -3,6 +3,6 @@
  */
 export const commitinfo = {
   name: '@push.rocks/smartstorage',
-  version: '6.3.3',
+  version: '6.4.1',
   description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.'
 }
+6 -2
@@ -9,6 +9,10 @@ export interface IStorageCredential {
   secretAccessKey: string;
 }

+export interface IStorageCredentialMetadata {
+  accessKeyId: string;
+}
+
 /**
  * Authentication configuration
  */
@@ -311,7 +315,7 @@ type TRustStorageCommands = {
   createBucket: { params: { name: string }; result: {} };
   getStorageStats: { params: {}; result: IStorageStats };
   listBucketSummaries: { params: {}; result: IBucketSummary[] };
-  listCredentials: { params: {}; result: IStorageCredential[] };
+  listCredentials: { params: {}; result: IStorageCredentialMetadata[] };
   replaceCredentials: { params: { credentials: IStorageCredential[] }; result: {} };
   getClusterHealth: { params: {}; result: IClusterHealth };
 };
@@ -391,7 +395,7 @@ export class SmartStorage {
     return this.bridge.sendCommand('listBucketSummaries', {});
   }

-  public async listCredentials(): Promise<IStorageCredential[]> {
+  public async listCredentials(): Promise<IStorageCredentialMetadata[]> {
     return this.bridge.sendCommand('listCredentials', {});
   }

@@ -6,6 +6,7 @@
     "esModuleInterop": true,
     "verbatimModuleSyntax": true,
     "types": ["node"],
+    "noImplicitAny": true,
     "ignoreDeprecations": "6.0",
     "baseUrl": ".",
     "paths": {}