feat(cluster,server,auth): add operational health endpoints, persist cluster topology, and hide credential secrets from runtime listings

2026-04-30 06:08:42 +00:00
parent c2b40ee240
commit a31e477359
16 changed files with 1120 additions and 123 deletions
@@ -11,6 +11,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context, Poll};
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
@@ -27,6 +28,7 @@ use crate::cluster::coordinator::DistributedStore;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager;
use crate::cluster::persistence::{self, ClusterIdentity};
use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo;
use crate::cluster::quic_transport::QuicTransport;
@@ -35,19 +37,46 @@ use crate::cluster::state::ClusterState;
use crate::storage::{FileStore, StorageBackend};
use crate::xml_response;
struct ServerMetrics {
started_at: chrono::DateTime<chrono::Utc>,
total_requests: AtomicU64,
error_responses: AtomicU64,
}
impl ServerMetrics {
fn new() -> Self {
Self {
started_at: chrono::Utc::now(),
total_requests: AtomicU64::new(0),
error_responses: AtomicU64::new(0),
}
}
fn record_response(&self, status: StatusCode) {
self.total_requests.fetch_add(1, Ordering::Relaxed);
if status.as_u16() >= 400 {
self.error_responses.fetch_add(1, Ordering::Relaxed);
}
}
}
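Note (not part of the diff): both counters are bumped with Ordering::Relaxed, which is sufficient here because each is an independent monotonic tally that nothing else synchronizes on. A minimal standalone sketch of the same counter pattern:

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

fn main() {
    let total = Arc::new(AtomicU64::new(0));
    // Four threads hammer the counter concurrently.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let total = Arc::clone(&total);
            std::thread::spawn(move || {
                for _ in 0..1000 {
                    total.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Relaxed atomics never lose increments; they only forgo cross-variable ordering.
    assert_eq!(total.load(Ordering::Relaxed), 4000);
}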
pub struct StorageServer {
store: Arc<StorageBackend>,
auth_runtime: Arc<auth::RuntimeCredentialStore>,
shutdown_tx: watch::Sender<bool>,
cluster_shutdown_txs: Vec<watch::Sender<bool>>,
server_handle: tokio::task::JoinHandle<()>,
}
impl StorageServer {
pub async fn start(config: SmartStorageConfig) -> Result<Self> {
let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth));
let mut cluster_shutdown_txs = Vec::new();
let store: Arc<StorageBackend> = if let Some(ref cluster_config) = config.cluster {
if cluster_config.enabled {
- Self::start_clustered(&config, cluster_config).await?
let (store, shutdown_txs) = Self::start_clustered(&config, cluster_config).await?;
cluster_shutdown_txs = shutdown_txs;
store
} else {
Self::start_standalone(&config).await?
}
@@ -69,6 +98,7 @@ impl StorageServer {
let server_config = config.clone();
let server_auth_runtime = auth_runtime.clone();
let server_policy_store = policy_store.clone();
let server_metrics = Arc::new(ServerMetrics::new());
let server_handle = tokio::spawn(async move {
loop {
@@ -83,6 +113,7 @@ impl StorageServer {
let cfg = server_config.clone();
let auth_runtime = server_auth_runtime.clone();
let ps = server_policy_store.clone();
let metrics = server_metrics.clone();
tokio::spawn(async move {
let svc = service_fn(move |req: Request<Incoming>| {
@@ -90,8 +121,9 @@ impl StorageServer {
let cfg = cfg.clone();
let auth_runtime = auth_runtime.clone();
let ps = ps.clone();
let metrics = metrics.clone();
async move {
- handle_request(req, store, cfg, auth_runtime, ps).await
handle_request(req, store, cfg, auth_runtime, ps, metrics).await
}
});
@@ -126,11 +158,15 @@ impl StorageServer {
store,
auth_runtime,
shutdown_tx,
cluster_shutdown_txs,
server_handle,
})
}
pub async fn stop(self) {
for shutdown_tx in &self.cluster_shutdown_txs {
let _ = shutdown_tx.send(true);
}
let _ = self.shutdown_tx.send(true);
let _ = self.server_handle.await;
}
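Note (not part of the diff): keeping and returning the cluster senders is what makes stop() effective. A watch::Sender bound to a _-prefixed name, as in the replaced _quic_shutdown_tx lines below, is dropped immediately, so receivers only ever observe a closed channel rather than an intentional true. A minimal sketch of the signal pattern, assuming only tokio's watch semantics:

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
    let worker = tokio::spawn(async move {
        // Exit once the flag flips to true. If the sender were dropped
        // instead, changed() would return Err and the loop would also end,
        // but without an intentional shutdown signal ever being sent.
        while shutdown_rx.changed().await.is_ok() {
            if *shutdown_rx.borrow() {
                break;
            }
        }
    });
    let _ = shutdown_tx.send(true); // what StorageServer::stop() now does
    worker.await.unwrap();
}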
@@ -139,7 +175,7 @@ impl StorageServer {
&self.store
}
- pub async fn list_credentials(&self) -> Vec<crate::config::Credential> {
pub async fn list_credentials(&self) -> Vec<crate::auth::CredentialMetadata> {
self.auth_runtime.list_credentials().await
}
@@ -165,12 +201,37 @@ impl StorageServer {
async fn start_clustered(
config: &SmartStorageConfig,
cluster_config: &crate::cluster::config::ClusterConfig,
- ) -> Result<Arc<StorageBackend>> {
) -> Result<(Arc<StorageBackend>, Vec<watch::Sender<bool>>)> {
let erasure_config = cluster_config.erasure.clone();
let cluster_metadata_dir = persistence::cluster_metadata_dir(&config.storage.directory);
let identity_path = persistence::identity_path(&cluster_metadata_dir);
let topology_path = persistence::topology_path(&cluster_metadata_dir);
let persisted_identity = persistence::load_identity(&identity_path).await?;
if let (Some(configured_node_id), Some(identity)) = (&cluster_config.node_id, &persisted_identity) {
if configured_node_id != &identity.node_id {
anyhow::bail!(
"Configured cluster node ID '{}' conflicts with persisted node ID '{}'",
configured_node_id,
identity.node_id
);
}
}
let node_id = cluster_config
.node_id
.clone()
.or_else(|| persisted_identity.as_ref().map(|identity| identity.node_id.clone()))
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let cluster_id = persisted_identity
.as_ref()
.map(|identity| identity.cluster_id.clone())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
persistence::persist_identity(
&identity_path,
&ClusterIdentity::new(node_id.clone(), cluster_id.clone()),
)
.await?;
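Note (not part of the diff): ClusterIdentity and the load/persist helpers come from the new cluster::persistence module, one of the other changed files in this commit. A minimal sketch of what that module plausibly contains; the serde/JSON on-disk format and the exact signatures are assumptions, and only the field and function names are taken from the call sites above:

use std::path::Path;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterIdentity {
    pub node_id: String,
    pub cluster_id: String,
}

impl ClusterIdentity {
    pub fn new(node_id: String, cluster_id: String) -> Self {
        Self { node_id, cluster_id }
    }
}

/// Returns Ok(None) on first boot, when no identity file exists yet.
pub async fn load_identity(path: &Path) -> anyhow::Result<Option<ClusterIdentity>> {
    match tokio::fs::read(path).await {
        Ok(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(e.into()),
    }
}

pub async fn persist_identity(path: &Path, identity: &ClusterIdentity) -> anyhow::Result<()> {
    if let Some(parent) = path.parent() {
        tokio::fs::create_dir_all(parent).await?;
    }
    tokio::fs::write(path, serde_json::to_vec_pretty(identity)?).await?;
    Ok(())
}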
// Determine drive paths
let drive_paths: Vec<std::path::PathBuf> = if cluster_config.drives.paths.is_empty() {
@@ -202,28 +263,37 @@ impl StorageServer {
// Initialize cluster state
let cluster_state = Arc::new(ClusterState::new(
node_id.clone(),
- uuid::Uuid::new_v4().to_string(),
cluster_id.clone(),
erasure_config.data_shards,
erasure_config.parity_shards,
Some(topology_path.clone()),
));
- // Form erasure sets from local drives (single-node for now)
- let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
- let erasure_sets =
- placement::form_erasure_sets(&nodes, erasure_config.total_shards());
let persisted_topology = persistence::load_topology(&topology_path).await?;
let has_persisted_topology = persisted_topology.is_some();
if let Some(topology) = persisted_topology {
if topology.cluster_id != cluster_id {
anyhow::bail!("Persisted topology cluster ID does not match persisted node identity");
}
cluster_state.apply_topology(&topology).await;
} else if cluster_config.seed_nodes.is_empty() {
// Form erasure sets from local drives for a first node bootstrap.
let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
let erasure_sets = placement::form_erasure_sets(&nodes, erasure_config.total_shards());
- if erasure_sets.is_empty() {
- tracing::warn!(
- "Not enough drives ({}) for erasure set size ({}). \
- Need at least {} drives.",
- drive_paths.len(),
- erasure_config.total_shards(),
- erasure_config.total_shards(),
- );
- }
if erasure_sets.is_empty() {
tracing::warn!(
"Not enough drives ({}) for erasure set size ({}). \
Need at least {} drives.",
drive_paths.len(),
erasure_config.total_shards(),
erasure_config.total_shards(),
);
}
cluster_state.set_erasure_sets(erasure_sets).await;
}
- cluster_state.set_erasure_sets(erasure_sets).await;
// Register self as a node
let local_node_info = NodeInfo {
node_id: node_id.clone(),
@@ -233,8 +303,6 @@ impl StorageServer {
status: "online".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
};
- cluster_state.add_node(local_node_info.clone()).await;
// Initialize drive manager for health monitoring
let drive_manager = Arc::new(tokio::sync::Mutex::new(
DriveManager::from_paths(&drive_paths).await?,
@@ -246,13 +314,25 @@ impl StorageServer {
cluster_state.clone(),
transport.clone(),
cluster_config.heartbeat_interval_ms,
cluster_config.heartbeat_timeout_ms,
local_node_info,
)
.with_drive_manager(drive_manager.clone()),
);
membership
- .join_cluster(&cluster_config.seed_nodes)
.join_cluster(
&cluster_config.seed_nodes,
cluster_config.seed_nodes.is_empty() && !has_persisted_topology,
)
.await?;
let final_cluster_id = cluster_state.cluster_id().await;
if final_cluster_id != cluster_id {
persistence::persist_identity(
&identity_path,
&ClusterIdentity::new(node_id.clone(), final_cluster_id),
)
.await?;
}
// Build local shard stores (one per drive) for shared use
let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
@@ -261,18 +341,19 @@ impl StorageServer {
.collect();
// Start QUIC accept loop for incoming connections
- let shard_store_for_accept = local_shard_stores[0].clone();
- let (_quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
let transport_clone = transport.clone();
let cluster_state_for_accept = cluster_state.clone();
let shard_stores_for_accept = local_shard_stores.clone();
tokio::spawn(async move {
transport_clone
- .accept_loop(shard_store_for_accept, quic_shutdown_rx)
.accept_loop(shard_stores_for_accept, Some(cluster_state_for_accept), quic_shutdown_rx)
.await;
});
// Start heartbeat loop
let membership_clone = membership.clone();
- let (_hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
membership_clone.heartbeat_loop(hb_shutdown_rx).await;
});
@@ -289,7 +370,7 @@ impl StorageServer {
24, // scan every 24 hours
healing_runtime.clone(),
)?;
- let (_heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
let (heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
tokio::spawn(async move {
healing_service.run(heal_shutdown_rx).await;
});
@@ -319,7 +400,7 @@ impl StorageServer {
);
}
- Ok(store)
Ok((store, vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx]))
}
}
@@ -401,12 +482,31 @@ fn storage_error_response(err: &StorageError, request_id: &str) -> Response<BoxB
.unwrap()
}
fn json_response(status: StatusCode, value: serde_json::Value, request_id: &str) -> Response<BoxBody> {
Response::builder()
.status(status)
.header("content-type", "application/json")
.header("x-amz-request-id", request_id)
.body(full_body(value.to_string()))
.unwrap()
}
fn text_response(status: StatusCode, content_type: &str, body: String, request_id: &str) -> Response<BoxBody> {
Response::builder()
.status(status)
.header("content-type", content_type)
.header("x-amz-request-id", request_id)
.body(full_body(body))
.unwrap()
}
async fn handle_request(
req: Request<Incoming>,
store: Arc<StorageBackend>,
config: SmartStorageConfig,
auth_runtime: Arc<auth::RuntimeCredentialStore>,
policy_store: Arc<PolicyStore>,
metrics: Arc<ServerMetrics>,
) -> Result<Response<BoxBody>, std::convert::Infallible> {
let request_id = Uuid::new_v4().to_string();
let method = req.method().clone();
@@ -416,6 +516,23 @@ async fn handle_request(
// Handle CORS preflight
if config.cors.enabled && method == Method::OPTIONS {
let resp = build_cors_preflight(&config, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
if method == Method::GET && uri.path().starts_with("/-/") {
let resp = match handle_operational_request(uri.path(), store, &config, &metrics, &request_id).await {
Ok(resp) => resp,
Err(error) => {
tracing::error!(error = %error, "Operational endpoint failed");
json_response(
StatusCode::INTERNAL_SERVER_ERROR,
serde_json::json!({ "ok": false, "error": error.to_string() }),
&request_id,
)
}
};
metrics.record_response(resp.status());
return Ok(resp);
}
@@ -439,7 +556,9 @@ async fn handle_request(
Ok(id) => Some(id),
Err(e) => {
tracing::warn!("Auth failed: {}", e.message);
- return Ok(storage_error_response(&e, &request_id));
let resp = storage_error_response(&e, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
}
} else {
@@ -449,7 +568,9 @@ async fn handle_request(
// Step 3: Authorization (policy evaluation)
if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await {
- return Ok(storage_error_response(&e, &request_id));
let resp = storage_error_response(&e, &request_id);
metrics.record_response(resp.status());
return Ok(resp);
}
}
@@ -481,9 +602,118 @@ async fn handle_request(
"request"
);
metrics.record_response(response.status());
Ok(response)
}
async fn handle_operational_request(
path: &str,
store: Arc<StorageBackend>,
config: &SmartStorageConfig,
metrics: &ServerMetrics,
request_id: &str,
) -> Result<Response<BoxBody>> {
match path {
"/-/live" | "/-/livez" => Ok(json_response(
StatusCode::OK,
serde_json::json!({
"ok": true,
"status": "alive",
"startedAt": metrics.started_at.timestamp_millis(),
}),
request_id,
)),
"/-/ready" | "/-/readyz" => {
let cluster_health = store.get_cluster_health().await?;
let cluster_ready = !cluster_health.enabled
|| (cluster_health.majority_healthy.unwrap_or(false)
&& cluster_health.quorum_healthy.unwrap_or(false));
let status = if cluster_ready {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
Ok(json_response(
status,
serde_json::json!({
"ok": cluster_ready,
"status": if cluster_ready { "ready" } else { "degraded" },
"cluster": cluster_health,
}),
request_id,
))
}
"/-/health" | "/-/healthz" => {
let cluster_health = store.get_cluster_health().await?;
let stats = store.get_storage_stats().await?;
Ok(json_response(
StatusCode::OK,
serde_json::json!({
"ok": true,
"status": "healthy",
"version": env!("CARGO_PKG_VERSION"),
"server": {
"address": config.server.address,
"port": config.server.port,
"startedAt": metrics.started_at.timestamp_millis(),
},
"storage": stats,
"cluster": cluster_health,
"metrics": {
"totalRequests": metrics.total_requests.load(Ordering::Relaxed),
"errorResponses": metrics.error_responses.load(Ordering::Relaxed),
},
}),
request_id,
))
}
"/-/metrics" => {
let cluster_health = store.get_cluster_health().await?;
let stats = store.get_storage_stats().await?;
let cluster_enabled = if cluster_health.enabled { 1 } else { 0 };
let quorum_healthy = if cluster_health.quorum_healthy.unwrap_or(true) { 1 } else { 0 };
let body = format!(
"# HELP smartstorage_requests_total Total HTTP requests observed by smartstorage.\n\
# TYPE smartstorage_requests_total counter\n\
smartstorage_requests_total {}\n\
# HELP smartstorage_error_responses_total HTTP responses with status >= 400.\n\
# TYPE smartstorage_error_responses_total counter\n\
smartstorage_error_responses_total {}\n\
# HELP smartstorage_buckets_total Runtime bucket count.\n\
# TYPE smartstorage_buckets_total gauge\n\
smartstorage_buckets_total {}\n\
# HELP smartstorage_objects_total Runtime object count.\n\
# TYPE smartstorage_objects_total gauge\n\
smartstorage_objects_total {}\n\
# HELP smartstorage_cluster_enabled Cluster mode enabled.\n\
# TYPE smartstorage_cluster_enabled gauge\n\
smartstorage_cluster_enabled {}\n\
# HELP smartstorage_cluster_quorum_healthy Cluster quorum health.\n\
# TYPE smartstorage_cluster_quorum_healthy gauge\n\
smartstorage_cluster_quorum_healthy {}\n",
metrics.total_requests.load(Ordering::Relaxed),
metrics.error_responses.load(Ordering::Relaxed),
stats.bucket_count,
stats.total_object_count,
cluster_enabled,
quorum_healthy,
);
Ok(text_response(
StatusCode::OK,
"text/plain; version=0.0.4",
body,
request_id,
))
}
_ => Ok(json_response(
StatusCode::NOT_FOUND,
serde_json::json!({ "ok": false, "error": "Unknown operational endpoint" }),
request_id,
)),
}
}
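Note (not part of the diff): the four endpoints map onto Kubernetes-style probes. /-/live answers whenever the process is serving, /-/ready returns 503 while an enabled cluster lacks quorum or majority health, /-/health adds storage stats and the request counters, and /-/metrics emits Prometheus text exposition format (version 0.0.4). A minimal client-side probe sketch; the listen address and the reqwest/tokio dependencies are assumptions, not shown in this commit:

// Prints the status of each operational endpoint.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    for path in ["/-/live", "/-/ready", "/-/health", "/-/metrics"] {
        let resp = reqwest::get(format!("http://127.0.0.1:9000{path}")).await?;
        // Of the four, only /-/ready is designed to flip to 503 under degradation.
        println!("{path}: {}", resp.status());
    }
    Ok(())
}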
/// Authorize a request based on bucket policies and authentication state.
async fn authorize_request(
ctx: &RequestContext,