use anyhow::Result;
use bytes::Bytes;
use futures_core::Stream;
use http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Method, Request, Response, StatusCode};
use hyper_util::rt::TokioIo;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
use tokio::sync::watch;
use tokio_util::io::ReaderStream;
use uuid::Uuid;

use crate::action::{self, RequestContext, StorageAction};
use crate::auth::{self, AuthenticatedIdentity};
use crate::cluster::coordinator::DistributedStore;
use crate::cluster::drive_manager::DriveManager;
use crate::cluster::healing::HealingService;
use crate::cluster::membership::MembershipManager;
use crate::cluster::persistence::{self, ClusterIdentity};
use crate::cluster::placement;
use crate::cluster::protocol::NodeInfo;
use crate::cluster::quic_transport::QuicTransport;
use crate::cluster::shard_store::ShardStore;
use crate::cluster::state::ClusterState;
use crate::config::{Credential, SmartStorageConfig};
use crate::error::StorageError;
use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::storage::{FileStore, StorageBackend};
use crate::xml_response;

/// Process-wide request counters plus the server start time, reported by the
/// `/-/*` operational endpoints.
struct ServerMetrics {
    started_at: chrono::DateTime<chrono::Utc>,
    total_requests: AtomicU64,
    error_responses: AtomicU64,
}

impl ServerMetrics {
    fn new() -> Self {
        Self {
            started_at: chrono::Utc::now(),
            total_requests: AtomicU64::new(0),
            error_responses: AtomicU64::new(0),
        }
    }

    fn record_response(&self, status: StatusCode) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        if status.as_u16() >= 400 {
            self.error_responses.fetch_add(1, Ordering::Relaxed);
        }
    }
}

pub struct StorageServer {
    store: Arc<StorageBackend>,
    auth_runtime: Arc<auth::RuntimeCredentialStore>,
    shutdown_tx: watch::Sender<bool>,
    cluster_shutdown_txs: Vec<watch::Sender<bool>>,
    server_handle: tokio::task::JoinHandle<()>,
}

impl StorageServer {
    pub async fn start(config: SmartStorageConfig) -> Result<Self> {
        let mut cluster_shutdown_txs = Vec::new();
        let store: Arc<StorageBackend> = if let Some(ref cluster_config) = config.cluster {
            if cluster_config.enabled {
                let (store, shutdown_txs) =
                    Self::start_clustered(&config, cluster_config).await?;
                cluster_shutdown_txs = shutdown_txs;
                store
            } else {
                Self::start_standalone(&config).await?
            }
        } else {
            Self::start_standalone(&config).await?
        };

        // Initialize policy store
        let policy_store = Arc::new(PolicyStore::new(store.policies_dir()));
        policy_store.load_from_disk().await?;

        let auth_runtime = Arc::new(
            auth::RuntimeCredentialStore::new(&config.auth, Some(Self::credentials_path(&config)))
                .await?,
        );

        let addr: SocketAddr = format!("{}:{}", config.address(), config.server.port).parse()?;
        let listener = TcpListener::bind(addr).await?;
        let (shutdown_tx, shutdown_rx) = watch::channel(false);

        let server_store = store.clone();
        let server_config = config.clone();
        let server_auth_runtime = auth_runtime.clone();
        let server_policy_store = policy_store.clone();
        let server_metrics = Arc::new(ServerMetrics::new());

        let server_handle = tokio::spawn(async move {
            loop {
                let mut rx = shutdown_rx.clone();
                tokio::select! {
                    result = listener.accept() => {
                        match result {
                            Ok((stream, _remote_addr)) => {
                                let io = TokioIo::new(stream);
                                let store = server_store.clone();
                                let cfg = server_config.clone();
                                let auth_runtime = server_auth_runtime.clone();
                                let ps = server_policy_store.clone();
                                let metrics = server_metrics.clone();
                                tokio::spawn(async move {
                                    let svc = service_fn(move |req: Request<Incoming>| {
                                        let store = store.clone();
                                        let cfg = cfg.clone();
                                        let auth_runtime = auth_runtime.clone();
                                        let ps = ps.clone();
                                        let metrics = metrics.clone();
                                        async move {
                                            handle_request(req, store, cfg, auth_runtime, ps, metrics).await
                                        }
                                    });
                                    if let Err(e) = http1::Builder::new()
                                        .keep_alive(true)
                                        .serve_connection(io, svc)
                                        .await
                                    {
                                        // Clients dropping a keep-alive connection is routine;
                                        // only log genuine protocol/IO errors.
                                        if !e.is_incomplete_message() {
                                            tracing::error!("Connection error: {}", e);
                                        }
                                    }
                                });
                            }
                            Err(e) => {
                                tracing::error!("Accept error: {}", e);
                            }
                        }
                    }
                    _ = rx.changed() => {
                        break;
                    }
                }
            }
        });

        if !config.server.silent {
            tracing::info!("Storage server listening on {}", addr);
        }

        Ok(Self {
            store,
            auth_runtime,
            shutdown_tx,
            cluster_shutdown_txs,
            server_handle,
        })
    }

    pub async fn stop(self) {
        for shutdown_tx in &self.cluster_shutdown_txs {
            let _ = shutdown_tx.send(true);
        }
        let _ = self.shutdown_tx.send(true);
        let _ = self.server_handle.await;
    }

    pub fn store(&self) -> &StorageBackend {
        &self.store
    }

    pub async fn list_credentials(&self) -> Vec<Credential> {
        self.auth_runtime.list_credentials().await
    }

    pub async fn replace_credentials(
        &self,
        credentials: Vec<Credential>,
    ) -> Result<(), StorageError> {
        self.auth_runtime.replace_credentials(credentials).await
    }

    pub async fn create_bucket_tenant(
        &self,
        bucket_name: &str,
        credential: Credential,
    ) -> Result<Credential> {
        self.ensure_tenant_auth_enabled()?;
        let replacement = self
            .auth_runtime
            .replace_bucket_tenant_credential_with_snapshot(bucket_name, credential)
            .await?;
        // If bucket creation fails, roll the credential store back to the
        // snapshot taken above so auth state and storage state stay in sync.
        if let Err(error) = self.store.create_bucket(bucket_name).await {
            if let Err(rollback_error) = self
                .auth_runtime
                .replace_credentials(replacement.previous_credentials)
                .await
            {
                return Err(anyhow::anyhow!(
                    "Failed to create tenant bucket: {}; credential rollback failed: {}",
                    error,
                    rollback_error.message
                ));
            }
            return Err(error);
        }
        Ok(replacement.credential)
    }

    pub async fn rotate_bucket_tenant_credentials(
        &self,
        bucket_name: &str,
        credential: Credential,
    ) -> Result<Credential> {
        self.ensure_tenant_auth_enabled()?;
        if !self.store.bucket_exists(bucket_name).await {
            return Err(StorageError::no_such_bucket().into());
        }
        Ok(self
            .auth_runtime
            .replace_bucket_tenant_credential(bucket_name, credential)
            .await?)
    }
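
    // Illustrative tenant lifecycle (a sketch, not part of the public docs);
    // `server` and the credential values below are assumed to already be in
    // the caller's scope:
    //
    //   let issued = server.create_bucket_tenant("acme", credential).await?;
    //   let rotated = server
    //       .rotate_bucket_tenant_credentials("acme", new_credential)
    //       .await?;
    //   server.delete_bucket_tenant("acme", None).await?; // drops bucket + credentials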

    pub async fn delete_bucket_tenant(
        &self,
        bucket_name: &str,
        access_key_id: Option<&str>,
    ) -> Result<()> {
        self.ensure_tenant_auth_enabled()?;
        let removal = self
            .auth_runtime
            .remove_bucket_tenant_credentials(bucket_name, access_key_id)
            .await?;
        // Deleting the whole tenant (no specific access key) also removes the
        // bucket; if that fails, restore the credentials we just removed.
        if access_key_id.is_none() && self.store.bucket_exists(bucket_name).await {
            if let Err(error) = self.store.delete_bucket_recursive(bucket_name).await {
                if let Err(rollback_error) = self
                    .auth_runtime
                    .replace_credentials(removal.previous_credentials)
                    .await
                {
                    return Err(anyhow::anyhow!(
                        "Failed to delete tenant bucket: {}; credential rollback failed: {}",
                        error,
                        rollback_error.message
                    ));
                }
                return Err(error);
            }
        }
        Ok(())
    }

    pub async fn list_bucket_tenants(&self) -> Result<Vec<Credential>> {
        self.ensure_tenant_auth_enabled()?;
        Ok(self.auth_runtime.list_bucket_tenants().await)
    }

    pub async fn get_bucket_tenant_credential(
        &self,
        bucket_name: &str,
    ) -> Result<Option<Credential>> {
        self.ensure_tenant_auth_enabled()?;
        Ok(self
            .auth_runtime
            .get_bucket_tenant_credential(bucket_name)
            .await)
    }

    fn ensure_tenant_auth_enabled(&self) -> Result<()> {
        if !self.auth_runtime.enabled() {
            anyhow::bail!("Bucket tenants require auth.enabled=true");
        }
        Ok(())
    }

    fn credentials_path(config: &SmartStorageConfig) -> std::path::PathBuf {
        std::path::PathBuf::from(&config.storage.directory)
            .join(".smartstorage")
            .join("credentials.json")
    }

    async fn start_standalone(config: &SmartStorageConfig) -> Result<Arc<StorageBackend>> {
        let store = Arc::new(StorageBackend::Standalone(FileStore::new(
            config.storage.directory.clone().into(),
        )));
        if config.storage.clean_slate {
            store.reset().await?;
        } else {
            store.initialize().await?;
        }
        Ok(store)
    }

    async fn start_clustered(
        config: &SmartStorageConfig,
        cluster_config: &crate::cluster::config::ClusterConfig,
    ) -> Result<(Arc<StorageBackend>, Vec<watch::Sender<bool>>)> {
        let erasure_config = cluster_config.erasure.clone();
        let cluster_metadata_dir = persistence::cluster_metadata_dir(&config.storage.directory);
        let identity_path = persistence::identity_path(&cluster_metadata_dir);
        let topology_path = persistence::topology_path(&cluster_metadata_dir);
        let persisted_identity = persistence::load_identity(&identity_path).await?;

        // A node's ID must stay stable across restarts; refuse to start if the
        // configured ID contradicts the one persisted on disk.
        if let (Some(configured_node_id), Some(identity)) =
            (&cluster_config.node_id, &persisted_identity)
        {
            if configured_node_id != &identity.node_id {
                anyhow::bail!(
                    "Configured cluster node ID '{}' conflicts with persisted node ID '{}'",
                    configured_node_id,
                    identity.node_id
                );
            }
        }

        let node_id = cluster_config
            .node_id
            .clone()
            .or_else(|| {
                persisted_identity
                    .as_ref()
                    .map(|identity| identity.node_id.clone())
            })
            .unwrap_or_else(|| Uuid::new_v4().to_string());
        let cluster_id = persisted_identity
            .as_ref()
            .map(|identity| identity.cluster_id.clone())
            .unwrap_or_else(|| Uuid::new_v4().to_string());
        persistence::persist_identity(
            &identity_path,
            &ClusterIdentity::new(node_id.clone(), cluster_id.clone()),
        )
        .await?;

        // Determine drive paths
        let drive_paths: Vec<std::path::PathBuf> = if cluster_config.drives.paths.is_empty() {
            // Default: use the storage directory as a single drive
            vec![std::path::PathBuf::from(&config.storage.directory)]
        } else {
            cluster_config
                .drives
                .paths
                .iter()
                .map(std::path::PathBuf::from)
                .collect()
        };

        // Ensure directories exist
        let manifest_dir = std::path::PathBuf::from(&config.storage.directory).join(".manifests");
        let buckets_dir = std::path::PathBuf::from(&config.storage.directory).join(".buckets");
        tokio::fs::create_dir_all(&manifest_dir).await?;
        tokio::fs::create_dir_all(&buckets_dir).await?;
        for path in &drive_paths {
            tokio::fs::create_dir_all(path.join(".smartstorage")).await?;
        }

        // Initialize QUIC transport
        let quic_addr: SocketAddr =
            format!("{}:{}", config.server.address, cluster_config.quic_port).parse()?;
        let transport = Arc::new(QuicTransport::new(quic_addr, node_id.clone()).await?);

        // Initialize cluster state
        let cluster_state = Arc::new(ClusterState::new(
            node_id.clone(),
            cluster_id.clone(),
            erasure_config.data_shards,
            erasure_config.parity_shards,
            Some(topology_path.clone()),
        ));
        let persisted_topology = persistence::load_topology(&topology_path).await?;
        let has_persisted_topology = persisted_topology.is_some();
        if let Some(topology) = persisted_topology {
            if topology.cluster_id != cluster_id {
                anyhow::bail!(
                    "Persisted topology cluster ID does not match persisted node identity"
                );
            }
            cluster_state.apply_topology(&topology).await;
        } else if cluster_config.seed_nodes.is_empty() {
            // Form erasure sets from local drives for a first-node bootstrap.
            let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
            let erasure_sets = placement::form_erasure_sets(&nodes, erasure_config.total_shards());
            if erasure_sets.is_empty() {
                tracing::warn!(
                    "Not enough drives ({}) for erasure set size ({}). \
                     Need at least {} drives.",
                    drive_paths.len(),
                    erasure_config.total_shards(),
                    erasure_config.total_shards(),
                );
            }
            cluster_state.set_erasure_sets(erasure_sets).await;
        }

        // Register self as a node
        let local_node_info = NodeInfo {
            node_id: node_id.clone(),
            quic_addr: quic_addr.to_string(),
            s3_addr: format!("{}:{}", config.server.address, config.server.port),
            drive_count: drive_paths.len() as u32,
            status: "online".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        };

        // Initialize drive manager for health monitoring
        let drive_manager = Arc::new(tokio::sync::Mutex::new(
            DriveManager::from_paths(&drive_paths).await?,
        ));

        // Join the cluster if seed nodes are configured; bootstrap as a fresh
        // single-node cluster only when there are no seeds and no persisted topology.
        let membership = Arc::new(
            MembershipManager::new(
                cluster_state.clone(),
                transport.clone(),
                cluster_config.heartbeat_interval_ms,
                cluster_config.heartbeat_timeout_ms,
                local_node_info,
            )
            .with_drive_manager(drive_manager.clone()),
        );
        membership
            .join_cluster(
                &cluster_config.seed_nodes,
                cluster_config.seed_nodes.is_empty() && !has_persisted_topology,
            )
            .await?;
        // Joining may have adopted the seed cluster's ID; re-persist if it changed.
        let final_cluster_id = cluster_state.cluster_id().await;
        if final_cluster_id != cluster_id {
            persistence::persist_identity(
                &identity_path,
                &ClusterIdentity::new(node_id.clone(), final_cluster_id),
            )
            .await?;
        }

        // Build local shard stores (one per drive) for shared use
        let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
            .iter()
            .map(|p| Arc::new(ShardStore::new(p.clone())))
            .collect();

        // Start QUIC accept loop for incoming connections
        let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
        let transport_clone = transport.clone();
        let cluster_state_for_accept = cluster_state.clone();
        let shard_stores_for_accept = local_shard_stores.clone();
        tokio::spawn(async move {
            transport_clone
                .accept_loop(
                    shard_stores_for_accept,
                    Some(cluster_state_for_accept),
                    quic_shutdown_rx,
                )
                .await;
        });

        // Start heartbeat loop
        let membership_clone = membership.clone();
        let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
        tokio::spawn(async move {
            membership_clone.heartbeat_loop(hb_shutdown_rx).await;
        });

        // Start healing service
        let healing_runtime = Arc::new(tokio::sync::RwLock::new(
            crate::cluster::healing::HealingRuntimeState::default(),
        ));
        let healing_service = HealingService::new(
            cluster_state.clone(),
            &erasure_config,
            local_shard_stores.clone(),
            manifest_dir.clone(),
            24, // scan every 24 hours
            healing_runtime.clone(),
        )?;
        let (heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
        tokio::spawn(async move {
            healing_service.run(heal_shutdown_rx).await;
        });

        // Create distributed store
        let distributed_store = DistributedStore::new(
            cluster_state,
            transport,
            erasure_config,
            std::path::PathBuf::from(&config.storage.directory),
            drive_paths,
            drive_manager,
            healing_runtime,
            manifest_dir,
            buckets_dir,
        )?;
        distributed_store.initialize_runtime_stats().await;
        let store = Arc::new(StorageBackend::Clustered(distributed_store));

        if !config.server.silent {
            tracing::info!(
                "Cluster mode enabled (node_id={}, quic_port={})",
                node_id,
                cluster_config.quic_port
            );
        }

        Ok((
            store,
            vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx],
        ))
    }
}

impl SmartStorageConfig {
    fn address(&self) -> &str {
        &self.server.address
    }
}

// ============================
// Request handling
// ============================

type BoxBody =
    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;

fn full_body(data: impl Into<Bytes>) -> BoxBody {
    http_body_util::Full::new(data.into())
        .map_err(
            |never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> {
                match never {}
            },
        )
        .boxed()
}

fn empty_body() -> BoxBody {
    http_body_util::Empty::new()
        .map_err(
            |never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> {
                match never {}
            },
        )
        .boxed()
}

fn stream_body(reader: tokio::fs::File, content_length: u64) -> BoxBody {
    let stream = ReaderStream::with_capacity(reader.take(content_length), 64 * 1024);
    let mapped = FrameStream { inner: stream };
    http_body_util::StreamBody::new(mapped).boxed()
}

/// Adapter that converts a `ReaderStream` into a `Stream` of `hyper::body::Frame<Bytes>`.
struct FrameStream {
    inner: ReaderStream<tokio::io::Take<tokio::fs::File>>,
}

impl Stream for FrameStream {
    type Item = Result<hyper::body::Frame<Bytes>, Box<dyn std::error::Error + Send + Sync>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // SAFETY: `inner` is the only field and is never moved out of `self`,
        // so this structural pin projection is sound.
        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
        match inner.poll_next(cx) {
            Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(hyper::body::Frame::data(bytes)))),
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(
                Box::new(e) as Box<dyn std::error::Error + Send + Sync>
            ))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

fn xml_response(status: StatusCode, xml: String, request_id: &str) -> Response<BoxBody> {
    Response::builder()
        .status(status)
        .header("content-type", "application/xml")
        .header("x-amz-request-id", request_id)
        .body(full_body(xml))
        .unwrap()
}

fn empty_response(status: StatusCode, request_id: &str) -> Response<BoxBody> {
    Response::builder()
        .status(status)
        .header("x-amz-request-id", request_id)
        .body(empty_body())
        .unwrap()
}

fn storage_error_response(err: &StorageError, request_id: &str) -> Response<BoxBody> {
    let xml = err.to_xml();
    Response::builder()
        .status(err.status)
        .header("content-type", "application/xml")
        .header("x-amz-request-id", request_id)
        .body(full_body(xml))
        .unwrap()
}

fn json_response(
    status: StatusCode,
    value: serde_json::Value,
    request_id: &str,
) -> Response<BoxBody> {
    Response::builder()
        .status(status)
        .header("content-type", "application/json")
        .header("x-amz-request-id", request_id)
        .body(full_body(value.to_string()))
        .unwrap()
}

fn text_response(
    status: StatusCode,
    content_type: &str,
    body: String,
    request_id: &str,
) -> Response<BoxBody> {
    Response::builder()
        .status(status)
        .header("content-type", content_type)
        .header("x-amz-request-id", request_id)
        .body(full_body(body))
        .unwrap()
}

async fn handle_request(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    config: SmartStorageConfig,
    auth_runtime: Arc<auth::RuntimeCredentialStore>,
    policy_store: Arc<PolicyStore>,
    metrics: Arc<ServerMetrics>,
) -> Result<Response<BoxBody>, std::convert::Infallible> {
    let request_id = Uuid::new_v4().to_string();
    let method = req.method().clone();
    let uri = req.uri().clone();
    let start = std::time::Instant::now();

    // Handle CORS preflight
    if config.cors.enabled && method == Method::OPTIONS {
        let resp = build_cors_preflight(&config, &request_id);
        metrics.record_response(resp.status());
        return Ok(resp);
    }

    // Operational endpoints (/-/live, /-/ready, /-/health, /-/metrics) bypass
    // auth and the S3 routing below.
    if method == Method::GET && uri.path().starts_with("/-/") {
        let resp =
            match handle_operational_request(uri.path(), store, &config, &metrics, &request_id)
                .await
            {
                Ok(resp) => resp,
                Err(error) => {
                    tracing::error!(error = %error, "Operational endpoint failed");
                    json_response(
                        StatusCode::INTERNAL_SERVER_ERROR,
                        serde_json::json!({ "ok": false, "error": error.to_string() }),
                        &request_id,
                    )
                }
            };
        metrics.record_response(resp.status());
        return Ok(resp);
    }

    // Step 1: Resolve storage action from request
    let request_ctx = action::resolve_action(&req);

    // Step 2: Auth + policy pipeline
    if auth_runtime.enabled() {
        // Attempt authentication
        let identity = {
            let has_auth_header = req
                .headers()
                .get("authorization")
                .and_then(|v| v.to_str().ok())
                .map(|s| !s.is_empty())
                .unwrap_or(false);
            if has_auth_header {
                let credentials = auth_runtime.snapshot_credentials().await;
                match auth::verify_request(&req, &credentials) {
                    Ok(id) => Some(id),
                    Err(e) => {
                        tracing::warn!("Auth failed: {}", e.message);
                        let resp = storage_error_response(&e, &request_id);
                        metrics.record_response(resp.status());
                        return Ok(resp);
                    }
                }
            } else {
                None // Anonymous request
            }
        };

        // Step 3: Authorization (policy evaluation)
        if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await {
            let resp = storage_error_response(&e, &request_id);
            metrics.record_response(resp.status());
            return Ok(resp);
        }
    }

    // Route and handle
    let mut response = match route_request(req, store, &config, &request_id, &policy_store).await {
        Ok(resp) => resp,
        Err(err) => {
            if let Some(s3err) = err.downcast_ref::<StorageError>() {
                storage_error_response(s3err, &request_id)
            } else {
                tracing::error!("Internal error: {}", err);
                let s3err = StorageError::internal_error(&err.to_string());
                storage_error_response(&s3err, &request_id)
            }
        }
    };

    // Add CORS headers if enabled
    if config.cors.enabled {
        add_cors_headers(response.headers_mut(), &config);
    }

    let duration = start.elapsed();
    tracing::info!(
        method = %method,
        path = %uri.path(),
        status = %response.status().as_u16(),
        duration_ms = %duration.as_millis(),
        "request"
    );
    metrics.record_response(response.status());
    Ok(response)
}

async fn handle_operational_request(
    path: &str,
    store: Arc<StorageBackend>,
    config: &SmartStorageConfig,
    metrics: &ServerMetrics,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    match path {
        "/-/live" | "/-/livez" => Ok(json_response(
            StatusCode::OK,
            serde_json::json!({
                "ok": true,
                "status": "alive",
                "startedAt": metrics.started_at.timestamp_millis(),
            }),
            request_id,
        )),
        "/-/ready" | "/-/readyz" => {
            let cluster_health = store.get_cluster_health().await?;
            // Standalone mode is always ready; clustered mode requires both a
            // healthy majority of nodes and a healthy write quorum.
            let cluster_ready = !cluster_health.enabled
                || (cluster_health.majority_healthy.unwrap_or(false)
                    && cluster_health.quorum_healthy.unwrap_or(false));
            let status = if cluster_ready {
                StatusCode::OK
            } else {
                StatusCode::SERVICE_UNAVAILABLE
            };
            Ok(json_response(
                status,
                serde_json::json!({
                    "ok": cluster_ready,
                    "status": if cluster_ready { "ready" } else { "degraded" },
                    "cluster": cluster_health,
                }),
                request_id,
            ))
        }
        "/-/health" | "/-/healthz" => {
            let cluster_health = store.get_cluster_health().await?;
            let stats = store.get_storage_stats().await?;
            Ok(json_response(
                StatusCode::OK,
                serde_json::json!({
                    "ok": true,
                    "status": "healthy",
                    "version": env!("CARGO_PKG_VERSION"),
                    "server": {
                        "address": config.server.address,
                        "port": config.server.port,
                        "startedAt": metrics.started_at.timestamp_millis(),
                    },
                    "storage": stats,
                    "cluster": cluster_health,
                    "metrics": {
                        "totalRequests": metrics.total_requests.load(Ordering::Relaxed),
                        "errorResponses": metrics.error_responses.load(Ordering::Relaxed),
                    },
                }),
                request_id,
            ))
        }
        "/-/metrics" => {
            let cluster_health = store.get_cluster_health().await?;
            let stats = store.get_storage_stats().await?;
            let cluster_enabled = if cluster_health.enabled { 1 } else { 0 };
            let quorum_healthy = if cluster_health.quorum_healthy.unwrap_or(true) {
                1
            } else {
                0
            };
            let body = format!(
                "# HELP smartstorage_requests_total Total HTTP requests observed by smartstorage.\n\
                 # TYPE smartstorage_requests_total counter\n\
                 smartstorage_requests_total {}\n\
                 # HELP smartstorage_error_responses_total HTTP responses with status >= 400.\n\
                 # TYPE smartstorage_error_responses_total counter\n\
                 smartstorage_error_responses_total {}\n\
                 # HELP smartstorage_buckets_total Runtime bucket count.\n\
                 # TYPE smartstorage_buckets_total gauge\n\
                 smartstorage_buckets_total {}\n\
                 # HELP smartstorage_objects_total Runtime object count.\n\
                 # TYPE smartstorage_objects_total gauge\n\
                 smartstorage_objects_total {}\n\
                 # HELP smartstorage_cluster_enabled Cluster mode enabled.\n\
                 # TYPE smartstorage_cluster_enabled gauge\n\
                 smartstorage_cluster_enabled {}\n\
                 # HELP smartstorage_cluster_quorum_healthy Cluster quorum health.\n\
                 # TYPE smartstorage_cluster_quorum_healthy gauge\n\
                 smartstorage_cluster_quorum_healthy {}\n",
                metrics.total_requests.load(Ordering::Relaxed),
                metrics.error_responses.load(Ordering::Relaxed),
                stats.bucket_count,
                stats.total_object_count,
                cluster_enabled,
                quorum_healthy,
            );
            Ok(text_response(
                StatusCode::OK,
                "text/plain; version=0.0.4",
                body,
                request_id,
            ))
        }
        _ => Ok(json_response(
            StatusCode::NOT_FOUND,
            serde_json::json!({ "ok": false, "error": "Unknown operational endpoint" }),
            request_id,
        )),
    }
}
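
// An illustrative bucket policy of the shape `authorize_request` evaluates
// below. This is a sketch: the exact schema lives in `crate::policy`, and the
// field names here are assumptions modeled on S3-style policies, not the
// crate's documented format.
//
// {
//   "Version": "2012-10-17",
//   "Statement": [
//     { "Effect": "Allow",
//       "Principal": "*",
//       "Action": ["s3:GetObject"],
//       "Resource": "arn:aws:s3:::public-bucket/*" }
//   ]
// }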

/// Authorize a request based on bucket policies and authentication state.
async fn authorize_request(
    ctx: &RequestContext,
    identity: Option<&AuthenticatedIdentity>,
    policy_store: &PolicyStore,
) -> Result<(), StorageError> {
    // Bucket-scoped (tenant) credentials are confined to their bucket before
    // any policy is consulted.
    if let Some(identity) = identity {
        if let Some(bucket_name) = identity.bucket_name.as_deref() {
            authorize_scoped_credential(ctx, bucket_name)?;
        }
    }

    // ListAllMyBuckets requires authentication (there is no bucket to apply a policy to)
    if ctx.action == StorageAction::ListAllMyBuckets {
        if identity.is_none() {
            return Err(StorageError::access_denied());
        }
        return Ok(());
    }

    // If there's a bucket, check its policy
    if let Some(ref bucket) = ctx.bucket {
        if let Some(bucket_policy) = policy_store.get_policy(bucket).await {
            let decision = policy::evaluate_policy(&bucket_policy, ctx, identity);
            match decision {
                PolicyDecision::Deny => return Err(StorageError::access_denied()),
                PolicyDecision::Allow => return Ok(()),
                PolicyDecision::NoOpinion => {
                    // Fall through to default behavior
                }
            }
        }
    }

    // Default: authenticated users get full access, anonymous is denied
    if identity.is_none() {
        return Err(StorageError::access_denied());
    }
    Ok(())
}

fn authorize_scoped_credential(
    ctx: &RequestContext,
    bucket_name: &str,
) -> Result<(), StorageError> {
    let Some(request_bucket) = ctx.bucket.as_deref() else {
        return Err(StorageError::access_denied());
    };
    if request_bucket != bucket_name {
        return Err(StorageError::access_denied());
    }
    if let Some(source_bucket) = ctx.source_bucket.as_deref() {
        if source_bucket != bucket_name {
            return Err(StorageError::access_denied());
        }
    }
    match ctx.action {
        // Bucket lifecycle and policy management stay admin-only.
        StorageAction::CreateBucket
        | StorageAction::DeleteBucket
        | StorageAction::GetBucketPolicy
        | StorageAction::PutBucketPolicy
        | StorageAction::DeleteBucketPolicy
        | StorageAction::ListAllMyBuckets => Err(StorageError::access_denied()),
        StorageAction::HeadBucket
        | StorageAction::ListBucket
        | StorageAction::GetObject
        | StorageAction::HeadObject
        | StorageAction::PutObject
        | StorageAction::DeleteObject
        | StorageAction::CopyObject
        | StorageAction::ListBucketMultipartUploads
        | StorageAction::AbortMultipartUpload
        | StorageAction::InitiateMultipartUpload
        | StorageAction::UploadPart
        | StorageAction::CompleteMultipartUpload => Ok(()),
    }
}

// ============================
// Routing
// ============================

async fn route_request(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    _config: &SmartStorageConfig,
    request_id: &str,
    policy_store: &Arc<PolicyStore>,
) -> Result<Response<BoxBody>> {
    let method = req.method().clone();
    let path = req.uri().path().to_string();
    let query_string = req.uri().query().unwrap_or("").to_string();
    let query = parse_query(&query_string);

    // Parse path: /, /{bucket}, /{bucket}/{key...}
    let segments: Vec<&str> = path
        .trim_start_matches('/')
        .splitn(2, '/')
        .filter(|s| !s.is_empty())
        .collect();

    match segments.len() {
        0 => {
            // Root: GET / -> ListBuckets
            match method {
                Method::GET => handle_list_buckets(store, request_id).await,
                _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
            }
        }
        1 => {
            // Bucket level: /{bucket}
            let bucket = percent_decode(segments[0]);

            // Check for the ?policy query parameter
            if query.contains_key("policy") {
                return match method {
                    Method::GET => {
                        handle_get_bucket_policy(policy_store, &bucket, request_id).await
                    }
                    Method::PUT => {
                        handle_put_bucket_policy(req, &store, policy_store, &bucket, request_id)
                            .await
                    }
                    Method::DELETE => {
                        handle_delete_bucket_policy(policy_store, &bucket, request_id).await
                    }
                    _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
                };
            }

            match method {
                Method::GET => {
                    if query.contains_key("uploads") {
                        handle_list_multipart_uploads(store, &bucket, request_id).await
                    } else {
                        handle_list_objects(store, &bucket, &query, request_id).await
                    }
                }
                Method::PUT => handle_create_bucket(store, &bucket, request_id).await,
                Method::DELETE => {
                    handle_delete_bucket(store, &bucket, request_id, policy_store).await
                }
                Method::HEAD => handle_head_bucket(store, &bucket, request_id).await,
                _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
            }
        }
        2 => {
            // Object level: /{bucket}/{key...}
            let bucket = percent_decode(segments[0]);
            let key = percent_decode(segments[1]);
            match method {
                Method::PUT => {
                    if query.contains_key("partNumber") && query.contains_key("uploadId") {
                        handle_upload_part(req, store, &query, request_id).await
                    } else if req.headers().contains_key("x-amz-copy-source") {
                        handle_copy_object(req, store, &bucket, &key, request_id).await
                    } else {
                        handle_put_object(req, store, &bucket, &key, request_id).await
                    }
                }
                Method::GET => handle_get_object(req, store, &bucket, &key, request_id).await,
                Method::HEAD => handle_head_object(store, &bucket, &key, request_id).await,
                Method::DELETE => {
                    if query.contains_key("uploadId") {
                        let upload_id = query.get("uploadId").unwrap();
                        handle_abort_multipart(store, upload_id, request_id).await
                    } else {
                        handle_delete_object(store, &bucket, &key, request_id).await
                    }
                }
                Method::POST => {
                    if query.contains_key("uploads") {
                        handle_initiate_multipart(req, store, &bucket, &key, request_id).await
                    } else if query.contains_key("uploadId") {
                        let upload_id = query.get("uploadId").unwrap().clone();
                        handle_complete_multipart(req, store, &bucket, &key, &upload_id, request_id)
                            .await
                    } else {
                        let err = StorageError::invalid_request("Invalid POST request");
                        Ok(storage_error_response(&err, request_id))
                    }
                }
                _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
            }
        }
        _ => Ok(empty_response(StatusCode::BAD_REQUEST, request_id)),
    }
}

// ============================
// Handlers
// ============================

async fn handle_list_buckets(
    store: Arc<StorageBackend>,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    let buckets = store.list_buckets().await?;
    let xml = xml_response::list_buckets_xml(&buckets);
    Ok(xml_response(StatusCode::OK, xml, request_id))
}

async fn handle_create_bucket(
    store: Arc<StorageBackend>,
    bucket: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    store.create_bucket(bucket).await?;
    Ok(empty_response(StatusCode::OK, request_id))
}

async fn handle_delete_bucket(
    store: Arc<StorageBackend>,
    bucket: &str,
    request_id: &str,
    policy_store: &Arc<PolicyStore>,
) -> Result<Response<BoxBody>> {
    store.delete_bucket(bucket).await?;
    // Clean up the bucket policy on deletion
    let _ = policy_store.delete_policy(bucket).await;
    Ok(empty_response(StatusCode::NO_CONTENT, request_id))
}

async fn handle_head_bucket(
    store: Arc<StorageBackend>,
    bucket: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    if store.bucket_exists(bucket).await {
        Ok(empty_response(StatusCode::OK, request_id))
    } else {
        Err(StorageError::no_such_bucket().into())
    }
}

async fn handle_list_objects(
    store: Arc<StorageBackend>,
    bucket: &str,
    query: &HashMap<String, String>,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    let prefix = query.get("prefix").map(|s| s.as_str()).unwrap_or("");
    let delimiter = query.get("delimiter").map(|s| s.as_str()).unwrap_or("");
    let max_keys = query
        .get("max-keys")
        .and_then(|s| s.parse().ok())
        .unwrap_or(1000usize);
    let continuation_token = query.get("continuation-token").map(|s| s.as_str());
    let is_v2 = query.get("list-type").map(|s| s.as_str()) == Some("2");

    let result = store
        .list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
        .await?;
    let xml = if is_v2 {
        xml_response::list_objects_v2_xml(bucket, &result)
    } else {
        xml_response::list_objects_v1_xml(bucket, &result)
    };
    Ok(xml_response(StatusCode::OK, xml, request_id))
}

async fn handle_put_object(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    bucket: &str,
    key: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    let metadata = extract_metadata(req.headers());
    let body = req.into_body();
    let result = store.put_object(bucket, key, body, metadata).await?;
    let resp = Response::builder()
        .status(StatusCode::OK)
        .header("ETag", format!("\"{}\"", result.md5))
        .header("x-amz-request-id", request_id)
        .body(empty_body())
        .unwrap();
    Ok(resp)
}

async fn handle_get_object(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    bucket: &str,
    key: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    // Parse Range header
    let range = parse_range_header(req.headers());
    let result = store.get_object(bucket, key, range).await?;

    let content_type = result
        .metadata
        .get("content-type")
        .cloned()
        .unwrap_or_else(|| "binary/octet-stream".to_string());

    let mut builder = Response::builder()
        .header("ETag", format!("\"{}\"", result.md5))
        .header(
            "Last-Modified",
            result
                .last_modified
                .format("%a, %d %b %Y %H:%M:%S GMT")
                .to_string(),
        )
        .header("Content-Type", &content_type)
        .header("Accept-Ranges", "bytes")
        .header("x-amz-request-id", request_id);

    // Add custom metadata headers
    for (k, v) in &result.metadata {
        if k.starts_with("x-amz-meta-") {
            builder = builder.header(k.as_str(), v.as_str());
        }
    }

    if let Some((start, end)) = range {
        // An open-ended range ("bytes=N-") parses as end = u64::MAX; clamp it
        // to the object size before computing the content length, otherwise
        // `end - start + 1` overflows.
        let end = end.min(result.size.saturating_sub(1));
        let content_length = end - start + 1;
        let resp = builder
            .status(StatusCode::PARTIAL_CONTENT)
            .header("Content-Length", content_length.to_string())
            .header(
                "Content-Range",
                format!("bytes {}-{}/{}", start, end, result.size),
            )
            .body(stream_body(result.body, content_length))
            .unwrap();
        Ok(resp)
    } else {
        let resp = builder
            .status(StatusCode::OK)
            .header("Content-Length", result.size.to_string())
            .body(stream_body(result.body, result.content_length))
            .unwrap();
        Ok(resp)
    }
}

async fn handle_head_object(
    store: Arc<StorageBackend>,
    bucket: &str,
    key: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    let result = store.head_object(bucket, key).await?;
    let content_type = result
        .metadata
        .get("content-type")
        .cloned()
        .unwrap_or_else(|| "binary/octet-stream".to_string());

    let mut builder = Response::builder()
        .status(StatusCode::OK)
        .header("ETag", format!("\"{}\"", result.md5))
        .header(
            "Last-Modified",
            result
                .last_modified
                .format("%a, %d %b %Y %H:%M:%S GMT")
                .to_string(),
        )
        .header("Content-Type", &content_type)
        .header("Content-Length", result.size.to_string())
        .header("Accept-Ranges", "bytes")
        .header("x-amz-request-id", request_id);

    for (k, v) in &result.metadata {
        if k.starts_with("x-amz-meta-") {
            builder = builder.header(k.as_str(), v.as_str());
        }
    }
    Ok(builder.body(empty_body()).unwrap())
}

async fn handle_delete_object(
    store: Arc<StorageBackend>,
    bucket: &str,
    key: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    store.delete_object(bucket, key).await?;
    Ok(empty_response(StatusCode::NO_CONTENT, request_id))
}

async fn handle_copy_object(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    dest_bucket: &str,
    dest_key: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    let copy_source = req
        .headers()
        .get("x-amz-copy-source")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("")
        .to_string();
    let metadata_directive = req
        .headers()
        .get("x-amz-metadata-directive")
        .and_then(|v| v.to_str().ok())
        .unwrap_or("COPY")
        .to_uppercase();

    // Parse source: /bucket/key or bucket/key
    let source = copy_source.trim_start_matches('/');
    let first_slash = source.find('/').unwrap_or(source.len());
    let src_bucket = percent_decode(&source[..first_slash]);
    let src_key = if first_slash < source.len() {
        percent_decode(&source[first_slash + 1..])
    } else {
        String::new()
    };
    let new_metadata = if metadata_directive == "REPLACE" {
        Some(extract_metadata(req.headers()))
    } else {
        None
    };

    let result = store
        .copy_object(
            &src_bucket,
            &src_key,
            dest_bucket,
            dest_key,
            &metadata_directive,
            new_metadata,
        )
        .await?;
    let xml =
        xml_response::copy_object_result_xml(&result.md5, &result.last_modified.to_rfc3339());
    Ok(xml_response(StatusCode::OK, xml, request_id))
}

// ============================
// Policy handlers
// ============================

async fn handle_get_bucket_policy(
    policy_store: &Arc<PolicyStore>,
    bucket: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    match policy_store.get_policy(bucket).await {
        Some(p) => {
            let json = serde_json::to_string_pretty(&p)?;
            let resp = Response::builder()
                .status(StatusCode::OK)
                .header("content-type", "application/json")
                .header("x-amz-request-id", request_id)
                .body(full_body(json))
                .unwrap();
            Ok(resp)
        }
        None => Err(StorageError::no_such_bucket_policy().into()),
    }
}

async fn handle_put_bucket_policy(
    req: Request<Incoming>,
    store: &Arc<StorageBackend>,
    policy_store: &Arc<PolicyStore>,
    bucket: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    // Verify the bucket exists
    if !store.bucket_exists(bucket).await {
        return Err(StorageError::no_such_bucket().into());
    }

    // Read body
    let body_bytes = req
        .collect()
        .await
        .map_err(|e| anyhow::anyhow!("Body error: {}", e))?
        .to_bytes();
    let body_str = String::from_utf8_lossy(&body_bytes);

    // Validate and parse
    let validated_policy = policy::validate_policy(&body_str)?;

    // Store
    policy_store
        .put_policy(bucket, validated_policy)
        .await
        .map_err(|e| StorageError::internal_error(&e.to_string()))?;

    Ok(empty_response(StatusCode::NO_CONTENT, request_id))
}

async fn handle_delete_bucket_policy(
    policy_store: &Arc<PolicyStore>,
    bucket: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    policy_store
        .delete_policy(bucket)
        .await
        .map_err(|e| StorageError::internal_error(&e.to_string()))?;
    Ok(empty_response(StatusCode::NO_CONTENT, request_id))
}

// ============================
// Multipart handlers
// ============================

async fn handle_initiate_multipart(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    bucket: &str,
    key: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    let metadata = extract_metadata(req.headers());
    let upload_id = store.initiate_multipart(bucket, key, metadata).await?;
    let xml = xml_response::initiate_multipart_xml(bucket, key, &upload_id);
    Ok(xml_response(StatusCode::OK, xml, request_id))
}

async fn handle_upload_part(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    query: &HashMap<String, String>,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    let upload_id = query.get("uploadId").unwrap();
    let part_number: u32 = query
        .get("partNumber")
        .and_then(|s| s.parse().ok())
        .unwrap_or(0);
    // S3 allows part numbers 1..=10000.
    if part_number < 1 || part_number > 10000 {
        return Err(StorageError::invalid_part_number().into());
    }

    let body = req.into_body();
    let (etag, _size) = store.upload_part(upload_id, part_number, body).await?;
    let resp = Response::builder()
        .status(StatusCode::OK)
        .header("ETag", format!("\"{}\"", etag))
        .header("x-amz-request-id", request_id)
        .body(empty_body())
        .unwrap();
    Ok(resp)
}

async fn handle_complete_multipart(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    bucket: &str,
    key: &str,
    upload_id: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    // Read the request body (XML)
    let body_bytes = req
        .collect()
        .await
        .map_err(|e| anyhow::anyhow!("Body error: {}", e))?
        .to_bytes();
    let body_str = String::from_utf8_lossy(&body_bytes);

    // Parse parts from the XML body with a plain substring scan
    let parts = parse_complete_multipart_xml(&body_str);
    let result = store.complete_multipart(upload_id, &parts).await?;
    let xml = xml_response::complete_multipart_xml(bucket, key, &result.etag);
    Ok(xml_response(StatusCode::OK, xml, request_id))
}

async fn handle_abort_multipart(
    store: Arc<StorageBackend>,
    upload_id: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    store.abort_multipart(upload_id).await?;
    Ok(empty_response(StatusCode::NO_CONTENT, request_id))
}

async fn handle_list_multipart_uploads(
    store: Arc<StorageBackend>,
    bucket: &str,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    let uploads = store.list_multipart_uploads(bucket).await?;
    let xml = xml_response::list_multipart_uploads_xml(bucket, &uploads);
    Ok(xml_response(StatusCode::OK, xml, request_id))
}

// ============================
// Helpers
// ============================

fn parse_query(query_string: &str) -> HashMap<String, String> {
    let mut map = HashMap::new();
    if query_string.is_empty() {
        return map;
    }
    for pair in query_string.split('&') {
        let mut parts = pair.splitn(2, '=');
        let key = parts.next().unwrap_or("");
        let value = parts.next().unwrap_or("");
        let key = percent_decode(key);
        let value = percent_decode(value);
        map.insert(key, value);
    }
    map
}

fn percent_decode(s: &str) -> String {
    percent_encoding::percent_decode_str(s)
        .decode_utf8_lossy()
        .to_string()
}

fn extract_metadata(headers: &hyper::HeaderMap) -> HashMap<String, String> {
    let mut metadata = HashMap::new();
    for (name, value) in headers {
        let name_str = name.as_str().to_lowercase();
        if let Ok(val) = value.to_str() {
            match name_str.as_str() {
                "content-type" | "cache-control" | "content-disposition" | "content-encoding"
                | "content-language" | "expires" => {
                    metadata.insert(name_str, val.to_string());
                }
                _ if name_str.starts_with("x-amz-meta-") => {
                    metadata.insert(name_str, val.to_string());
                }
                _ => {}
            }
        }
    }
    // Default content-type
    if !metadata.contains_key("content-type") {
        metadata.insert(
            "content-type".to_string(),
            "binary/octet-stream".to_string(),
        );
    }
    metadata
}

fn parse_range_header(headers: &hyper::HeaderMap) -> Option<(u64, u64)> {
    let range_val = headers.get("range")?.to_str().ok()?;
    let bytes_prefix = "bytes=";
    if !range_val.starts_with(bytes_prefix) {
        return None;
    }
    let range_spec = &range_val[bytes_prefix.len()..];
    let mut parts = range_spec.splitn(2, '-');
    let start: u64 = parts.next()?.parse().ok()?;
    let end_str = parts.next()?;
    let end: u64 = if end_str.is_empty() {
        // Open-ended range ("bytes=N-"): the caller clamps this to the object size.
        u64::MAX
    } else {
        end_str.parse().ok()?
    };
    Some((start, end))
}

fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
    let mut parts = Vec::new();
    // Simple scan for <Part>...</Part> blocks; each must contain a
    // <PartNumber> and an <ETag>.
    let mut remaining = xml;
    while let Some(part_start) = remaining.find("<Part>") {
        let after_part = &remaining[part_start + 6..];
        if let Some(part_end) = after_part.find("</Part>") {
            let part_content = &after_part[..part_end];
            let part_number =
                extract_xml_value(part_content, "PartNumber").and_then(|s| s.parse::<u32>().ok());
            let etag = extract_xml_value(part_content, "ETag").map(|s| s.replace('"', ""));
            if let (Some(pn), Some(et)) = (part_number, etag) {
                parts.push((pn, et));
            }
            remaining = &after_part[part_end + 7..];
        } else {
            break;
        }
    }
    parts.sort_by_key(|(pn, _)| *pn);
    parts
}
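
// Example of the CompleteMultipartUpload body the scanner above accepts
// (a minimal sketch). The scanner ignores everything outside <Part>...</Part>
// pairs, so an xmlns attribute on the root element is fine, but attributes on
// <Part> itself are not recognized:
//
// <CompleteMultipartUpload>
//   <Part><PartNumber>1</PartNumber><ETag>"abc123"</ETag></Part>
//   <Part><PartNumber>2</PartNumber><ETag>"def456"</ETag></Part>
// </CompleteMultipartUpload>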
fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{}>", tag);
    let close = format!("</{}>", tag);
    let start = xml.find(&open)? + open.len();
    // Search for the closing tag after the opening tag so malformed input
    // cannot produce an inverted (panicking) slice range.
    let end = start + xml[start..].find(&close)?;
    Some(xml[start..end].to_string())
}

// ============================
// CORS
// ============================

fn build_cors_preflight(config: &SmartStorageConfig, request_id: &str) -> Response<BoxBody> {
    let mut builder = Response::builder()
        .status(StatusCode::NO_CONTENT)
        .header("x-amz-request-id", request_id);
    if let Some(ref origins) = config.cors.allowed_origins {
        builder = builder.header("Access-Control-Allow-Origin", origins.join(", "));
    }
    if let Some(ref methods) = config.cors.allowed_methods {
        builder = builder.header("Access-Control-Allow-Methods", methods.join(", "));
    }
    if let Some(ref headers) = config.cors.allowed_headers {
        builder = builder.header("Access-Control-Allow-Headers", headers.join(", "));
    }
    if let Some(max_age) = config.cors.max_age {
        builder = builder.header("Access-Control-Max-Age", max_age.to_string());
    }
    if config.cors.allow_credentials == Some(true) {
        builder = builder.header("Access-Control-Allow-Credentials", "true");
    }
    builder.body(empty_body()).unwrap()
}

fn add_cors_headers(headers: &mut hyper::HeaderMap, config: &SmartStorageConfig) {
    if let Some(ref origins) = config.cors.allowed_origins {
        headers.insert(
            "access-control-allow-origin",
            origins.join(", ").parse().unwrap(),
        );
    }
    if let Some(ref exposed) = config.cors.exposed_headers {
        headers.insert(
            "access-control-expose-headers",
            exposed.join(", ").parse().unwrap(),
        );
    }
    if config.cors.allow_credentials == Some(true) {
        headers.insert("access-control-allow-credentials", "true".parse().unwrap());
    }
}
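
// A minimal test sketch for the pure helpers above, assuming the crate builds
// as-is; these exercise only this module's parsing functions, not the server.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_query_decodes_pairs() {
        let q = parse_query("prefix=logs%2F2024&max-keys=50&flag");
        assert_eq!(q.get("prefix").map(String::as_str), Some("logs/2024"));
        assert_eq!(q.get("max-keys").map(String::as_str), Some("50"));
        // A bare key parses with an empty value.
        assert_eq!(q.get("flag").map(String::as_str), Some(""));
    }

    #[test]
    fn parse_range_header_handles_open_ended_ranges() {
        let mut headers = hyper::HeaderMap::new();
        headers.insert("range", "bytes=0-99".parse().unwrap());
        assert_eq!(parse_range_header(&headers), Some((0, 99)));

        headers.insert("range", "bytes=100-".parse().unwrap());
        // Open-ended ranges come back as u64::MAX and are clamped by the caller.
        assert_eq!(parse_range_header(&headers), Some((100, u64::MAX)));
    }

    #[test]
    fn parse_complete_multipart_xml_sorts_parts() {
        let xml = "<CompleteMultipartUpload>\
                   <Part><PartNumber>2</PartNumber><ETag>\"b\"</ETag></Part>\
                   <Part><PartNumber>1</PartNumber><ETag>\"a\"</ETag></Part>\
                   </CompleteMultipartUpload>";
        let parts = parse_complete_multipart_xml(xml);
        assert_eq!(parts, vec![(1, "a".to_string()), (2, "b".to_string())]);
    }
}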