From a31e4773599afef3d117b1e08ef6f53cc85965d3 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Thu, 30 Apr 2026 06:08:42 +0000 Subject: [PATCH] feat(cluster,server,auth): add operational health endpoints, persist cluster topology, and hide credential secrets from runtime listings --- changelog.md | 9 + readme.hints.md | 13 ++ rust/src/auth.rs | 17 +- rust/src/cluster/coordinator.rs | 10 + rust/src/cluster/membership.rs | 45 ++-- rust/src/cluster/mod.rs | 1 + rust/src/cluster/persistence.rs | 77 +++++++ rust/src/cluster/protocol.rs | 4 + rust/src/cluster/quic_transport.rs | 244 +++++++++++++++++++-- rust/src/cluster/state.rs | 152 ++++++++----- rust/src/server.rs | 290 ++++++++++++++++++++++--- test/test.cluster-multinode.node.ts | 317 ++++++++++++++++++++++++++++ test/test.credentials.node.ts | 4 +- test/test.health-http.node.ts | 50 +++++ ts/00_commitinfo_data.ts | 2 +- ts/index.ts | 8 +- 16 files changed, 1120 insertions(+), 123 deletions(-) create mode 100644 rust/src/cluster/persistence.rs create mode 100644 test/test.cluster-multinode.node.ts create mode 100644 test/test.health-http.node.ts diff --git a/changelog.md b/changelog.md index 202bc76..7704aef 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-04-30 - 6.4.0 - feat(cluster,server,auth) +add operational health endpoints, persist cluster topology, and hide credential secrets from runtime listings + +- persist cluster identity and topology snapshots under .smartstorage/cluster to support safer clustered restarts and seed-node joins +- add unauthenticated /-/live, /-/ready, /-/health, and /-/metrics endpoints with basic request and storage metrics +- route clustered shard read/write/delete/head operations by drive index and handle join, heartbeat, and topology sync over QUIC +- change runtime credential listing to return access-key metadata only, excluding secretAccessKey values +- add tests for operational endpoints and multi-node cluster persistence and recovery behavior + ## 2026-04-19 - 6.3.3 - fix(build) rename npmextra config to .smartconfig and refresh build metadata diff --git a/readme.hints.md b/readme.hints.md index f3b7350..add259c 100644 --- a/readme.hints.md +++ b/readme.hints.md @@ -14,6 +14,9 @@ - Runtime bucket summaries and storage stats via the Rust bridge (no S3 list scans) - Cluster health introspection via the Rust bridge (node membership, local drive probes, quorum, healing state) - Runtime credential listing and atomic replacement via the Rust bridge +- Cluster identity and topology snapshots persist under `{storage}/.smartstorage/cluster/` +- S3-side operational endpoints are available at `/-/live`, `/-/ready`, `/-/health`, and `/-/metrics` +- Runtime credential listing returns access-key metadata only; secrets are write-only ## Architecture @@ -47,6 +50,15 @@ | `replaceCredentials` | `{ credentials: IStorageCredential[] }` | Atomically replace the runtime auth credential set | | `getClusterHealth` | `{}` | Return runtime cluster health or `{ enabled: false }` in standalone mode | +### Operational HTTP Endpoints + +| Endpoint | Purpose | +|----------|---------| +| `GET /-/live` | Process liveness | +| `GET /-/ready` | S3 readiness and cluster quorum readiness | +| `GET /-/health` | JSON storage, cluster, and runtime health | +| `GET /-/metrics` | Prometheus text metrics | + ### Storage Layout - Objects: `{root}/{bucket}/{key}._storage_object` - Metadata: `{root}/{bucket}/{key}._storage_object.metadata.json` @@ -71,6 +83,7 @@ - `test/test.aws-sdk.node.ts` - AWS SDK v3 compatibility + runtime stats + standalone cluster health coverage (19 tests, auth disabled, port 3337) - `test/test.credentials.node.ts` - runtime credential rotation coverage (10 tests, auth enabled, port 3349) +- `test/test.health-http.node.ts` - unauthenticated operational endpoint coverage (3 tests, port 3353) - `test/test.cluster-health.node.ts` - single-node cluster health coverage (4 tests, S3 port 3348, QUIC port 4348) - `test/test.auth.node.ts` - Auth + bucket policy integration (20 tests, auth enabled, port 3344) - `test/test.policy-crud.node.ts` - Policy API CRUD + validation edge cases (17 tests, port 3345) diff --git a/rust/src/auth.rs b/rust/src/auth.rs index cc853a6..8a15cc4 100644 --- a/rust/src/auth.rs +++ b/rust/src/auth.rs @@ -176,6 +176,12 @@ pub struct RuntimeCredentialStore { credentials: RwLock>, } +#[derive(Debug, Clone, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CredentialMetadata { + pub access_key_id: String, +} + impl RuntimeCredentialStore { pub fn new(config: &AuthConfig) -> Self { Self { @@ -188,8 +194,15 @@ impl RuntimeCredentialStore { self.enabled } - pub async fn list_credentials(&self) -> Vec { - self.credentials.read().await.clone() + pub async fn list_credentials(&self) -> Vec { + self.credentials + .read() + .await + .iter() + .map(|credential| CredentialMetadata { + access_key_id: credential.access_key_id.clone(), + }) + .collect() } pub async fn snapshot_credentials(&self) -> Vec { diff --git a/rust/src/cluster/coordinator.rs b/rust/src/cluster/coordinator.rs index d70b849..e5399a4 100644 --- a/rust/src/cluster/coordinator.rs +++ b/rust/src/cluster/coordinator.rs @@ -408,6 +408,7 @@ impl DistributedStore { key, chunk.chunk_index, placement.shard_index, + placement.drive_id.parse::().unwrap_or(0), ) .await { @@ -930,6 +931,7 @@ impl DistributedStore { &part_info.part_key, chunk.chunk_index, placement.shard_index, + placement.drive_id.parse::().unwrap_or(0), ) .await; } @@ -1271,6 +1273,7 @@ impl DistributedStore { key, chunk_index, shard_idx as u32, + drive.drive_index, shard_data, checksum, ) @@ -1330,6 +1333,7 @@ impl DistributedStore { key: &str, chunk_index: u32, shard_index: u32, + drive_index: u32, data: &[u8], checksum: u32, ) -> Result<()> { @@ -1348,6 +1352,7 @@ impl DistributedStore { key: key.to_string(), chunk_index, shard_index, + drive_index, shard_data_length: data.len() as u64, checksum, object_metadata: HashMap::new(), @@ -1417,6 +1422,7 @@ impl DistributedStore { key, chunk.chunk_index, placement.shard_index, + placement.drive_id.parse::().unwrap_or(0), ) .await .ok() @@ -1448,6 +1454,7 @@ impl DistributedStore { key: &str, chunk_index: u32, shard_index: u32, + drive_index: u32, ) -> Result<(Vec, u32)> { let node_info = self .state @@ -1464,6 +1471,7 @@ impl DistributedStore { key: key.to_string(), chunk_index, shard_index, + drive_index, }); match self.transport.send_shard_read(&conn, &request).await? { @@ -1479,6 +1487,7 @@ impl DistributedStore { key: &str, chunk_index: u32, shard_index: u32, + drive_index: u32, ) -> Result<()> { let node_info = self .state @@ -1495,6 +1504,7 @@ impl DistributedStore { key: key.to_string(), chunk_index, shard_index, + drive_index, }); let _response = self.transport.send_request(&conn, &request).await?; diff --git a/rust/src/cluster/membership.rs b/rust/src/cluster/membership.rs index 42891a6..9093021 100644 --- a/rust/src/cluster/membership.rs +++ b/rust/src/cluster/membership.rs @@ -18,6 +18,7 @@ pub struct MembershipManager { state: Arc, transport: Arc, heartbeat_interval: Duration, + heartbeat_timeout: Duration, local_node_info: NodeInfo, drive_manager: Option>>, } @@ -27,12 +28,14 @@ impl MembershipManager { state: Arc, transport: Arc, heartbeat_interval_ms: u64, + heartbeat_timeout_ms: u64, local_node_info: NodeInfo, ) -> Self { Self { state, transport, heartbeat_interval: Duration::from_millis(heartbeat_interval_ms), + heartbeat_timeout: Duration::from_millis(heartbeat_timeout_ms), local_node_info, drive_manager: None, } @@ -46,7 +49,7 @@ impl MembershipManager { /// Join the cluster by contacting seed nodes. /// Sends a JoinRequest to each seed node until one accepts. - pub async fn join_cluster(&self, seed_nodes: &[String]) -> Result<()> { + pub async fn join_cluster(&self, seed_nodes: &[String], allow_bootstrap_on_failure: bool) -> Result<()> { if seed_nodes.is_empty() { tracing::info!("No seed nodes configured, starting as initial cluster node"); self.state.add_node(self.local_node_info.clone()).await; @@ -75,10 +78,13 @@ impl MembershipManager { } } - // If no seed responded, start as a new cluster - tracing::info!("Could not reach any seed nodes, starting as initial cluster node"); - self.state.add_node(self.local_node_info.clone()).await; - Ok(()) + if allow_bootstrap_on_failure { + tracing::warn!("Could not reach any seed nodes, bootstrapping a new cluster because no persisted topology exists"); + self.state.add_node(self.local_node_info.clone()).await; + return Ok(()); + } + + anyhow::bail!("Could not reach any configured seed nodes; refusing unsafe cluster bootstrap") } async fn try_join(&self, addr: SocketAddr) -> Result<()> { @@ -97,9 +103,14 @@ impl MembershipManager { ClusterResponse::JoinResponse(join_resp) => { if join_resp.accepted { if let Some(topology) = &join_resp.topology { + let topology_contains_self = topology + .nodes + .iter() + .any(|node| node.node_id == self.local_node_info.node_id); self.state.apply_topology(topology).await; - // Also register self - self.state.add_node(self.local_node_info.clone()).await; + if !topology_contains_self { + self.state.add_node(self.local_node_info.clone()).await; + } tracing::info!( "Applied cluster topology (version {}, {} nodes, {} erasure sets)", topology.version, @@ -137,7 +148,13 @@ impl MembershipManager { } async fn send_heartbeats(&self) { - let peers = self.state.online_peers().await; + let peers = self + .state + .all_nodes() + .await + .into_iter() + .filter(|node| node.info.node_id != self.local_node_info.node_id) + .collect::>(); let topology_version = self.state.version().await; let mut responded = Vec::new(); @@ -145,7 +162,7 @@ impl MembershipManager { let drive_states = self.collect_drive_states().await; for peer in &peers { - let addr: SocketAddr = match peer.quic_addr.parse() { + let addr: SocketAddr = match peer.info.quic_addr.parse() { Ok(a) => a, Err(_) => continue, }; @@ -158,23 +175,23 @@ impl MembershipManager { }); match tokio::time::timeout( - Duration::from_secs(5), - self.send_heartbeat_to_peer(&peer.node_id, addr, &heartbeat), + self.heartbeat_timeout, + self.send_heartbeat_to_peer(&peer.info.node_id, addr, &heartbeat), ) .await { Ok(Ok(())) => { - responded.push(peer.node_id.clone()); + responded.push(peer.info.node_id.clone()); } Ok(Err(e)) => { tracing::debug!( - peer = %peer.node_id, + peer = %peer.info.node_id, error = %e, "Heartbeat failed" ); } Err(_) => { - tracing::debug!(peer = %peer.node_id, "Heartbeat timed out"); + tracing::debug!(peer = %peer.info.node_id, "Heartbeat timed out"); } } } diff --git a/rust/src/cluster/mod.rs b/rust/src/cluster/mod.rs index 87a280c..daa547a 100644 --- a/rust/src/cluster/mod.rs +++ b/rust/src/cluster/mod.rs @@ -9,6 +9,7 @@ pub mod erasure; pub mod healing; pub mod membership; pub mod metadata; +pub mod persistence; pub mod placement; pub mod protocol; pub mod quic_transport; diff --git a/rust/src/cluster/persistence.rs b/rust/src/cluster/persistence.rs new file mode 100644 index 0000000..8a9ce15 --- /dev/null +++ b/rust/src/cluster/persistence.rs @@ -0,0 +1,77 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::path::{Path, PathBuf}; +use tokio::fs; + +use super::protocol::ClusterTopology; + +const CLUSTER_METADATA_DIR: &str = ".smartstorage/cluster"; +const IDENTITY_FILE: &str = "identity.json"; +const TOPOLOGY_FILE: &str = "topology.json"; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ClusterIdentity { + pub schema_version: u32, + pub node_id: String, + pub cluster_id: String, +} + +impl ClusterIdentity { + pub fn new(node_id: String, cluster_id: String) -> Self { + Self { + schema_version: 1, + node_id, + cluster_id, + } + } +} + +pub fn cluster_metadata_dir(storage_directory: &str) -> PathBuf { + PathBuf::from(storage_directory).join(CLUSTER_METADATA_DIR) +} + +pub fn identity_path(metadata_dir: &Path) -> PathBuf { + metadata_dir.join(IDENTITY_FILE) +} + +pub fn topology_path(metadata_dir: &Path) -> PathBuf { + metadata_dir.join(TOPOLOGY_FILE) +} + +pub async fn load_identity(path: &Path) -> Result> { + match fs::read_to_string(path).await { + Ok(content) => Ok(Some(serde_json::from_str(&content)?)), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(error) => Err(error.into()), + } +} + +pub async fn persist_identity(path: &Path, identity: &ClusterIdentity) -> Result<()> { + write_json_atomic(path, identity).await +} + +pub async fn load_topology(path: &Path) -> Result> { + match fs::read_to_string(path).await { + Ok(content) => Ok(Some(serde_json::from_str(&content)?)), + Err(error) if error.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(error) => Err(error.into()), + } +} + +pub async fn persist_topology(path: &Path, topology: &ClusterTopology) -> Result<()> { + write_json_atomic(path, topology).await +} + +async fn write_json_atomic(path: &Path, value: &T) -> Result<()> { + let parent = path + .parent() + .ok_or_else(|| anyhow::anyhow!("Cluster metadata path has no parent"))?; + fs::create_dir_all(parent).await?; + + let temp_path = path.with_extension("json.tmp"); + let content = serde_json::to_string_pretty(value)?; + fs::write(&temp_path, content).await?; + fs::rename(&temp_path, path).await?; + Ok(()) +} diff --git a/rust/src/cluster/protocol.rs b/rust/src/cluster/protocol.rs index b8bd229..03b6e0c 100644 --- a/rust/src/cluster/protocol.rs +++ b/rust/src/cluster/protocol.rs @@ -102,6 +102,7 @@ pub struct ShardWriteRequest { pub key: String, pub chunk_index: u32, pub shard_index: u32, + pub drive_index: u32, pub shard_data_length: u64, pub checksum: u32, // crc32c of shard data pub object_metadata: HashMap, @@ -121,6 +122,7 @@ pub struct ShardReadRequest { pub key: String, pub chunk_index: u32, pub shard_index: u32, + pub drive_index: u32, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -139,6 +141,7 @@ pub struct ShardDeleteRequest { pub key: String, pub chunk_index: u32, pub shard_index: u32, + pub drive_index: u32, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -154,6 +157,7 @@ pub struct ShardHeadRequest { pub key: String, pub chunk_index: u32, pub shard_index: u32, + pub drive_index: u32, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/rust/src/cluster/quic_transport.rs b/rust/src/cluster/quic_transport.rs index 16485e1..dbce352 100644 --- a/rust/src/cluster/quic_transport.rs +++ b/rust/src/cluster/quic_transport.rs @@ -8,6 +8,7 @@ use super::protocol::{ self, ClusterRequest, ClusterResponse, ShardReadResponse, ShardWriteAck, ShardWriteRequest, }; use super::shard_store::{ShardId, ShardStore}; +use super::state::{ClusterState, NodeStatus}; /// QUIC transport layer for inter-node communication. /// @@ -26,11 +27,8 @@ impl QuicTransport { pub async fn new(bind_addr: SocketAddr, local_node_id: String) -> Result { let (server_config, client_config) = Self::generate_tls_configs()?; - let endpoint = Endpoint::server(server_config, bind_addr)?; - - // Also configure the endpoint for client connections - let mut endpoint_client = endpoint.clone(); - endpoint_client.set_default_client_config(client_config); + let mut endpoint = Endpoint::server(server_config, bind_addr)?; + endpoint.set_default_client_config(client_config); Ok(Self { endpoint, @@ -163,7 +161,8 @@ impl QuicTransport { /// Accept incoming connections and dispatch to the handler. pub async fn accept_loop( self: Arc, - shard_store: Arc, + shard_stores: Vec>, + cluster_state: Option>, mut shutdown: tokio::sync::watch::Receiver, ) { loop { @@ -172,11 +171,12 @@ impl QuicTransport { match incoming { Some(incoming_conn) => { let transport = self.clone(); - let store = shard_store.clone(); + let stores = shard_stores.clone(); + let state = cluster_state.clone(); tokio::spawn(async move { match incoming_conn.await { Ok(conn) => { - transport.handle_connection(conn, store).await; + transport.handle_connection(conn, stores, state).await; } Err(e) => { tracing::error!("Failed to accept QUIC connection: {}", e); @@ -194,16 +194,19 @@ impl QuicTransport { /// Handle a single QUIC connection (may have multiple streams). async fn handle_connection( - &self, + self: Arc, conn: quinn::Connection, - shard_store: Arc, + shard_stores: Vec>, + cluster_state: Option>, ) { loop { match conn.accept_bi().await { Ok((send, recv)) => { - let store = shard_store.clone(); + let stores = shard_stores.clone(); + let state = cluster_state.clone(); + let transport = self.clone(); tokio::spawn(async move { - if let Err(e) = Self::handle_stream(send, recv, store).await { + if let Err(e) = transport.handle_stream(send, recv, stores, state).await { tracing::error!("Stream handler error: {}", e); } }); @@ -219,9 +222,11 @@ impl QuicTransport { /// Handle a single bidirectional stream (one request-response exchange). async fn handle_stream( + self: Arc, mut send: quinn::SendStream, mut recv: quinn::RecvStream, - shard_store: Arc, + shard_stores: Vec>, + cluster_state: Option>, ) -> Result<()> { // Read the full request (length-prefixed bincode + optional trailing data) let raw = recv.read_to_end(64 * 1024 * 1024).await?; // 64MB max @@ -231,6 +236,7 @@ impl QuicTransport { ClusterRequest::ShardWrite(write_req) => { // Shard data follows the header in the raw buffer let shard_data = &raw[header_len..]; + let drive_index = write_req.drive_index; let shard_id = ShardId { bucket: write_req.bucket, @@ -239,9 +245,10 @@ impl QuicTransport { shard_index: write_req.shard_index, }; - let result = shard_store - .write_shard(&shard_id, &shard_data, write_req.checksum) - .await; + let result = match Self::shard_store_for_drive(&shard_stores, drive_index) { + Ok(store) => store.write_shard(&shard_id, &shard_data, write_req.checksum).await, + Err(error) => Err(error), + }; let ack = ShardWriteAck { request_id: write_req.request_id, @@ -254,6 +261,7 @@ impl QuicTransport { } ClusterRequest::ShardRead(read_req) => { + let drive_index = read_req.drive_index; let shard_id = ShardId { bucket: read_req.bucket, key: read_req.key, @@ -261,7 +269,15 @@ impl QuicTransport { shard_index: read_req.shard_index, }; - match shard_store.read_shard(&shard_id).await { + let store = match Self::shard_store_for_drive(&shard_stores, drive_index) { + Ok(store) => store, + Err(error) => { + Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?; + return Ok(()); + } + }; + + match store.read_shard(&shard_id).await { Ok((data, checksum)) => { let header = ShardReadResponse { request_id: read_req.request_id, @@ -293,13 +309,17 @@ impl QuicTransport { } ClusterRequest::ShardDelete(del_req) => { + let drive_index = del_req.drive_index; let shard_id = ShardId { bucket: del_req.bucket, key: del_req.key, chunk_index: del_req.chunk_index, shard_index: del_req.shard_index, }; - let result = shard_store.delete_shard(&shard_id).await; + let result = match Self::shard_store_for_drive(&shard_stores, drive_index) { + Ok(store) => store.delete_shard(&shard_id).await, + Err(error) => Err(error), + }; let ack = protocol::ClusterResponse::ShardDeleteAck(protocol::ShardDeleteAck { request_id: del_req.request_id, success: result.is_ok(), @@ -310,13 +330,22 @@ impl QuicTransport { } ClusterRequest::ShardHead(head_req) => { + let drive_index = head_req.drive_index; let shard_id = ShardId { bucket: head_req.bucket, key: head_req.key, chunk_index: head_req.chunk_index, shard_index: head_req.shard_index, }; - let resp = match shard_store.head_shard(&shard_id).await { + let store = match Self::shard_store_for_drive(&shard_stores, drive_index) { + Ok(store) => store, + Err(error) => { + Self::send_error_response(&mut send, "InvalidDrive", error.to_string()).await?; + return Ok(()); + } + }; + + let resp = match store.head_shard(&shard_id).await { Ok(Some(meta)) => protocol::ShardHeadResponse { request_id: head_req.request_id, found: true, @@ -336,9 +365,103 @@ impl QuicTransport { send.finish()?; } - // Heartbeat, Join, TopologySync, Heal, and Manifest operations - // will be handled by the membership and coordinator modules. - // For now, send a generic ack. + ClusterRequest::JoinRequest(join_req) => { + let Some(state) = cluster_state else { + let err = protocol::ErrorResponse { + request_id: String::new(), + code: "ClusterDisabled".to_string(), + message: "Cluster state is not available".to_string(), + }; + let response = protocol::encode_response(&ClusterResponse::Error(err))?; + send.write_all(&response).await?; + send.finish()?; + return Ok(()); + }; + + let joining_node_id = join_req.node_info.node_id.clone(); + state.add_node(join_req.node_info).await; + let topology = state.to_topology().await; + let node_drives: Vec<(String, u32)> = topology + .nodes + .iter() + .map(|node| (node.node_id.clone(), node.drive_count)) + .collect(); + let erasure_sets = super::placement::form_erasure_sets( + &node_drives, + topology.data_shards + topology.parity_shards, + ); + state.set_erasure_sets(erasure_sets).await; + + let response_topology = state.to_topology().await; + let response = protocol::encode_response(&ClusterResponse::JoinResponse( + protocol::JoinResponseMessage { + accepted: true, + topology: Some(response_topology.clone()), + error: None, + }, + ))?; + send.write_all(&response).await?; + send.finish()?; + + self.broadcast_topology(&state, Some(response_topology), None, Some(&joining_node_id)).await; + } + + ClusterRequest::Heartbeat(heartbeat) => { + let Some(state) = cluster_state else { + let err = protocol::ErrorResponse { + request_id: String::new(), + code: "ClusterDisabled".to_string(), + message: "Cluster state is not available".to_string(), + }; + let response = protocol::encode_response(&ClusterResponse::Error(err))?; + send.write_all(&response).await?; + send.finish()?; + return Ok(()); + }; + + let peer_node_id = heartbeat.node_id.clone(); + let peer_topology_version = heartbeat.topology_version; + state.record_heartbeat(&heartbeat.node_id).await; + let local_topology_version = state.version().await; + let response = protocol::encode_response(&ClusterResponse::HeartbeatAck( + protocol::HeartbeatAckMessage { + node_id: state.local_node_id().to_string(), + timestamp: chrono::Utc::now().to_rfc3339(), + topology_version: local_topology_version, + }, + ))?; + send.write_all(&response).await?; + send.finish()?; + + if local_topology_version > peer_topology_version { + self.broadcast_topology(&state, None, Some(&peer_node_id), None).await; + } + } + + ClusterRequest::TopologySync(sync) => { + let Some(state) = cluster_state else { + let err = protocol::ErrorResponse { + request_id: String::new(), + code: "ClusterDisabled".to_string(), + message: "Cluster state is not available".to_string(), + }; + let response = protocol::encode_response(&ClusterResponse::Error(err))?; + send.write_all(&response).await?; + send.finish()?; + return Ok(()); + }; + + state.apply_topology(&sync.topology).await; + let response = protocol::encode_response(&ClusterResponse::TopologySyncAck( + protocol::TopologySyncAckMessage { + accepted: true, + current_version: state.version().await, + }, + ))?; + send.write_all(&response).await?; + send.finish()?; + } + _ => { let err = protocol::ErrorResponse { request_id: String::new(), @@ -354,6 +477,83 @@ impl QuicTransport { Ok(()) } + fn shard_store_for_drive( + shard_stores: &[Arc], + drive_index: u32, + ) -> Result> { + shard_stores + .get(drive_index as usize) + .cloned() + .ok_or_else(|| anyhow::anyhow!("Drive {} not found", drive_index)) + } + + async fn send_error_response( + send: &mut quinn::SendStream, + code: &str, + message: String, + ) -> Result<()> { + let err = protocol::ErrorResponse { + request_id: String::new(), + code: code.to_string(), + message, + }; + let response = protocol::encode_response(&ClusterResponse::Error(err))?; + send.write_all(&response).await?; + send.finish()?; + Ok(()) + } + + async fn broadcast_topology( + &self, + state: &Arc, + topology: Option, + target_node_id: Option<&str>, + skip_node_id: Option<&str>, + ) { + let topology = match topology { + Some(topology) => topology, + None => state.to_topology().await, + }; + let nodes = state.all_nodes().await; + for node in nodes { + if node.info.node_id == state.local_node_id() { + continue; + } + if let Some(target_node_id) = target_node_id { + if node.info.node_id != target_node_id { + continue; + } + } + if matches!(skip_node_id, Some(skip_node_id) if node.info.node_id == skip_node_id) { + continue; + } + if node.status != NodeStatus::Online { + continue; + } + + let addr = match node.info.quic_addr.parse() { + Ok(addr) => addr, + Err(error) => { + tracing::warn!(node = %node.info.node_id, error = %error, "Skipping topology sync for invalid peer address"); + continue; + } + }; + let conn = match self.get_connection(&node.info.node_id, addr).await { + Ok(conn) => conn, + Err(error) => { + tracing::warn!(node = %node.info.node_id, error = %error, "Failed to connect for topology sync"); + continue; + } + }; + let request = ClusterRequest::TopologySync(protocol::TopologySyncMessage { + topology: topology.clone(), + }); + if let Err(error) = self.send_request(&conn, &request).await { + tracing::warn!(node = %node.info.node_id, error = %error, "Failed to send topology sync"); + } + } + } + /// Generate self-signed TLS certificates for cluster-internal communication. fn generate_tls_configs() -> Result<(QuinnServerConfig, ClientConfig)> { // Generate self-signed certificate diff --git a/rust/src/cluster/state.rs b/rust/src/cluster/state.rs index 69befc6..6e6ce13 100644 --- a/rust/src/cluster/state.rs +++ b/rust/src/cluster/state.rs @@ -1,8 +1,10 @@ use std::collections::HashMap; +use std::path::PathBuf; use std::sync::Arc; use tokio::sync::RwLock; use super::placement::{DriveLocation, ErasureSet}; +use super::persistence; use super::protocol::{ClusterTopology, ErasureSetInfo, DriveLocationInfo, NodeInfo}; /// Node status for tracking liveness. @@ -26,6 +28,7 @@ pub struct NodeState { pub struct ClusterState { inner: Arc>, local_node_id: String, + topology_path: Option, } struct ClusterStateInner { @@ -43,6 +46,7 @@ impl ClusterState { cluster_id: String, data_shards: usize, parity_shards: usize, + topology_path: Option, ) -> Self { Self { inner: Arc::new(RwLock::new(ClusterStateInner { @@ -54,6 +58,7 @@ impl ClusterState { parity_shards, })), local_node_id, + topology_path, } } @@ -61,27 +66,37 @@ impl ClusterState { &self.local_node_id } + pub async fn cluster_id(&self) -> String { + self.inner.read().await.cluster_id.clone() + } + /// Register a node in the cluster. pub async fn add_node(&self, info: NodeInfo) { - let mut inner = self.inner.write().await; - let node_id = info.node_id.clone(); - inner.nodes.insert( - node_id, - NodeState { - info, - status: NodeStatus::Online, - missed_heartbeats: 0, - last_heartbeat: chrono::Utc::now(), - }, - ); - inner.version += 1; + { + let mut inner = self.inner.write().await; + let node_id = info.node_id.clone(); + inner.nodes.insert( + node_id, + NodeState { + info, + status: NodeStatus::Online, + missed_heartbeats: 0, + last_heartbeat: chrono::Utc::now(), + }, + ); + inner.version += 1; + } + self.persist_topology_snapshot().await; } /// Remove a node from the cluster. pub async fn remove_node(&self, node_id: &str) { - let mut inner = self.inner.write().await; - inner.nodes.remove(node_id); - inner.version += 1; + { + let mut inner = self.inner.write().await; + inner.nodes.remove(node_id); + inner.version += 1; + } + self.persist_topology_snapshot().await; } /// Update heartbeat for a node (reset missed count). @@ -133,9 +148,12 @@ impl ClusterState { /// Set erasure sets (typically done once during cluster formation). pub async fn set_erasure_sets(&self, sets: Vec) { - let mut inner = self.inner.write().await; - inner.erasure_sets = sets; - inner.version += 1; + { + let mut inner = self.inner.write().await; + inner.erasure_sets = sets; + inner.version += 1; + } + self.persist_topology_snapshot().await; } /// Get the erasure set for a given object based on consistent hashing. @@ -244,48 +262,82 @@ impl ClusterState { /// Import topology from a protocol message (e.g., received from a peer during join). pub async fn apply_topology(&self, topology: &ClusterTopology) { - let mut inner = self.inner.write().await; + let applied = { + let mut inner = self.inner.write().await; - // Only apply if newer - if topology.version <= inner.version { - return; - } + // Only apply if newer and from the same cluster lineage. A node that has not yet + // joined any topology may adopt the seed cluster ID during its first join. + if topology.version <= inner.version { + return; + } + if topology.cluster_id != inner.cluster_id { + if inner.nodes.is_empty() { + inner.cluster_id = topology.cluster_id.clone(); + } else { + return; + } + } - inner.cluster_id = topology.cluster_id.clone(); - inner.version = topology.version; - inner.data_shards = topology.data_shards; - inner.parity_shards = topology.parity_shards; + inner.version = topology.version; + inner.data_shards = topology.data_shards; + inner.parity_shards = topology.parity_shards; - // Update nodes - for node_info in &topology.nodes { - if !inner.nodes.contains_key(&node_info.node_id) { + let now = chrono::Utc::now(); + for node_info in &topology.nodes { + let existing_status = inner.nodes.get(&node_info.node_id).map(|node| node.status.clone()); + let existing_missed_heartbeats = inner + .nodes + .get(&node_info.node_id) + .map(|node| node.missed_heartbeats); + let existing_last_heartbeat = inner + .nodes + .get(&node_info.node_id) + .map(|node| node.last_heartbeat); inner.nodes.insert( node_info.node_id.clone(), NodeState { info: node_info.clone(), - status: NodeStatus::Online, - missed_heartbeats: 0, - last_heartbeat: chrono::Utc::now(), + status: existing_status.unwrap_or(NodeStatus::Online), + missed_heartbeats: existing_missed_heartbeats.unwrap_or(0), + last_heartbeat: existing_last_heartbeat.unwrap_or(now), }, ); } - } - // Update erasure sets - inner.erasure_sets = topology - .erasure_sets - .iter() - .map(|set| ErasureSet { - set_id: set.set_id, - drives: set - .drives - .iter() - .map(|d| DriveLocation { - node_id: d.node_id.clone(), - drive_index: d.drive_index, - }) - .collect(), - }) - .collect(); + inner.nodes.retain(|node_id, _| topology.nodes.iter().any(|node| &node.node_id == node_id)); + + // Update erasure sets + inner.erasure_sets = topology + .erasure_sets + .iter() + .map(|set| ErasureSet { + set_id: set.set_id, + drives: set + .drives + .iter() + .map(|d| DriveLocation { + node_id: d.node_id.clone(), + drive_index: d.drive_index, + }) + .collect(), + }) + .collect(); + true + }; + + if applied { + self.persist_topology_snapshot().await; + } + } + + async fn persist_topology_snapshot(&self) { + let Some(path) = &self.topology_path else { + return; + }; + + let topology = self.to_topology().await; + if let Err(error) = persistence::persist_topology(path, &topology).await { + tracing::warn!(error = %error, "Failed to persist cluster topology snapshot"); + } } } diff --git a/rust/src/server.rs b/rust/src/server.rs index b4a50ba..895db15 100644 --- a/rust/src/server.rs +++ b/rust/src/server.rs @@ -11,6 +11,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::task::{Context, Poll}; use tokio::io::AsyncReadExt; use tokio::net::TcpListener; @@ -27,6 +28,7 @@ use crate::cluster::coordinator::DistributedStore; use crate::cluster::drive_manager::DriveManager; use crate::cluster::healing::HealingService; use crate::cluster::membership::MembershipManager; +use crate::cluster::persistence::{self, ClusterIdentity}; use crate::cluster::placement; use crate::cluster::protocol::NodeInfo; use crate::cluster::quic_transport::QuicTransport; @@ -35,19 +37,46 @@ use crate::cluster::state::ClusterState; use crate::storage::{FileStore, StorageBackend}; use crate::xml_response; +struct ServerMetrics { + started_at: chrono::DateTime, + total_requests: AtomicU64, + error_responses: AtomicU64, +} + +impl ServerMetrics { + fn new() -> Self { + Self { + started_at: chrono::Utc::now(), + total_requests: AtomicU64::new(0), + error_responses: AtomicU64::new(0), + } + } + + fn record_response(&self, status: StatusCode) { + self.total_requests.fetch_add(1, Ordering::Relaxed); + if status.as_u16() >= 400 { + self.error_responses.fetch_add(1, Ordering::Relaxed); + } + } +} + pub struct StorageServer { store: Arc, auth_runtime: Arc, shutdown_tx: watch::Sender, + cluster_shutdown_txs: Vec>, server_handle: tokio::task::JoinHandle<()>, } impl StorageServer { pub async fn start(config: SmartStorageConfig) -> Result { let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth)); + let mut cluster_shutdown_txs = Vec::new(); let store: Arc = if let Some(ref cluster_config) = config.cluster { if cluster_config.enabled { - Self::start_clustered(&config, cluster_config).await? + let (store, shutdown_txs) = Self::start_clustered(&config, cluster_config).await?; + cluster_shutdown_txs = shutdown_txs; + store } else { Self::start_standalone(&config).await? } @@ -69,6 +98,7 @@ impl StorageServer { let server_config = config.clone(); let server_auth_runtime = auth_runtime.clone(); let server_policy_store = policy_store.clone(); + let server_metrics = Arc::new(ServerMetrics::new()); let server_handle = tokio::spawn(async move { loop { @@ -83,6 +113,7 @@ impl StorageServer { let cfg = server_config.clone(); let auth_runtime = server_auth_runtime.clone(); let ps = server_policy_store.clone(); + let metrics = server_metrics.clone(); tokio::spawn(async move { let svc = service_fn(move |req: Request| { @@ -90,8 +121,9 @@ impl StorageServer { let cfg = cfg.clone(); let auth_runtime = auth_runtime.clone(); let ps = ps.clone(); + let metrics = metrics.clone(); async move { - handle_request(req, store, cfg, auth_runtime, ps).await + handle_request(req, store, cfg, auth_runtime, ps, metrics).await } }); @@ -126,11 +158,15 @@ impl StorageServer { store, auth_runtime, shutdown_tx, + cluster_shutdown_txs, server_handle, }) } pub async fn stop(self) { + for shutdown_tx in &self.cluster_shutdown_txs { + let _ = shutdown_tx.send(true); + } let _ = self.shutdown_tx.send(true); let _ = self.server_handle.await; } @@ -139,7 +175,7 @@ impl StorageServer { &self.store } - pub async fn list_credentials(&self) -> Vec { + pub async fn list_credentials(&self) -> Vec { self.auth_runtime.list_credentials().await } @@ -165,12 +201,37 @@ impl StorageServer { async fn start_clustered( config: &SmartStorageConfig, cluster_config: &crate::cluster::config::ClusterConfig, - ) -> Result> { + ) -> Result<(Arc, Vec>)> { let erasure_config = cluster_config.erasure.clone(); + let cluster_metadata_dir = persistence::cluster_metadata_dir(&config.storage.directory); + let identity_path = persistence::identity_path(&cluster_metadata_dir); + let topology_path = persistence::topology_path(&cluster_metadata_dir); + let persisted_identity = persistence::load_identity(&identity_path).await?; + + if let (Some(configured_node_id), Some(identity)) = (&cluster_config.node_id, &persisted_identity) { + if configured_node_id != &identity.node_id { + anyhow::bail!( + "Configured cluster node ID '{}' conflicts with persisted node ID '{}'", + configured_node_id, + identity.node_id + ); + } + } + let node_id = cluster_config .node_id .clone() + .or_else(|| persisted_identity.as_ref().map(|identity| identity.node_id.clone())) .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + let cluster_id = persisted_identity + .as_ref() + .map(|identity| identity.cluster_id.clone()) + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + persistence::persist_identity( + &identity_path, + &ClusterIdentity::new(node_id.clone(), cluster_id.clone()), + ) + .await?; // Determine drive paths let drive_paths: Vec = if cluster_config.drives.paths.is_empty() { @@ -202,28 +263,37 @@ impl StorageServer { // Initialize cluster state let cluster_state = Arc::new(ClusterState::new( node_id.clone(), - uuid::Uuid::new_v4().to_string(), + cluster_id.clone(), erasure_config.data_shards, erasure_config.parity_shards, + Some(topology_path.clone()), )); - // Form erasure sets from local drives (single-node for now) - let nodes = vec![(node_id.clone(), drive_paths.len() as u32)]; - let erasure_sets = - placement::form_erasure_sets(&nodes, erasure_config.total_shards()); + let persisted_topology = persistence::load_topology(&topology_path).await?; + let has_persisted_topology = persisted_topology.is_some(); + if let Some(topology) = persisted_topology { + if topology.cluster_id != cluster_id { + anyhow::bail!("Persisted topology cluster ID does not match persisted node identity"); + } + cluster_state.apply_topology(&topology).await; + } else if cluster_config.seed_nodes.is_empty() { + // Form erasure sets from local drives for a first node bootstrap. + let nodes = vec![(node_id.clone(), drive_paths.len() as u32)]; + let erasure_sets = placement::form_erasure_sets(&nodes, erasure_config.total_shards()); - if erasure_sets.is_empty() { - tracing::warn!( - "Not enough drives ({}) for erasure set size ({}). \ - Need at least {} drives.", - drive_paths.len(), - erasure_config.total_shards(), - erasure_config.total_shards(), - ); + if erasure_sets.is_empty() { + tracing::warn!( + "Not enough drives ({}) for erasure set size ({}). \ + Need at least {} drives.", + drive_paths.len(), + erasure_config.total_shards(), + erasure_config.total_shards(), + ); + } + + cluster_state.set_erasure_sets(erasure_sets).await; } - cluster_state.set_erasure_sets(erasure_sets).await; - // Register self as a node let local_node_info = NodeInfo { node_id: node_id.clone(), @@ -233,8 +303,6 @@ impl StorageServer { status: "online".to_string(), version: env!("CARGO_PKG_VERSION").to_string(), }; - cluster_state.add_node(local_node_info.clone()).await; - // Initialize drive manager for health monitoring let drive_manager = Arc::new(tokio::sync::Mutex::new( DriveManager::from_paths(&drive_paths).await?, @@ -246,13 +314,25 @@ impl StorageServer { cluster_state.clone(), transport.clone(), cluster_config.heartbeat_interval_ms, + cluster_config.heartbeat_timeout_ms, local_node_info, ) .with_drive_manager(drive_manager.clone()), ); membership - .join_cluster(&cluster_config.seed_nodes) + .join_cluster( + &cluster_config.seed_nodes, + cluster_config.seed_nodes.is_empty() && !has_persisted_topology, + ) .await?; + let final_cluster_id = cluster_state.cluster_id().await; + if final_cluster_id != cluster_id { + persistence::persist_identity( + &identity_path, + &ClusterIdentity::new(node_id.clone(), final_cluster_id), + ) + .await?; + } // Build local shard stores (one per drive) for shared use let local_shard_stores: Vec> = drive_paths @@ -261,18 +341,19 @@ impl StorageServer { .collect(); // Start QUIC accept loop for incoming connections - let shard_store_for_accept = local_shard_stores[0].clone(); - let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false); + let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false); let transport_clone = transport.clone(); + let cluster_state_for_accept = cluster_state.clone(); + let shard_stores_for_accept = local_shard_stores.clone(); tokio::spawn(async move { transport_clone - .accept_loop(shard_store_for_accept, quic_shutdown_rx) + .accept_loop(shard_stores_for_accept, Some(cluster_state_for_accept), quic_shutdown_rx) .await; }); // Start heartbeat loop let membership_clone = membership.clone(); - let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false); + let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false); tokio::spawn(async move { membership_clone.heartbeat_loop(hb_shutdown_rx).await; }); @@ -289,7 +370,7 @@ impl StorageServer { 24, // scan every 24 hours healing_runtime.clone(), )?; - let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false); + let (heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false); tokio::spawn(async move { healing_service.run(heal_shutdown_rx).await; }); @@ -319,7 +400,7 @@ impl StorageServer { ); } - Ok(store) + Ok((store, vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx])) } } @@ -401,12 +482,31 @@ fn storage_error_response(err: &StorageError, request_id: &str) -> Response Response { + Response::builder() + .status(status) + .header("content-type", "application/json") + .header("x-amz-request-id", request_id) + .body(full_body(value.to_string())) + .unwrap() +} + +fn text_response(status: StatusCode, content_type: &str, body: String, request_id: &str) -> Response { + Response::builder() + .status(status) + .header("content-type", content_type) + .header("x-amz-request-id", request_id) + .body(full_body(body)) + .unwrap() +} + async fn handle_request( req: Request, store: Arc, config: SmartStorageConfig, auth_runtime: Arc, policy_store: Arc, + metrics: Arc, ) -> Result, std::convert::Infallible> { let request_id = Uuid::new_v4().to_string(); let method = req.method().clone(); @@ -416,6 +516,23 @@ async fn handle_request( // Handle CORS preflight if config.cors.enabled && method == Method::OPTIONS { let resp = build_cors_preflight(&config, &request_id); + metrics.record_response(resp.status()); + return Ok(resp); + } + + if method == Method::GET && uri.path().starts_with("/-/") { + let resp = match handle_operational_request(uri.path(), store, &config, &metrics, &request_id).await { + Ok(resp) => resp, + Err(error) => { + tracing::error!(error = %error, "Operational endpoint failed"); + json_response( + StatusCode::INTERNAL_SERVER_ERROR, + serde_json::json!({ "ok": false, "error": error.to_string() }), + &request_id, + ) + } + }; + metrics.record_response(resp.status()); return Ok(resp); } @@ -439,7 +556,9 @@ async fn handle_request( Ok(id) => Some(id), Err(e) => { tracing::warn!("Auth failed: {}", e.message); - return Ok(storage_error_response(&e, &request_id)); + let resp = storage_error_response(&e, &request_id); + metrics.record_response(resp.status()); + return Ok(resp); } } } else { @@ -449,7 +568,9 @@ async fn handle_request( // Step 3: Authorization (policy evaluation) if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await { - return Ok(storage_error_response(&e, &request_id)); + let resp = storage_error_response(&e, &request_id); + metrics.record_response(resp.status()); + return Ok(resp); } } @@ -481,9 +602,118 @@ async fn handle_request( "request" ); + metrics.record_response(response.status()); + Ok(response) } +async fn handle_operational_request( + path: &str, + store: Arc, + config: &SmartStorageConfig, + metrics: &ServerMetrics, + request_id: &str, +) -> Result> { + match path { + "/-/live" | "/-/livez" => Ok(json_response( + StatusCode::OK, + serde_json::json!({ + "ok": true, + "status": "alive", + "startedAt": metrics.started_at.timestamp_millis(), + }), + request_id, + )), + "/-/ready" | "/-/readyz" => { + let cluster_health = store.get_cluster_health().await?; + let cluster_ready = !cluster_health.enabled + || (cluster_health.majority_healthy.unwrap_or(false) + && cluster_health.quorum_healthy.unwrap_or(false)); + let status = if cluster_ready { + StatusCode::OK + } else { + StatusCode::SERVICE_UNAVAILABLE + }; + Ok(json_response( + status, + serde_json::json!({ + "ok": cluster_ready, + "status": if cluster_ready { "ready" } else { "degraded" }, + "cluster": cluster_health, + }), + request_id, + )) + } + "/-/health" | "/-/healthz" => { + let cluster_health = store.get_cluster_health().await?; + let stats = store.get_storage_stats().await?; + Ok(json_response( + StatusCode::OK, + serde_json::json!({ + "ok": true, + "status": "healthy", + "version": env!("CARGO_PKG_VERSION"), + "server": { + "address": config.server.address, + "port": config.server.port, + "startedAt": metrics.started_at.timestamp_millis(), + }, + "storage": stats, + "cluster": cluster_health, + "metrics": { + "totalRequests": metrics.total_requests.load(Ordering::Relaxed), + "errorResponses": metrics.error_responses.load(Ordering::Relaxed), + }, + }), + request_id, + )) + } + "/-/metrics" => { + let cluster_health = store.get_cluster_health().await?; + let stats = store.get_storage_stats().await?; + let cluster_enabled = if cluster_health.enabled { 1 } else { 0 }; + let quorum_healthy = if cluster_health.quorum_healthy.unwrap_or(true) { 1 } else { 0 }; + let body = format!( + "# HELP smartstorage_requests_total Total HTTP requests observed by smartstorage.\n\ + # TYPE smartstorage_requests_total counter\n\ + smartstorage_requests_total {}\n\ + # HELP smartstorage_error_responses_total HTTP responses with status >= 400.\n\ + # TYPE smartstorage_error_responses_total counter\n\ + smartstorage_error_responses_total {}\n\ + # HELP smartstorage_buckets_total Runtime bucket count.\n\ + # TYPE smartstorage_buckets_total gauge\n\ + smartstorage_buckets_total {}\n\ + # HELP smartstorage_objects_total Runtime object count.\n\ + # TYPE smartstorage_objects_total gauge\n\ + smartstorage_objects_total {}\n\ + # HELP smartstorage_cluster_enabled Cluster mode enabled.\n\ + # TYPE smartstorage_cluster_enabled gauge\n\ + smartstorage_cluster_enabled {}\n\ + # HELP smartstorage_cluster_quorum_healthy Cluster quorum health.\n\ + # TYPE smartstorage_cluster_quorum_healthy gauge\n\ + smartstorage_cluster_quorum_healthy {}\n", + metrics.total_requests.load(Ordering::Relaxed), + metrics.error_responses.load(Ordering::Relaxed), + stats.bucket_count, + stats.total_object_count, + cluster_enabled, + quorum_healthy, + ); + Ok(text_response( + StatusCode::OK, + "text/plain; version=0.0.4", + body, + request_id, + )) + } + _ => Ok(json_response( + StatusCode::NOT_FOUND, + serde_json::json!({ "ok": false, "error": "Unknown operational endpoint" }), + request_id, + )), + } +} + /// Authorize a request based on bucket policies and authentication state. async fn authorize_request( ctx: &RequestContext, diff --git a/test/test.cluster-multinode.node.ts b/test/test.cluster-multinode.node.ts new file mode 100644 index 0000000..4911438 --- /dev/null +++ b/test/test.cluster-multinode.node.ts @@ -0,0 +1,317 @@ +/// + +import { readFile, readdir, rm } from 'fs/promises'; +import { join } from 'path'; +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import { CreateBucketCommand, GetObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3'; +import { Readable } from 'stream'; +import * as smartstorage from '../ts/index.js'; + +const baseDir = join(process.cwd(), '.nogit', `cluster-multinode-${Date.now()}`); +const nodes: smartstorage.SmartStorage[] = []; + +const makeDrivePaths = (nodeId: string) => { + return [1, 2].map((driveIndex) => join(baseDir, nodeId, `drive-${driveIndex}`)); +}; + +const streamToString = async (stream: Readable): Promise => { + const chunks: Buffer[] = []; + return new Promise((resolve, reject) => { + stream.on('data', (chunk: string | Buffer | Uint8Array) => chunks.push(Buffer.from(chunk))); + stream.on('error', reject); + stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); + }); +}; + +const fileExistsBelow = async (directory: string, fileName: string): Promise => { + let entries; + try { + entries = await readdir(directory, { withFileTypes: true }); + } catch { + return false; + } + + for (const entry of entries) { + const entryPath = join(directory, entry.name); + if (entry.isFile() && entry.name === fileName) { + return true; + } + if (entry.isDirectory() && await fileExistsBelow(entryPath, fileName)) { + return true; + } + } + + return false; +}; + +const waitFor = async (check: () => Promise, timeoutMs = 10000) => { + const deadline = Date.now() + timeoutMs; + let lastError = ''; + while (Date.now() < deadline) { + try { + if (await check()) { + return; + } + } catch (error) { + lastError = error instanceof Error ? error.message : String(error); + } + await new Promise((resolve) => setTimeout(resolve, 250)); + } + throw new Error(`Timed out waiting for cluster condition${lastError ? `: ${lastError}` : ''}`); +}; + +tap.test('setup: start three clustered storage nodes', async () => { + await rm(baseDir, { recursive: true, force: true }); + + const node1 = await smartstorage.SmartStorage.createAndStart({ + server: { + address: '127.0.0.1', + port: 3350, + silent: true, + }, + storage: { + directory: join(baseDir, 'node-1', 'storage'), + }, + cluster: { + enabled: true, + nodeId: 'node-1', + quicPort: 4350, + seedNodes: [], + erasure: { + dataShards: 4, + parityShards: 2, + chunkSizeBytes: 1024 * 1024, + }, + drives: { + paths: makeDrivePaths('node-1'), + }, + heartbeatIntervalMs: 500, + heartbeatTimeoutMs: 3000, + }, + }); + nodes.push(node1); + + await new Promise((resolve) => setTimeout(resolve, 500)); + + const node2 = await smartstorage.SmartStorage.createAndStart({ + server: { + address: '127.0.0.1', + port: 3351, + silent: true, + }, + storage: { + directory: join(baseDir, 'node-2', 'storage'), + }, + cluster: { + enabled: true, + nodeId: 'node-2', + quicPort: 4351, + seedNodes: ['127.0.0.1:4350'], + erasure: { + dataShards: 4, + parityShards: 2, + chunkSizeBytes: 1024 * 1024, + }, + drives: { + paths: makeDrivePaths('node-2'), + }, + heartbeatIntervalMs: 500, + heartbeatTimeoutMs: 3000, + }, + }); + nodes.push(node2); + + await new Promise((resolve) => setTimeout(resolve, 500)); + + const node3 = await smartstorage.SmartStorage.createAndStart({ + server: { + address: '127.0.0.1', + port: 3352, + silent: true, + }, + storage: { + directory: join(baseDir, 'node-3', 'storage'), + }, + cluster: { + enabled: true, + nodeId: 'node-3', + quicPort: 4352, + seedNodes: ['127.0.0.1:4350'], + erasure: { + dataShards: 4, + parityShards: 2, + chunkSizeBytes: 1024 * 1024, + }, + drives: { + paths: makeDrivePaths('node-3'), + }, + heartbeatIntervalMs: 500, + heartbeatTimeoutMs: 3000, + }, + }); + nodes.push(node3); +}); + +tap.test('seed node should report joined peers and multi-node erasure topology', async () => { + const seed = nodes[0]; + + await waitFor(async () => { + const health = await seed.getClusterHealth(); + if (health.peers?.length !== 2 || health.erasure?.erasureSetCount !== 1) { + throw new Error(JSON.stringify(health)); + } + return health.peers?.length === 2 && health.erasure?.erasureSetCount === 1; + }); + + const health = await seed.getClusterHealth(); + const peerIds = health.peers!.map((peer) => peer.nodeId).sort(); + + expect(health.enabled).toEqual(true); + expect(health.nodeId).toEqual('node-1'); + expect(health.quorumHealthy).toEqual(true); + expect(health.majorityHealthy).toEqual(true); + expect(peerIds).toEqual(['node-2', 'node-3']); + expect(health.erasure?.totalShards).toEqual(6); + expect(health.erasure?.erasureSetCount).toEqual(1); +}); + +tap.test('all nodes should converge to the same multi-node topology', async () => { + for (const node of nodes) { + await waitFor(async () => { + const health = await node.getClusterHealth(); + if (health.peers?.length !== 2 || health.erasure?.erasureSetCount !== 1) { + throw new Error(JSON.stringify(health)); + } + return true; + }); + } +}); + +tap.test('seed node should write shards to the declared remote drives', async () => { + const seed = nodes[0]; + const descriptor = await seed.getStorageDescriptor(); + const client = new S3Client({ + endpoint: `http://${descriptor.endpoint}:${descriptor.port}`, + region: 'us-east-1', + credentials: { + accessKeyId: descriptor.accessKey, + secretAccessKey: descriptor.accessSecret, + }, + forcePathStyle: true, + }); + const bucket = 'multinode-bucket'; + const key = 'distributed.txt'; + const body = 'hello distributed shards'; + + await client.send(new CreateBucketCommand({ Bucket: bucket })); + await client.send(new PutObjectCommand({ Bucket: bucket, Key: key, Body: body })); + + const getResponse = await client.send(new GetObjectCommand({ Bucket: bucket, Key: key })); + expect(await streamToString(getResponse.Body as Readable)).toEqual(body); + + const manifestPath = join( + baseDir, + 'node-1', + 'storage', + '.manifests', + bucket, + `${key}.manifest.json`, + ); + const manifest = JSON.parse(await readFile(manifestPath, 'utf8')) as { + chunks: Array<{ + shardPlacements: Array<{ shardIndex: number; nodeId: string; driveId: string }>; + }>; + }; + const placements = manifest.chunks[0].shardPlacements; + + expect(placements.length).toEqual(6); + expect(placements.some((placement) => placement.nodeId === 'node-2' && placement.driveId === '1')) + .toEqual(true); + expect(placements.some((placement) => placement.nodeId === 'node-3' && placement.driveId === '1')) + .toEqual(true); + + for (const placement of placements) { + const drivePath = makeDrivePaths(placement.nodeId)[Number(placement.driveId)]; + const shardFile = `shard-${placement.shardIndex}.dat`; + expect(await fileExistsBelow(join(drivePath, '.smartstorage', 'data'), shardFile)).toEqual(true); + } +}); + +tap.test('restarted peer should keep durable identity and rejoin topology', async () => { + await nodes[1].stop(); + await new Promise((resolve) => setTimeout(resolve, 500)); + + nodes[1] = await smartstorage.SmartStorage.createAndStart({ + server: { + address: '127.0.0.1', + port: 3351, + silent: true, + }, + storage: { + directory: join(baseDir, 'node-2', 'storage'), + }, + cluster: { + enabled: true, + nodeId: 'node-2', + quicPort: 4351, + seedNodes: ['127.0.0.1:4350'], + erasure: { + dataShards: 4, + parityShards: 2, + chunkSizeBytes: 1024 * 1024, + }, + drives: { + paths: makeDrivePaths('node-2'), + }, + heartbeatIntervalMs: 500, + heartbeatTimeoutMs: 3000, + }, + }); + + await waitFor(async () => { + const health = await nodes[1].getClusterHealth(); + if (health.nodeId !== 'node-2' || health.peers?.length !== 2) { + throw new Error(JSON.stringify(health)); + } + return true; + }); + + const identityPath = join( + baseDir, + 'node-2', + 'storage', + '.smartstorage', + 'cluster', + 'identity.json', + ); + const topologyPath = join( + baseDir, + 'node-2', + 'storage', + '.smartstorage', + 'cluster', + 'topology.json', + ); + const identity = JSON.parse(await readFile(identityPath, 'utf8')) as { + nodeId: string; + clusterId: string; + }; + const topology = JSON.parse(await readFile(topologyPath, 'utf8')) as { + cluster_id: string; + nodes: Array<{ node_id: string }>; + }; + + expect(identity.nodeId).toEqual('node-2'); + expect(identity.clusterId).toEqual(topology.cluster_id); + expect(topology.nodes.some((node) => node.node_id === 'node-1')).toEqual(true); + expect(topology.nodes.some((node) => node.node_id === 'node-3')).toEqual(true); +}); + +tap.test('teardown: stop clustered nodes and clean files', async () => { + for (const node of nodes.reverse()) { + await node.stop(); + } + await rm(baseDir, { recursive: true, force: true }); +}); + +export default tap.start(); diff --git a/test/test.credentials.node.ts b/test/test.credentials.node.ts index e708315..80d7f79 100644 --- a/test/test.credentials.node.ts +++ b/test/test.credentials.node.ts @@ -65,11 +65,11 @@ tap.test('startup credentials authenticate successfully', async () => { expect(response.$metadata.httpStatusCode).toEqual(200); }); -tap.test('listCredentials returns the active startup credential set', async () => { +tap.test('listCredentials returns active credential metadata without secrets', async () => { const credentials = await testSmartStorageInstance.listCredentials(); expect(credentials.length).toEqual(1); expect(credentials[0].accessKeyId).toEqual(INITIAL_CREDENTIAL.accessKeyId); - expect(credentials[0].secretAccessKey).toEqual(INITIAL_CREDENTIAL.secretAccessKey); + expect((credentials[0] as any).secretAccessKey).toEqual(undefined); }); tap.test('invalid replacement input fails cleanly and leaves old credentials active', async () => { diff --git a/test/test.health-http.node.ts b/test/test.health-http.node.ts new file mode 100644 index 0000000..cd5c09c --- /dev/null +++ b/test/test.health-http.node.ts @@ -0,0 +1,50 @@ +import { expect, tap } from '@git.zone/tstest/tapbundle'; +import * as smartstorage from '../ts/index.js'; + +const TEST_PORT = 3353; +let testSmartStorageInstance: smartstorage.SmartStorage; + +tap.test('setup: start storage server for operational endpoint checks', async () => { + testSmartStorageInstance = await smartstorage.SmartStorage.createAndStart({ + server: { + port: TEST_PORT, + silent: true, + region: 'us-east-1', + }, + storage: { + cleanSlate: true, + }, + auth: { + enabled: false, + credentials: [], + }, + }); +}); + +tap.test('operational endpoints expose live ready health and metrics', async () => { + const live = await fetch(`http://localhost:${TEST_PORT}/-/live`); + expect(live.status).toEqual(200); + expect((await live.json()).status).toEqual('alive'); + + const ready = await fetch(`http://localhost:${TEST_PORT}/-/ready`); + expect(ready.status).toEqual(200); + expect((await ready.json()).status).toEqual('ready'); + + const health = await fetch(`http://localhost:${TEST_PORT}/-/health`); + expect(health.status).toEqual(200); + const healthBody = await health.json(); + expect(healthBody.ok).toEqual(true); + expect(healthBody.cluster.enabled).toEqual(false); + + const metrics = await fetch(`http://localhost:${TEST_PORT}/-/metrics`); + expect(metrics.status).toEqual(200); + const metricsBody = await metrics.text(); + expect(metricsBody.includes('smartstorage_requests_total')).toEqual(true); + expect(metricsBody.includes('smartstorage_cluster_enabled 0')).toEqual(true); +}); + +tap.test('teardown: stop storage server', async () => { + await testSmartStorageInstance.stop(); +}); + +export default tap.start(); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 9ad1bff..12ba9b6 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartstorage', - version: '6.3.3', + version: '6.4.0', description: 'A Node.js TypeScript package to create a local S3-compatible storage server using mapped local directories for development and testing purposes.' } diff --git a/ts/index.ts b/ts/index.ts index d1e0bb3..85b1458 100644 --- a/ts/index.ts +++ b/ts/index.ts @@ -9,6 +9,10 @@ export interface IStorageCredential { secretAccessKey: string; } +export interface IStorageCredentialMetadata { + accessKeyId: string; +} + /** * Authentication configuration */ @@ -311,7 +315,7 @@ type TRustStorageCommands = { createBucket: { params: { name: string }; result: {} }; getStorageStats: { params: {}; result: IStorageStats }; listBucketSummaries: { params: {}; result: IBucketSummary[] }; - listCredentials: { params: {}; result: IStorageCredential[] }; + listCredentials: { params: {}; result: IStorageCredentialMetadata[] }; replaceCredentials: { params: { credentials: IStorageCredential[] }; result: {} }; getClusterHealth: { params: {}; result: IClusterHealth }; }; @@ -391,7 +395,7 @@ export class SmartStorage { return this.bridge.sendCommand('listBucketSummaries', {}); } - public async listCredentials(): Promise { + public async listCredentials(): Promise { return this.bridge.sendCommand('listCredentials', {}); }