2026-02-13 13:59:44 +00:00
|
|
|
use anyhow::Result;
|
|
|
|
|
use bytes::Bytes;
|
|
|
|
|
use futures_core::Stream;
|
|
|
|
|
use http_body_util::BodyExt;
|
|
|
|
|
use hyper::body::Incoming;
|
|
|
|
|
use hyper::server::conn::http1;
|
|
|
|
|
use hyper::service::service_fn;
|
|
|
|
|
use hyper::{Method, Request, Response, StatusCode};
|
|
|
|
|
use hyper_util::rt::TokioIo;
|
|
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::net::SocketAddr;
|
|
|
|
|
use std::pin::Pin;
|
|
|
|
|
use std::sync::Arc;
|
2026-04-30 06:08:42 +00:00
|
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
2026-02-13 13:59:44 +00:00
|
|
|
use std::task::{Context, Poll};
|
|
|
|
|
use tokio::io::AsyncReadExt;
|
|
|
|
|
use tokio::net::TcpListener;
|
|
|
|
|
use tokio::sync::watch;
|
|
|
|
|
use tokio_util::io::ReaderStream;
|
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
2026-03-14 15:20:30 +00:00
|
|
|
use crate::action::{self, RequestContext, StorageAction};
|
2026-02-17 16:28:50 +00:00
|
|
|
use crate::auth::{self, AuthenticatedIdentity};
|
2026-03-14 15:20:30 +00:00
|
|
|
use crate::config::SmartStorageConfig;
|
2026-02-17 16:28:50 +00:00
|
|
|
use crate::policy::{self, PolicyDecision, PolicyStore};
|
2026-03-14 15:20:30 +00:00
|
|
|
use crate::error::StorageError;
|
2026-03-21 21:50:42 +00:00
|
|
|
use crate::cluster::coordinator::DistributedStore;
|
2026-03-21 22:00:41 +00:00
|
|
|
use crate::cluster::drive_manager::DriveManager;
|
2026-03-21 22:19:51 +00:00
|
|
|
use crate::cluster::healing::HealingService;
|
2026-03-21 21:50:42 +00:00
|
|
|
use crate::cluster::membership::MembershipManager;
|
2026-04-30 06:08:42 +00:00
|
|
|
use crate::cluster::persistence::{self, ClusterIdentity};
|
2026-03-21 21:50:42 +00:00
|
|
|
use crate::cluster::placement;
|
|
|
|
|
use crate::cluster::protocol::NodeInfo;
|
|
|
|
|
use crate::cluster::quic_transport::QuicTransport;
|
|
|
|
|
use crate::cluster::shard_store::ShardStore;
|
|
|
|
|
use crate::cluster::state::ClusterState;
|
|
|
|
|
use crate::storage::{FileStore, StorageBackend};
|
2026-02-13 13:59:44 +00:00
|
|
|
use crate::xml_response;
|
|
|
|
|
|
2026-04-30 06:08:42 +00:00
|
|
|
/// Process-wide request counters exposed through the operational
/// (`/-/...`) endpoints (health, readiness, Prometheus metrics).
struct ServerMetrics {
    // Wall-clock instant this metrics holder was created; surfaced as
    // `startedAt` in health/liveness payloads.
    started_at: chrono::DateTime<chrono::Utc>,
    // Total HTTP responses recorded, regardless of status.
    total_requests: AtomicU64,
    // Subset of responses whose status code was >= 400.
    error_responses: AtomicU64,
}
|
|
|
|
|
|
|
|
|
|
impl ServerMetrics {
|
|
|
|
|
fn new() -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
started_at: chrono::Utc::now(),
|
|
|
|
|
total_requests: AtomicU64::new(0),
|
|
|
|
|
error_responses: AtomicU64::new(0),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn record_response(&self, status: StatusCode) {
|
|
|
|
|
self.total_requests.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
if status.as_u16() >= 400 {
|
|
|
|
|
self.error_responses.fetch_add(1, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-14 15:20:30 +00:00
|
|
|
/// A running storage server together with the handles required to shut
/// it down cleanly.
pub struct StorageServer {
    // Shared storage backend (standalone file store or clustered store).
    store: Arc<StorageBackend>,
    // Mutable credential set consulted by the request pipeline.
    auth_runtime: Arc<auth::RuntimeCredentialStore>,
    // Signals the HTTP accept loop to exit.
    shutdown_tx: watch::Sender<bool>,
    // Shutdown signals for cluster background tasks (QUIC accept loop,
    // heartbeat, healing); empty when running standalone.
    cluster_shutdown_txs: Vec<watch::Sender<bool>>,
    // Join handle of the accept-loop task; awaited in `stop`.
    server_handle: tokio::task::JoinHandle<()>,
}
|
|
|
|
|
|
2026-03-14 15:20:30 +00:00
|
|
|
impl StorageServer {
|
|
|
|
|
/// Starts the server: chooses a standalone or clustered backend based
/// on `config.cluster`, loads bucket policies, binds the HTTP listener,
/// and spawns the accept loop.
///
/// Returns a handle whose `stop` method shuts everything down.
///
/// # Errors
/// Fails if the backend cannot initialize, policies cannot be loaded,
/// the address does not parse, or the listener cannot bind.
pub async fn start(config: SmartStorageConfig) -> Result<Self> {
    let auth_runtime = Arc::new(auth::RuntimeCredentialStore::new(&config.auth));
    // Populated only when cluster mode spawns background tasks.
    let mut cluster_shutdown_txs = Vec::new();
    let store: Arc<StorageBackend> = if let Some(ref cluster_config) = config.cluster {
        if cluster_config.enabled {
            let (store, shutdown_txs) = Self::start_clustered(&config, cluster_config).await?;
            cluster_shutdown_txs = shutdown_txs;
            store
        } else {
            // Cluster section present but disabled: fall back to standalone.
            Self::start_standalone(&config).await?
        }
    } else {
        Self::start_standalone(&config).await?
    };

    // Initialize policy store
    let policy_store = Arc::new(PolicyStore::new(store.policies_dir()));
    policy_store.load_from_disk().await?;

    let addr: SocketAddr = format!("{}:{}", config.address(), config.server.port)
        .parse()?;

    let listener = TcpListener::bind(addr).await?;
    let (shutdown_tx, shutdown_rx) = watch::channel(false);

    // Clones moved into the accept-loop task; the originals stay in the
    // returned handle.
    let server_store = store.clone();
    let server_config = config.clone();
    let server_auth_runtime = auth_runtime.clone();
    let server_policy_store = policy_store.clone();
    let server_metrics = Arc::new(ServerMetrics::new());

    let server_handle = tokio::spawn(async move {
        loop {
            let mut rx = shutdown_rx.clone();

            tokio::select! {
                result = listener.accept() => {
                    match result {
                        Ok((stream, _remote_addr)) => {
                            let io = TokioIo::new(stream);
                            // Per-connection clones, moved into the
                            // connection task below.
                            let store = server_store.clone();
                            let cfg = server_config.clone();
                            let auth_runtime = server_auth_runtime.clone();
                            let ps = server_policy_store.clone();
                            let metrics = server_metrics.clone();

                            // One task per connection; HTTP/1 keep-alive
                            // lets it serve multiple requests.
                            tokio::spawn(async move {
                                let svc = service_fn(move |req: Request<Incoming>| {
                                    // Per-request clones: the service
                                    // closure may be invoked repeatedly.
                                    let store = store.clone();
                                    let cfg = cfg.clone();
                                    let auth_runtime = auth_runtime.clone();
                                    let ps = ps.clone();
                                    let metrics = metrics.clone();
                                    async move {
                                        handle_request(req, store, cfg, auth_runtime, ps, metrics).await
                                    }
                                });

                                if let Err(e) = http1::Builder::new()
                                    .keep_alive(true)
                                    .serve_connection(io, svc)
                                    .await
                                {
                                    // Incomplete messages are routine
                                    // client disconnects; don't log them.
                                    if !e.is_incomplete_message() {
                                        tracing::error!("Connection error: {}", e);
                                    }
                                }
                            });
                        }
                        Err(e) => {
                            // Keep accepting; a single failed accept
                            // shouldn't bring the server down.
                            tracing::error!("Accept error: {}", e);
                        }
                    }
                }
                _ = rx.changed() => {
                    break;
                }
            }
        }
    });

    if !config.server.silent {
        tracing::info!("Storage server listening on {}", addr);
    }

    Ok(Self {
        store,
        auth_runtime,
        shutdown_tx,
        cluster_shutdown_txs,
        server_handle,
    })
}
|
|
|
|
|
|
|
|
|
|
pub async fn stop(self) {
|
2026-04-30 06:08:42 +00:00
|
|
|
for shutdown_tx in &self.cluster_shutdown_txs {
|
|
|
|
|
let _ = shutdown_tx.send(true);
|
|
|
|
|
}
|
2026-02-13 13:59:44 +00:00
|
|
|
let _ = self.shutdown_tx.send(true);
|
|
|
|
|
let _ = self.server_handle.await;
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-21 21:50:42 +00:00
|
|
|
/// Borrows the underlying storage backend (standalone or clustered).
pub fn store(&self) -> &StorageBackend {
    &self.store
}
|
2026-03-21 21:50:42 +00:00
|
|
|
|
2026-04-30 06:08:42 +00:00
|
|
|
/// Lists metadata for the currently active credentials, delegating to
/// the runtime credential store.
pub async fn list_credentials(&self) -> Vec<crate::auth::CredentialMetadata> {
    self.auth_runtime.list_credentials().await
}
|
|
|
|
|
|
|
|
|
|
/// Atomically replaces the full credential set used by the request
/// pipeline; in-flight requests keep the snapshot they already took.
///
/// # Errors
/// Propagates any validation/storage error from the credential store.
pub async fn replace_credentials(
    &self,
    credentials: Vec<crate::config::Credential>,
) -> Result<(), StorageError> {
    self.auth_runtime.replace_credentials(credentials).await
}
|
|
|
|
|
|
2026-03-21 21:50:42 +00:00
|
|
|
async fn start_standalone(config: &SmartStorageConfig) -> Result<Arc<StorageBackend>> {
|
|
|
|
|
let store = Arc::new(StorageBackend::Standalone(
|
|
|
|
|
FileStore::new(config.storage.directory.clone().into()),
|
|
|
|
|
));
|
|
|
|
|
if config.storage.clean_slate {
|
|
|
|
|
store.reset().await?;
|
|
|
|
|
} else {
|
|
|
|
|
store.initialize().await?;
|
|
|
|
|
}
|
|
|
|
|
Ok(store)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Bootstraps the clustered backend: restores or mints node/cluster
/// identity, prepares drives and metadata directories, brings up QUIC
/// transport, joins (or bootstraps) the cluster, and spawns the QUIC
/// accept, heartbeat, and healing background tasks.
///
/// Returns the distributed store plus the shutdown senders for the
/// three spawned background tasks, in order: QUIC accept, heartbeat,
/// healing.
///
/// # Errors
/// Fails on identity/topology conflicts, filesystem or transport
/// initialization errors, or a failed cluster join.
async fn start_clustered(
    config: &SmartStorageConfig,
    cluster_config: &crate::cluster::config::ClusterConfig,
) -> Result<(Arc<StorageBackend>, Vec<watch::Sender<bool>>)> {
    let erasure_config = cluster_config.erasure.clone();
    let cluster_metadata_dir = persistence::cluster_metadata_dir(&config.storage.directory);
    let identity_path = persistence::identity_path(&cluster_metadata_dir);
    let topology_path = persistence::topology_path(&cluster_metadata_dir);
    let persisted_identity = persistence::load_identity(&identity_path).await?;

    // A configured node ID must agree with any identity already on disk;
    // silently adopting either one could split the cluster.
    if let (Some(configured_node_id), Some(identity)) = (&cluster_config.node_id, &persisted_identity) {
        if configured_node_id != &identity.node_id {
            anyhow::bail!(
                "Configured cluster node ID '{}' conflicts with persisted node ID '{}'",
                configured_node_id,
                identity.node_id
            );
        }
    }

    // Precedence for node ID: explicit config > persisted identity > fresh UUID.
    let node_id = cluster_config
        .node_id
        .clone()
        .or_else(|| persisted_identity.as_ref().map(|identity| identity.node_id.clone()))
        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
    // Cluster ID comes from disk when available; may be superseded after
    // joining (see re-persist below).
    let cluster_id = persisted_identity
        .as_ref()
        .map(|identity| identity.cluster_id.clone())
        .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
    persistence::persist_identity(
        &identity_path,
        &ClusterIdentity::new(node_id.clone(), cluster_id.clone()),
    )
    .await?;

    // Determine drive paths
    let drive_paths: Vec<std::path::PathBuf> = if cluster_config.drives.paths.is_empty() {
        // Default: use storage directory as a single drive
        vec![std::path::PathBuf::from(&config.storage.directory)]
    } else {
        cluster_config
            .drives
            .paths
            .iter()
            .map(std::path::PathBuf::from)
            .collect()
    };

    // Ensure directories exist
    let manifest_dir = std::path::PathBuf::from(&config.storage.directory).join(".manifests");
    let buckets_dir = std::path::PathBuf::from(&config.storage.directory).join(".buckets");
    tokio::fs::create_dir_all(&manifest_dir).await?;
    tokio::fs::create_dir_all(&buckets_dir).await?;
    for path in &drive_paths {
        tokio::fs::create_dir_all(path.join(".smartstorage")).await?;
    }

    // Initialize QUIC transport
    let quic_addr: SocketAddr =
        format!("{}:{}", config.server.address, cluster_config.quic_port).parse()?;
    let transport = Arc::new(QuicTransport::new(quic_addr, node_id.clone()).await?);

    // Initialize cluster state
    let cluster_state = Arc::new(ClusterState::new(
        node_id.clone(),
        cluster_id.clone(),
        erasure_config.data_shards,
        erasure_config.parity_shards,
        Some(topology_path.clone()),
    ));

    let persisted_topology = persistence::load_topology(&topology_path).await?;
    let has_persisted_topology = persisted_topology.is_some();
    if let Some(topology) = persisted_topology {
        // Topology on disk must belong to the identity on disk.
        if topology.cluster_id != cluster_id {
            anyhow::bail!("Persisted topology cluster ID does not match persisted node identity");
        }
        cluster_state.apply_topology(&topology).await;
    } else if cluster_config.seed_nodes.is_empty() {
        // Form erasure sets from local drives for a first node bootstrap.
        let nodes = vec![(node_id.clone(), drive_paths.len() as u32)];
        let erasure_sets = placement::form_erasure_sets(&nodes, erasure_config.total_shards());

        if erasure_sets.is_empty() {
            // Warn-only: the node still starts, but cannot place data
            // until more drives are available.
            tracing::warn!(
                "Not enough drives ({}) for erasure set size ({}). \
                Need at least {} drives.",
                drive_paths.len(),
                erasure_config.total_shards(),
                erasure_config.total_shards(),
            );
        }

        cluster_state.set_erasure_sets(erasure_sets).await;
    }

    // Register self as a node
    let local_node_info = NodeInfo {
        node_id: node_id.clone(),
        quic_addr: quic_addr.to_string(),
        s3_addr: format!("{}:{}", config.server.address, config.server.port),
        drive_count: drive_paths.len() as u32,
        status: "online".to_string(),
        version: env!("CARGO_PKG_VERSION").to_string(),
    };
    // Initialize drive manager for health monitoring
    let drive_manager = Arc::new(tokio::sync::Mutex::new(
        DriveManager::from_paths(&drive_paths).await?,
    ));

    // Join cluster if seed nodes are configured
    let membership = Arc::new(
        MembershipManager::new(
            cluster_state.clone(),
            transport.clone(),
            cluster_config.heartbeat_interval_ms,
            cluster_config.heartbeat_timeout_ms,
            local_node_info,
        )
        .with_drive_manager(drive_manager.clone()),
    );
    // Second argument: bootstrap as a brand-new cluster only when there
    // are no seeds AND no previously persisted topology.
    membership
        .join_cluster(
            &cluster_config.seed_nodes,
            cluster_config.seed_nodes.is_empty() && !has_persisted_topology,
        )
        .await?;

    // Joining may have adopted the seed cluster's ID; re-persist the
    // identity so the next restart uses it.
    let final_cluster_id = cluster_state.cluster_id().await;
    if final_cluster_id != cluster_id {
        persistence::persist_identity(
            &identity_path,
            &ClusterIdentity::new(node_id.clone(), final_cluster_id),
        )
        .await?;
    }

    // Build local shard stores (one per drive) for shared use
    let local_shard_stores: Vec<Arc<ShardStore>> = drive_paths
        .iter()
        .map(|p| Arc::new(ShardStore::new(p.clone())))
        .collect();

    // Start QUIC accept loop for incoming connections
    let (quic_shutdown_tx, quic_shutdown_rx) = watch::channel(false);
    let transport_clone = transport.clone();
    let cluster_state_for_accept = cluster_state.clone();
    let shard_stores_for_accept = local_shard_stores.clone();
    tokio::spawn(async move {
        transport_clone
            .accept_loop(shard_stores_for_accept, Some(cluster_state_for_accept), quic_shutdown_rx)
            .await;
    });

    // Start heartbeat loop
    let membership_clone = membership.clone();
    let (hb_shutdown_tx, hb_shutdown_rx) = watch::channel(false);
    tokio::spawn(async move {
        membership_clone.heartbeat_loop(hb_shutdown_rx).await;
    });

    // Start healing service
    let healing_runtime = Arc::new(tokio::sync::RwLock::new(
        crate::cluster::healing::HealingRuntimeState::default(),
    ));
    let healing_service = HealingService::new(
        cluster_state.clone(),
        &erasure_config,
        local_shard_stores.clone(),
        manifest_dir.clone(),
        24, // scan every 24 hours
        healing_runtime.clone(),
    )?;
    let (heal_shutdown_tx, heal_shutdown_rx) = watch::channel(false);
    tokio::spawn(async move {
        healing_service.run(heal_shutdown_rx).await;
    });

    // Create distributed store
    let distributed_store = DistributedStore::new(
        cluster_state,
        transport,
        erasure_config,
        std::path::PathBuf::from(&config.storage.directory),
        drive_paths,
        drive_manager,
        healing_runtime,
        manifest_dir,
        buckets_dir,
    )?;

    distributed_store.initialize_runtime_stats().await;

    let store = Arc::new(StorageBackend::Clustered(distributed_store));

    if !config.server.silent {
        tracing::info!(
            "Cluster mode enabled (node_id={}, quic_port={})",
            node_id,
            cluster_config.quic_port
        );
    }

    Ok((store, vec![quic_shutdown_tx, hb_shutdown_tx, heal_shutdown_tx]))
}
|
2026-02-13 13:59:44 +00:00
|
|
|
}
|
|
|
|
|
|
2026-03-14 15:20:30 +00:00
|
|
|
impl SmartStorageConfig {
|
2026-02-13 13:59:44 +00:00
|
|
|
fn address(&self) -> &str {
|
|
|
|
|
&self.server.address
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================
|
|
|
|
|
// Request handling
|
|
|
|
|
// ============================
|
|
|
|
|
|
|
|
|
|
/// Response body type shared by all handlers: a type-erased body of
/// `Bytes` frames with a boxed error.
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
|
|
|
|
|
|
|
|
|
|
fn full_body(data: impl Into<Bytes>) -> BoxBody {
|
|
|
|
|
http_body_util::Full::new(data.into())
|
|
|
|
|
.map_err(|never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> { match never {} })
|
|
|
|
|
.boxed()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn empty_body() -> BoxBody {
|
|
|
|
|
http_body_util::Empty::new()
|
|
|
|
|
.map_err(|never: std::convert::Infallible| -> Box<dyn std::error::Error + Send + Sync> { match never {} })
|
|
|
|
|
.boxed()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn stream_body(reader: tokio::fs::File, content_length: u64) -> BoxBody {
|
|
|
|
|
let stream = ReaderStream::with_capacity(reader.take(content_length), 64 * 1024);
|
|
|
|
|
let mapped = FrameStream { inner: stream };
|
|
|
|
|
http_body_util::StreamBody::new(mapped).boxed()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Adapter that converts ReaderStream into a Stream of Frame<Bytes>
|
|
|
|
|
/// Adapter that converts ReaderStream into a Stream of Frame<Bytes>
struct FrameStream {
    // Underlying chunked reader, capped by `Take` to the advertised
    // content length.
    inner: ReaderStream<tokio::io::Take<tokio::fs::File>>,
}
|
|
|
|
|
|
|
|
|
|
impl Stream for FrameStream {
|
|
|
|
|
type Item = Result<hyper::body::Frame<Bytes>, Box<dyn std::error::Error + Send + Sync>>;
|
|
|
|
|
|
|
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
|
|
|
let inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) };
|
|
|
|
|
match inner.poll_next(cx) {
|
|
|
|
|
Poll::Ready(Some(Ok(bytes))) => {
|
|
|
|
|
Poll::Ready(Some(Ok(hyper::body::Frame::data(bytes))))
|
|
|
|
|
}
|
|
|
|
|
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e) as Box<dyn std::error::Error + Send + Sync>))),
|
|
|
|
|
Poll::Ready(None) => Poll::Ready(None),
|
|
|
|
|
Poll::Pending => Poll::Pending,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn xml_response(status: StatusCode, xml: String, request_id: &str) -> Response<BoxBody> {
|
|
|
|
|
Response::builder()
|
|
|
|
|
.status(status)
|
|
|
|
|
.header("content-type", "application/xml")
|
|
|
|
|
.header("x-amz-request-id", request_id)
|
|
|
|
|
.body(full_body(xml))
|
|
|
|
|
.unwrap()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn empty_response(status: StatusCode, request_id: &str) -> Response<BoxBody> {
|
|
|
|
|
Response::builder()
|
|
|
|
|
.status(status)
|
|
|
|
|
.header("x-amz-request-id", request_id)
|
|
|
|
|
.body(empty_body())
|
|
|
|
|
.unwrap()
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-14 15:20:30 +00:00
|
|
|
fn storage_error_response(err: &StorageError, request_id: &str) -> Response<BoxBody> {
|
2026-02-13 13:59:44 +00:00
|
|
|
let xml = err.to_xml();
|
|
|
|
|
Response::builder()
|
|
|
|
|
.status(err.status)
|
|
|
|
|
.header("content-type", "application/xml")
|
|
|
|
|
.header("x-amz-request-id", request_id)
|
|
|
|
|
.body(full_body(xml))
|
|
|
|
|
.unwrap()
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-30 06:08:42 +00:00
|
|
|
fn json_response(status: StatusCode, value: serde_json::Value, request_id: &str) -> Response<BoxBody> {
|
|
|
|
|
Response::builder()
|
|
|
|
|
.status(status)
|
|
|
|
|
.header("content-type", "application/json")
|
|
|
|
|
.header("x-amz-request-id", request_id)
|
|
|
|
|
.body(full_body(value.to_string()))
|
|
|
|
|
.unwrap()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn text_response(status: StatusCode, content_type: &str, body: String, request_id: &str) -> Response<BoxBody> {
|
|
|
|
|
Response::builder()
|
|
|
|
|
.status(status)
|
|
|
|
|
.header("content-type", content_type)
|
|
|
|
|
.header("x-amz-request-id", request_id)
|
|
|
|
|
.body(full_body(body))
|
|
|
|
|
.unwrap()
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-13 13:59:44 +00:00
|
|
|
/// Top-level per-request pipeline: CORS preflight, operational (`/-/`)
/// endpoints, authentication, policy authorization, then routing to the
/// S3 handlers. Always returns `Ok` — errors become error responses.
/// Every exit path records the response status in `metrics`.
async fn handle_request(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    config: SmartStorageConfig,
    auth_runtime: Arc<auth::RuntimeCredentialStore>,
    policy_store: Arc<PolicyStore>,
    metrics: Arc<ServerMetrics>,
) -> Result<Response<BoxBody>, std::convert::Infallible> {
    // Fresh id per request, echoed back as x-amz-request-id.
    let request_id = Uuid::new_v4().to_string();
    let method = req.method().clone();
    let uri = req.uri().clone();
    let start = std::time::Instant::now();

    // Handle CORS preflight
    if config.cors.enabled && method == Method::OPTIONS {
        let resp = build_cors_preflight(&config, &request_id);
        metrics.record_response(resp.status());
        return Ok(resp);
    }

    // Operational endpoints bypass auth and the S3 router entirely.
    if method == Method::GET && uri.path().starts_with("/-/") {
        let resp = match handle_operational_request(uri.path(), store, &config, &metrics, &request_id).await {
            Ok(resp) => resp,
            Err(error) => {
                tracing::error!(error = %error, "Operational endpoint failed");
                json_response(
                    StatusCode::INTERNAL_SERVER_ERROR,
                    serde_json::json!({ "ok": false, "error": error.to_string() }),
                    &request_id,
                )
            }
        };
        metrics.record_response(resp.status());
        return Ok(resp);
    }

    // Step 1: Resolve storage action from request
    let request_ctx = action::resolve_action(&req);

    // Step 2: Auth + policy pipeline
    if auth_runtime.enabled() {
        // Attempt authentication
        let identity = {
            let has_auth_header = req
                .headers()
                .get("authorization")
                .and_then(|v| v.to_str().ok())
                .map(|s| !s.is_empty())
                .unwrap_or(false);

            if has_auth_header {
                let credentials = auth_runtime.snapshot_credentials().await;
                match auth::verify_request(&req, &credentials) {
                    Ok(id) => Some(id),
                    Err(e) => {
                        // A bad signature fails fast; it never reaches
                        // policy evaluation.
                        tracing::warn!("Auth failed: {}", e.message);
                        let resp = storage_error_response(&e, &request_id);
                        metrics.record_response(resp.status());
                        return Ok(resp);
                    }
                }
            } else {
                None // Anonymous request
            }
        };

        // Step 3: Authorization (policy evaluation)
        if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await {
            let resp = storage_error_response(&e, &request_id);
            metrics.record_response(resp.status());
            return Ok(resp);
        }
    }

    // Route and handle
    let mut response = match route_request(req, store, &config, &request_id, &policy_store).await {
        Ok(resp) => resp,
        Err(err) => {
            // Known storage errors keep their status/XML; anything else
            // is wrapped as an internal error.
            if let Some(s3err) = err.downcast_ref::<StorageError>() {
                storage_error_response(s3err, &request_id)
            } else {
                tracing::error!("Internal error: {}", err);
                let s3err = StorageError::internal_error(&err.to_string());
                storage_error_response(&s3err, &request_id)
            }
        }
    };

    // Add CORS headers if enabled
    if config.cors.enabled {
        add_cors_headers(response.headers_mut(), &config);
    }

    let duration = start.elapsed();
    tracing::info!(
        method = %method,
        path = %uri.path(),
        status = %response.status().as_u16(),
        duration_ms = %duration.as_millis(),
        "request"
    );

    metrics.record_response(response.status());

    Ok(response)
}
|
|
|
|
|
|
2026-04-30 06:08:42 +00:00
|
|
|
/// Serves the operational (`/-/`) endpoints: liveness, readiness,
/// detailed health JSON, and Prometheus-format metrics. Unknown paths
/// return a JSON 404.
///
/// # Errors
/// Propagates failures from the store's health/stats queries; the
/// caller converts them into a 500 JSON response.
async fn handle_operational_request(
    path: &str,
    store: Arc<StorageBackend>,
    config: &SmartStorageConfig,
    metrics: &ServerMetrics,
    request_id: &str,
) -> Result<Response<BoxBody>> {
    match path {
        // Liveness: always OK while the process can answer at all.
        "/-/live" | "/-/livez" => Ok(json_response(
            StatusCode::OK,
            serde_json::json!({
                "ok": true,
                "status": "alive",
                "startedAt": metrics.started_at.timestamp_millis(),
            }),
            request_id,
        )),
        // Readiness: degraded (503) when cluster mode is on but either
        // node majority or write quorum is unhealthy.
        "/-/ready" | "/-/readyz" => {
            let cluster_health = store.get_cluster_health().await?;
            let cluster_ready = !cluster_health.enabled
                || (cluster_health.majority_healthy.unwrap_or(false)
                    && cluster_health.quorum_healthy.unwrap_or(false));
            let status = if cluster_ready {
                StatusCode::OK
            } else {
                StatusCode::SERVICE_UNAVAILABLE
            };
            Ok(json_response(
                status,
                serde_json::json!({
                    "ok": cluster_ready,
                    "status": if cluster_ready { "ready" } else { "degraded" },
                    "cluster": cluster_health,
                }),
                request_id,
            ))
        }
        // Full health report: server info, storage stats, cluster
        // health, and request counters.
        "/-/health" | "/-/healthz" => {
            let cluster_health = store.get_cluster_health().await?;
            let stats = store.get_storage_stats().await?;
            Ok(json_response(
                StatusCode::OK,
                serde_json::json!({
                    "ok": true,
                    "status": "healthy",
                    "version": env!("CARGO_PKG_VERSION"),
                    "server": {
                        "address": config.server.address,
                        "port": config.server.port,
                        "startedAt": metrics.started_at.timestamp_millis(),
                    },
                    "storage": stats,
                    "cluster": cluster_health,
                    "metrics": {
                        "totalRequests": metrics.total_requests.load(Ordering::Relaxed),
                        "errorResponses": metrics.error_responses.load(Ordering::Relaxed),
                    },
                }),
                request_id,
            ))
        }
        // Prometheus text exposition (version 0.0.4 format).
        "/-/metrics" => {
            let cluster_health = store.get_cluster_health().await?;
            let stats = store.get_storage_stats().await?;
            let cluster_enabled = if cluster_health.enabled { 1 } else { 0 };
            // Quorum defaults to healthy (1) when not reported, e.g. in
            // standalone mode.
            let quorum_healthy = if cluster_health.quorum_healthy.unwrap_or(true) { 1 } else { 0 };
            let body = format!(
                "# HELP smartstorage_requests_total Total HTTP requests observed by smartstorage.\n\
                # TYPE smartstorage_requests_total counter\n\
                smartstorage_requests_total {}\n\
                # HELP smartstorage_error_responses_total HTTP responses with status >= 400.\n\
                # TYPE smartstorage_error_responses_total counter\n\
                smartstorage_error_responses_total {}\n\
                # HELP smartstorage_buckets_total Runtime bucket count.\n\
                # TYPE smartstorage_buckets_total gauge\n\
                smartstorage_buckets_total {}\n\
                # HELP smartstorage_objects_total Runtime object count.\n\
                # TYPE smartstorage_objects_total gauge\n\
                smartstorage_objects_total {}\n\
                # HELP smartstorage_cluster_enabled Cluster mode enabled.\n\
                # TYPE smartstorage_cluster_enabled gauge\n\
                smartstorage_cluster_enabled {}\n\
                # HELP smartstorage_cluster_quorum_healthy Cluster quorum health.\n\
                # TYPE smartstorage_cluster_quorum_healthy gauge\n\
                smartstorage_cluster_quorum_healthy {}\n",
                metrics.total_requests.load(Ordering::Relaxed),
                metrics.error_responses.load(Ordering::Relaxed),
                stats.bucket_count,
                stats.total_object_count,
                cluster_enabled,
                quorum_healthy,
            );
            Ok(text_response(
                StatusCode::OK,
                "text/plain; version=0.0.4",
                body,
                request_id,
            ))
        }
        _ => Ok(json_response(
            StatusCode::NOT_FOUND,
            serde_json::json!({ "ok": false, "error": "Unknown operational endpoint" }),
            request_id,
        )),
    }
}
|
|
|
|
|
|
2026-02-17 16:28:50 +00:00
|
|
|
/// Authorize a request based on bucket policies and authentication state.
|
|
|
|
|
async fn authorize_request(
|
|
|
|
|
ctx: &RequestContext,
|
|
|
|
|
identity: Option<&AuthenticatedIdentity>,
|
|
|
|
|
policy_store: &PolicyStore,
|
2026-03-14 15:20:30 +00:00
|
|
|
) -> Result<(), StorageError> {
|
2026-02-17 16:28:50 +00:00
|
|
|
// ListAllMyBuckets requires authentication (no bucket to apply policy to)
|
2026-03-14 15:20:30 +00:00
|
|
|
if ctx.action == StorageAction::ListAllMyBuckets {
|
2026-02-17 16:28:50 +00:00
|
|
|
if identity.is_none() {
|
2026-03-14 15:20:30 +00:00
|
|
|
return Err(StorageError::access_denied());
|
2026-02-17 16:28:50 +00:00
|
|
|
}
|
|
|
|
|
return Ok(());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// If there's a bucket, check its policy
|
|
|
|
|
if let Some(ref bucket) = ctx.bucket {
|
|
|
|
|
if let Some(bucket_policy) = policy_store.get_policy(bucket).await {
|
|
|
|
|
let decision = policy::evaluate_policy(&bucket_policy, ctx, identity);
|
|
|
|
|
match decision {
|
2026-03-14 15:20:30 +00:00
|
|
|
PolicyDecision::Deny => return Err(StorageError::access_denied()),
|
2026-02-17 16:28:50 +00:00
|
|
|
PolicyDecision::Allow => return Ok(()),
|
|
|
|
|
PolicyDecision::NoOpinion => {
|
|
|
|
|
// Fall through to default behavior
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Default: authenticated users get full access, anonymous denied
|
|
|
|
|
if identity.is_none() {
|
2026-03-14 15:20:30 +00:00
|
|
|
return Err(StorageError::access_denied());
|
2026-02-17 16:28:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-13 13:59:44 +00:00
|
|
|
// ============================
|
|
|
|
|
// Routing
|
|
|
|
|
// ============================
|
|
|
|
|
|
|
|
|
|
/// Dispatch an incoming HTTP request to the matching S3-style handler.
///
/// Routing is path-shape based:
///   - `/`               -> ListBuckets (GET only)
///   - `/{bucket}`       -> bucket-level operations (plus the `?policy` sub-resource)
///   - `/{bucket}/{key}` -> object-level operations (including multipart)
///
/// Query parameters select sub-operations (`?policy`, `?uploads`,
/// `?uploadId`, `?partNumber`), following S3 REST API conventions.
/// Branch ordering matters: the `?policy` sub-resource is checked before
/// bucket methods, and for PUT on an object, part upload is checked before
/// server-side copy, which is checked before a plain put.
async fn route_request(
    req: Request<Incoming>,
    store: Arc<StorageBackend>,
    _config: &SmartStorageConfig,
    request_id: &str,
    policy_store: &Arc<PolicyStore>,
) -> Result<Response<BoxBody>> {
    // Copy method/path/query out up front: `req` is later moved into
    // handlers that consume the request body.
    let method = req.method().clone();
    let path = req.uri().path().to_string();
    let query_string = req.uri().query().unwrap_or("").to_string();
    let query = parse_query(&query_string);

    // Parse path: /, /{bucket}, /{bucket}/{key...}
    // splitn(2, '/') keeps any '/' inside the object key intact.
    let segments: Vec<&str> = path
        .trim_start_matches('/')
        .splitn(2, '/')
        .filter(|s| !s.is_empty())
        .collect();

    match segments.len() {
        0 => {
            // Root: GET / -> ListBuckets
            match method {
                Method::GET => handle_list_buckets(store, request_id).await,
                _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
            }
        }
        1 => {
            // Bucket level: /{bucket}
            let bucket = percent_decode(segments[0]);

            // Check for ?policy query parameter
            // (sub-resource takes precedence over plain bucket methods).
            if query.contains_key("policy") {
                return match method {
                    Method::GET => handle_get_bucket_policy(policy_store, &bucket, request_id).await,
                    Method::PUT => handle_put_bucket_policy(req, &store, policy_store, &bucket, request_id).await,
                    Method::DELETE => handle_delete_bucket_policy(policy_store, &bucket, request_id).await,
                    _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
                };
            }

            match method {
                Method::GET => {
                    // ?uploads lists in-progress multipart uploads instead of objects.
                    if query.contains_key("uploads") {
                        handle_list_multipart_uploads(store, &bucket, request_id).await
                    } else {
                        handle_list_objects(store, &bucket, &query, request_id).await
                    }
                }
                Method::PUT => handle_create_bucket(store, &bucket, request_id).await,
                Method::DELETE => handle_delete_bucket(store, &bucket, request_id, policy_store).await,
                Method::HEAD => handle_head_bucket(store, &bucket, request_id).await,
                _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
            }
        }
        2 => {
            // Object level: /{bucket}/{key...}
            let bucket = percent_decode(segments[0]);
            let key = percent_decode(segments[1]);

            match method {
                Method::PUT => {
                    // Order matters: part upload, then server-side copy, then plain put.
                    if query.contains_key("partNumber") && query.contains_key("uploadId") {
                        handle_upload_part(req, store, &query, request_id).await
                    } else if req.headers().contains_key("x-amz-copy-source") {
                        handle_copy_object(req, store, &bucket, &key, request_id).await
                    } else {
                        handle_put_object(req, store, &bucket, &key, request_id).await
                    }
                }
                Method::GET => {
                    handle_get_object(req, store, &bucket, &key, request_id).await
                }
                Method::HEAD => {
                    handle_head_object(store, &bucket, &key, request_id).await
                }
                Method::DELETE => {
                    // DELETE with ?uploadId aborts a multipart upload.
                    if query.contains_key("uploadId") {
                        let upload_id = query.get("uploadId").unwrap();
                        handle_abort_multipart(store, upload_id, request_id).await
                    } else {
                        handle_delete_object(store, &bucket, &key, request_id).await
                    }
                }
                Method::POST => {
                    // POST ?uploads initiates, POST ?uploadId completes a multipart upload.
                    if query.contains_key("uploads") {
                        handle_initiate_multipart(req, store, &bucket, &key, request_id).await
                    } else if query.contains_key("uploadId") {
                        let upload_id = query.get("uploadId").unwrap().clone();
                        handle_complete_multipart(req, store, &bucket, &key, &upload_id, request_id).await
                    } else {
                        let err = StorageError::invalid_request("Invalid POST request");
                        Ok(storage_error_response(&err, request_id))
                    }
                }
                _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
            }
        }
        // Unreachable in practice: splitn(2, ..) yields at most two segments.
        _ => Ok(empty_response(StatusCode::BAD_REQUEST, request_id)),
    }
}
|
|
|
|
|
|
|
|
|
|
// ============================
|
|
|
|
|
// Handlers
|
|
|
|
|
// ============================
|
|
|
|
|
|
|
|
|
|
async fn handle_list_buckets(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
let buckets = store.list_buckets().await?;
|
|
|
|
|
let xml = xml_response::list_buckets_xml(&buckets);
|
|
|
|
|
Ok(xml_response(StatusCode::OK, xml, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_create_bucket(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
store.create_bucket(bucket).await?;
|
|
|
|
|
Ok(empty_response(StatusCode::OK, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_delete_bucket(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
request_id: &str,
|
2026-02-17 16:28:50 +00:00
|
|
|
policy_store: &Arc<PolicyStore>,
|
2026-02-13 13:59:44 +00:00
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
store.delete_bucket(bucket).await?;
|
2026-02-17 16:28:50 +00:00
|
|
|
// Clean up bucket policy on deletion
|
|
|
|
|
let _ = policy_store.delete_policy(bucket).await;
|
2026-02-13 13:59:44 +00:00
|
|
|
Ok(empty_response(StatusCode::NO_CONTENT, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_head_bucket(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
if store.bucket_exists(bucket).await {
|
|
|
|
|
Ok(empty_response(StatusCode::OK, request_id))
|
|
|
|
|
} else {
|
2026-03-14 15:20:30 +00:00
|
|
|
Err(StorageError::no_such_bucket().into())
|
2026-02-13 13:59:44 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_list_objects(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
query: &HashMap<String, String>,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
let prefix = query.get("prefix").map(|s| s.as_str()).unwrap_or("");
|
|
|
|
|
let delimiter = query.get("delimiter").map(|s| s.as_str()).unwrap_or("");
|
|
|
|
|
let max_keys = query
|
|
|
|
|
.get("max-keys")
|
|
|
|
|
.and_then(|s| s.parse().ok())
|
|
|
|
|
.unwrap_or(1000usize);
|
|
|
|
|
let continuation_token = query.get("continuation-token").map(|s| s.as_str());
|
|
|
|
|
let is_v2 = query.get("list-type").map(|s| s.as_str()) == Some("2");
|
|
|
|
|
|
|
|
|
|
let result = store
|
|
|
|
|
.list_objects(bucket, prefix, delimiter, max_keys, continuation_token)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
let xml = if is_v2 {
|
|
|
|
|
xml_response::list_objects_v2_xml(bucket, &result)
|
|
|
|
|
} else {
|
|
|
|
|
xml_response::list_objects_v1_xml(bucket, &result)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
Ok(xml_response(StatusCode::OK, xml, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_put_object(
|
|
|
|
|
req: Request<Incoming>,
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
key: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
let metadata = extract_metadata(req.headers());
|
|
|
|
|
let body = req.into_body();
|
|
|
|
|
|
|
|
|
|
let result = store.put_object(bucket, key, body, metadata).await?;
|
|
|
|
|
|
|
|
|
|
let resp = Response::builder()
|
|
|
|
|
.status(StatusCode::OK)
|
|
|
|
|
.header("ETag", format!("\"{}\"", result.md5))
|
|
|
|
|
.header("x-amz-request-id", request_id)
|
|
|
|
|
.body(empty_body())
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
Ok(resp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_get_object(
|
|
|
|
|
req: Request<Incoming>,
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
key: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
// Parse Range header
|
|
|
|
|
let range = parse_range_header(req.headers());
|
|
|
|
|
|
|
|
|
|
let result = store.get_object(bucket, key, range).await?;
|
|
|
|
|
|
|
|
|
|
let content_type = result
|
|
|
|
|
.metadata
|
|
|
|
|
.get("content-type")
|
|
|
|
|
.cloned()
|
|
|
|
|
.unwrap_or_else(|| "binary/octet-stream".to_string());
|
|
|
|
|
|
|
|
|
|
let mut builder = Response::builder()
|
|
|
|
|
.header("ETag", format!("\"{}\"", result.md5))
|
|
|
|
|
.header("Last-Modified", result.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string())
|
|
|
|
|
.header("Content-Type", &content_type)
|
|
|
|
|
.header("Accept-Ranges", "bytes")
|
|
|
|
|
.header("x-amz-request-id", request_id);
|
|
|
|
|
|
|
|
|
|
// Add custom metadata headers
|
|
|
|
|
for (k, v) in &result.metadata {
|
|
|
|
|
if k.starts_with("x-amz-meta-") {
|
|
|
|
|
builder = builder.header(k.as_str(), v.as_str());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if let Some((start, end)) = range {
|
|
|
|
|
let content_length = end - start + 1;
|
|
|
|
|
let resp = builder
|
|
|
|
|
.status(StatusCode::PARTIAL_CONTENT)
|
|
|
|
|
.header("Content-Length", content_length.to_string())
|
|
|
|
|
.header(
|
|
|
|
|
"Content-Range",
|
|
|
|
|
format!("bytes {}-{}/{}", start, end, result.size),
|
|
|
|
|
)
|
|
|
|
|
.body(stream_body(result.body, content_length))
|
|
|
|
|
.unwrap();
|
|
|
|
|
Ok(resp)
|
|
|
|
|
} else {
|
|
|
|
|
let resp = builder
|
|
|
|
|
.status(StatusCode::OK)
|
|
|
|
|
.header("Content-Length", result.size.to_string())
|
|
|
|
|
.body(stream_body(result.body, result.content_length))
|
|
|
|
|
.unwrap();
|
|
|
|
|
Ok(resp)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_head_object(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
key: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
let result = store.head_object(bucket, key).await?;
|
|
|
|
|
|
|
|
|
|
let content_type = result
|
|
|
|
|
.metadata
|
|
|
|
|
.get("content-type")
|
|
|
|
|
.cloned()
|
|
|
|
|
.unwrap_or_else(|| "binary/octet-stream".to_string());
|
|
|
|
|
|
|
|
|
|
let mut builder = Response::builder()
|
|
|
|
|
.status(StatusCode::OK)
|
|
|
|
|
.header("ETag", format!("\"{}\"", result.md5))
|
|
|
|
|
.header("Last-Modified", result.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string())
|
|
|
|
|
.header("Content-Type", &content_type)
|
|
|
|
|
.header("Content-Length", result.size.to_string())
|
|
|
|
|
.header("Accept-Ranges", "bytes")
|
|
|
|
|
.header("x-amz-request-id", request_id);
|
|
|
|
|
|
|
|
|
|
for (k, v) in &result.metadata {
|
|
|
|
|
if k.starts_with("x-amz-meta-") {
|
|
|
|
|
builder = builder.header(k.as_str(), v.as_str());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(builder.body(empty_body()).unwrap())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_delete_object(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
key: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
store.delete_object(bucket, key).await?;
|
|
|
|
|
Ok(empty_response(StatusCode::NO_CONTENT, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_copy_object(
|
|
|
|
|
req: Request<Incoming>,
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
dest_bucket: &str,
|
|
|
|
|
dest_key: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
let copy_source = req
|
|
|
|
|
.headers()
|
|
|
|
|
.get("x-amz-copy-source")
|
|
|
|
|
.and_then(|v| v.to_str().ok())
|
|
|
|
|
.unwrap_or("")
|
|
|
|
|
.to_string();
|
|
|
|
|
|
|
|
|
|
let metadata_directive = req
|
|
|
|
|
.headers()
|
|
|
|
|
.get("x-amz-metadata-directive")
|
|
|
|
|
.and_then(|v| v.to_str().ok())
|
|
|
|
|
.unwrap_or("COPY")
|
|
|
|
|
.to_uppercase();
|
|
|
|
|
|
|
|
|
|
// Parse source: /bucket/key or bucket/key
|
|
|
|
|
let source = copy_source.trim_start_matches('/');
|
|
|
|
|
let first_slash = source.find('/').unwrap_or(source.len());
|
|
|
|
|
let src_bucket = percent_decode(&source[..first_slash]);
|
|
|
|
|
let src_key = if first_slash < source.len() {
|
|
|
|
|
percent_decode(&source[first_slash + 1..])
|
|
|
|
|
} else {
|
|
|
|
|
String::new()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let new_metadata = if metadata_directive == "REPLACE" {
|
|
|
|
|
Some(extract_metadata(req.headers()))
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let result = store
|
|
|
|
|
.copy_object(&src_bucket, &src_key, dest_bucket, dest_key, &metadata_directive, new_metadata)
|
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
|
|
let xml = xml_response::copy_object_result_xml(&result.md5, &result.last_modified.to_rfc3339());
|
|
|
|
|
Ok(xml_response(StatusCode::OK, xml, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-17 16:28:50 +00:00
|
|
|
// ============================
|
|
|
|
|
// Policy handlers
|
|
|
|
|
// ============================
|
|
|
|
|
|
|
|
|
|
async fn handle_get_bucket_policy(
|
|
|
|
|
policy_store: &Arc<PolicyStore>,
|
|
|
|
|
bucket: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
match policy_store.get_policy(bucket).await {
|
|
|
|
|
Some(p) => {
|
|
|
|
|
let json = serde_json::to_string_pretty(&p)?;
|
|
|
|
|
let resp = Response::builder()
|
|
|
|
|
.status(StatusCode::OK)
|
|
|
|
|
.header("content-type", "application/json")
|
|
|
|
|
.header("x-amz-request-id", request_id)
|
|
|
|
|
.body(full_body(json))
|
|
|
|
|
.unwrap();
|
|
|
|
|
Ok(resp)
|
|
|
|
|
}
|
2026-03-14 15:20:30 +00:00
|
|
|
None => Err(StorageError::no_such_bucket_policy().into()),
|
2026-02-17 16:28:50 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_put_bucket_policy(
|
|
|
|
|
req: Request<Incoming>,
|
2026-03-21 21:50:42 +00:00
|
|
|
store: &Arc<StorageBackend>,
|
2026-02-17 16:28:50 +00:00
|
|
|
policy_store: &Arc<PolicyStore>,
|
|
|
|
|
bucket: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
// Verify bucket exists
|
|
|
|
|
if !store.bucket_exists(bucket).await {
|
2026-03-14 15:20:30 +00:00
|
|
|
return Err(StorageError::no_such_bucket().into());
|
2026-02-17 16:28:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read body
|
|
|
|
|
let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes();
|
|
|
|
|
let body_str = String::from_utf8_lossy(&body_bytes);
|
|
|
|
|
|
|
|
|
|
// Validate and parse
|
|
|
|
|
let validated_policy = policy::validate_policy(&body_str)?;
|
|
|
|
|
|
|
|
|
|
// Store
|
|
|
|
|
policy_store
|
|
|
|
|
.put_policy(bucket, validated_policy)
|
|
|
|
|
.await
|
2026-03-14 15:20:30 +00:00
|
|
|
.map_err(|e| StorageError::internal_error(&e.to_string()))?;
|
2026-02-17 16:28:50 +00:00
|
|
|
|
|
|
|
|
Ok(empty_response(StatusCode::NO_CONTENT, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_delete_bucket_policy(
|
|
|
|
|
policy_store: &Arc<PolicyStore>,
|
|
|
|
|
bucket: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
policy_store
|
|
|
|
|
.delete_policy(bucket)
|
|
|
|
|
.await
|
2026-03-14 15:20:30 +00:00
|
|
|
.map_err(|e| StorageError::internal_error(&e.to_string()))?;
|
2026-02-17 16:28:50 +00:00
|
|
|
Ok(empty_response(StatusCode::NO_CONTENT, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
2026-02-13 13:59:44 +00:00
|
|
|
// ============================
|
|
|
|
|
// Multipart handlers
|
|
|
|
|
// ============================
|
|
|
|
|
|
|
|
|
|
async fn handle_initiate_multipart(
|
|
|
|
|
req: Request<Incoming>,
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
key: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
let metadata = extract_metadata(req.headers());
|
|
|
|
|
let upload_id = store.initiate_multipart(bucket, key, metadata).await?;
|
|
|
|
|
let xml = xml_response::initiate_multipart_xml(bucket, key, &upload_id);
|
|
|
|
|
Ok(xml_response(StatusCode::OK, xml, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_upload_part(
|
|
|
|
|
req: Request<Incoming>,
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
query: &HashMap<String, String>,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
let upload_id = query.get("uploadId").unwrap();
|
|
|
|
|
let part_number: u32 = query
|
|
|
|
|
.get("partNumber")
|
|
|
|
|
.and_then(|s| s.parse().ok())
|
|
|
|
|
.unwrap_or(0);
|
|
|
|
|
|
|
|
|
|
if part_number < 1 || part_number > 10000 {
|
2026-03-14 15:20:30 +00:00
|
|
|
return Err(StorageError::invalid_part_number().into());
|
2026-02-13 13:59:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let body = req.into_body();
|
|
|
|
|
let (etag, _size) = store.upload_part(upload_id, part_number, body).await?;
|
|
|
|
|
|
|
|
|
|
let resp = Response::builder()
|
|
|
|
|
.status(StatusCode::OK)
|
|
|
|
|
.header("ETag", format!("\"{}\"", etag))
|
|
|
|
|
.header("x-amz-request-id", request_id)
|
|
|
|
|
.body(empty_body())
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
|
Ok(resp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_complete_multipart(
|
|
|
|
|
req: Request<Incoming>,
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
key: &str,
|
|
|
|
|
upload_id: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
// Read request body (XML)
|
|
|
|
|
let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes();
|
|
|
|
|
let body_str = String::from_utf8_lossy(&body_bytes);
|
|
|
|
|
|
|
|
|
|
// Parse parts from XML using regex-like approach
|
|
|
|
|
let parts = parse_complete_multipart_xml(&body_str);
|
|
|
|
|
|
|
|
|
|
let result = store.complete_multipart(upload_id, &parts).await?;
|
|
|
|
|
|
|
|
|
|
let xml = xml_response::complete_multipart_xml(bucket, key, &result.etag);
|
|
|
|
|
Ok(xml_response(StatusCode::OK, xml, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_abort_multipart(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
upload_id: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
store.abort_multipart(upload_id).await?;
|
|
|
|
|
Ok(empty_response(StatusCode::NO_CONTENT, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn handle_list_multipart_uploads(
|
2026-03-21 21:50:42 +00:00
|
|
|
store: Arc<StorageBackend>,
|
2026-02-13 13:59:44 +00:00
|
|
|
bucket: &str,
|
|
|
|
|
request_id: &str,
|
|
|
|
|
) -> Result<Response<BoxBody>> {
|
|
|
|
|
let uploads = store.list_multipart_uploads(bucket).await?;
|
|
|
|
|
let xml = xml_response::list_multipart_uploads_xml(bucket, &uploads);
|
|
|
|
|
Ok(xml_response(StatusCode::OK, xml, request_id))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================
|
|
|
|
|
// Helpers
|
|
|
|
|
// ============================
|
|
|
|
|
|
|
|
|
|
fn parse_query(query_string: &str) -> HashMap<String, String> {
|
|
|
|
|
let mut map = HashMap::new();
|
|
|
|
|
if query_string.is_empty() {
|
|
|
|
|
return map;
|
|
|
|
|
}
|
|
|
|
|
for pair in query_string.split('&') {
|
|
|
|
|
let mut parts = pair.splitn(2, '=');
|
|
|
|
|
let key = parts.next().unwrap_or("");
|
|
|
|
|
let value = parts.next().unwrap_or("");
|
|
|
|
|
let key = percent_decode(key);
|
|
|
|
|
let value = percent_decode(value);
|
|
|
|
|
map.insert(key, value);
|
|
|
|
|
}
|
|
|
|
|
map
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn percent_decode(s: &str) -> String {
|
|
|
|
|
percent_encoding::percent_decode_str(s)
|
|
|
|
|
.decode_utf8_lossy()
|
|
|
|
|
.to_string()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn extract_metadata(headers: &hyper::HeaderMap) -> HashMap<String, String> {
|
|
|
|
|
let mut metadata = HashMap::new();
|
|
|
|
|
|
|
|
|
|
for (name, value) in headers {
|
|
|
|
|
let name_str = name.as_str().to_lowercase();
|
|
|
|
|
if let Ok(val) = value.to_str() {
|
|
|
|
|
match name_str.as_str() {
|
|
|
|
|
"content-type" | "cache-control" | "content-disposition"
|
|
|
|
|
| "content-encoding" | "content-language" | "expires" => {
|
|
|
|
|
metadata.insert(name_str, val.to_string());
|
|
|
|
|
}
|
|
|
|
|
_ if name_str.starts_with("x-amz-meta-") => {
|
|
|
|
|
metadata.insert(name_str, val.to_string());
|
|
|
|
|
}
|
|
|
|
|
_ => {}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Default content-type
|
|
|
|
|
if !metadata.contains_key("content-type") {
|
|
|
|
|
metadata.insert("content-type".to_string(), "binary/octet-stream".to_string());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
metadata
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn parse_range_header(headers: &hyper::HeaderMap) -> Option<(u64, u64)> {
|
|
|
|
|
let range_val = headers.get("range")?.to_str().ok()?;
|
|
|
|
|
let bytes_prefix = "bytes=";
|
|
|
|
|
if !range_val.starts_with(bytes_prefix) {
|
|
|
|
|
return None;
|
|
|
|
|
}
|
|
|
|
|
let range_spec = &range_val[bytes_prefix.len()..];
|
|
|
|
|
let mut parts = range_spec.splitn(2, '-');
|
|
|
|
|
let start: u64 = parts.next()?.parse().ok()?;
|
|
|
|
|
let end_str = parts.next()?;
|
|
|
|
|
let end: u64 = if end_str.is_empty() {
|
|
|
|
|
// If no end specified, we'll handle this later based on file size
|
|
|
|
|
u64::MAX
|
|
|
|
|
} else {
|
|
|
|
|
end_str.parse().ok()?
|
|
|
|
|
};
|
|
|
|
|
Some((start, end))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> {
|
|
|
|
|
let mut parts = Vec::new();
|
|
|
|
|
|
|
|
|
|
// Simple XML parsing for <Part><PartNumber>N</PartNumber><ETag>...</ETag></Part>
|
|
|
|
|
let mut remaining = xml;
|
|
|
|
|
while let Some(part_start) = remaining.find("<Part>") {
|
|
|
|
|
let after_part = &remaining[part_start + 6..];
|
|
|
|
|
if let Some(part_end) = after_part.find("</Part>") {
|
|
|
|
|
let part_content = &after_part[..part_end];
|
|
|
|
|
|
|
|
|
|
let part_number = extract_xml_value(part_content, "PartNumber")
|
|
|
|
|
.and_then(|s| s.parse::<u32>().ok());
|
|
|
|
|
let etag = extract_xml_value(part_content, "ETag")
|
|
|
|
|
.map(|s| s.replace('"', ""));
|
|
|
|
|
|
|
|
|
|
if let (Some(pn), Some(et)) = (part_number, etag) {
|
|
|
|
|
parts.push((pn, et));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
remaining = &after_part[part_end + 7..];
|
|
|
|
|
} else {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
parts.sort_by_key(|(pn, _)| *pn);
|
|
|
|
|
parts
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Return the text between `<tag>` and the first `</tag>` that FOLLOWS it.
///
/// Fix: the previous version searched for the closing tag from the start of
/// the whole string, so a `</tag>` occurring before `<tag>` produced an
/// out-of-range slice (panic), and repeated tags could return the wrong
/// span. The closing tag is now searched for only after the opening tag.
fn extract_xml_value(xml: &str, tag: &str) -> Option<String> {
    let open = format!("<{}>", tag);
    let close = format!("</{}>", tag);
    let start = xml.find(&open)? + open.len();
    // Search the remainder only, so close-before-open cannot occur.
    let len = xml[start..].find(&close)?;
    Some(xml[start..start + len].to_string())
}
|
|
|
|
|
|
|
|
|
|
// ============================
|
|
|
|
|
// CORS
|
|
|
|
|
// ============================
|
|
|
|
|
|
2026-03-14 15:20:30 +00:00
|
|
|
fn build_cors_preflight(config: &SmartStorageConfig, request_id: &str) -> Response<BoxBody> {
|
2026-02-13 13:59:44 +00:00
|
|
|
let mut builder = Response::builder()
|
|
|
|
|
.status(StatusCode::NO_CONTENT)
|
|
|
|
|
.header("x-amz-request-id", request_id);
|
|
|
|
|
|
|
|
|
|
if let Some(ref origins) = config.cors.allowed_origins {
|
|
|
|
|
builder = builder.header("Access-Control-Allow-Origin", origins.join(", "));
|
|
|
|
|
}
|
|
|
|
|
if let Some(ref methods) = config.cors.allowed_methods {
|
|
|
|
|
builder = builder.header("Access-Control-Allow-Methods", methods.join(", "));
|
|
|
|
|
}
|
|
|
|
|
if let Some(ref headers) = config.cors.allowed_headers {
|
|
|
|
|
builder = builder.header("Access-Control-Allow-Headers", headers.join(", "));
|
|
|
|
|
}
|
|
|
|
|
if let Some(max_age) = config.cors.max_age {
|
|
|
|
|
builder = builder.header("Access-Control-Max-Age", max_age.to_string());
|
|
|
|
|
}
|
|
|
|
|
if config.cors.allow_credentials == Some(true) {
|
|
|
|
|
builder = builder.header("Access-Control-Allow-Credentials", "true");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
builder.body(empty_body()).unwrap()
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-14 15:20:30 +00:00
|
|
|
fn add_cors_headers(headers: &mut hyper::HeaderMap, config: &SmartStorageConfig) {
|
2026-02-13 13:59:44 +00:00
|
|
|
if let Some(ref origins) = config.cors.allowed_origins {
|
|
|
|
|
headers.insert(
|
|
|
|
|
"access-control-allow-origin",
|
|
|
|
|
origins.join(", ").parse().unwrap(),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
if let Some(ref exposed) = config.cors.exposed_headers {
|
|
|
|
|
headers.insert(
|
|
|
|
|
"access-control-expose-headers",
|
|
|
|
|
exposed.join(", ").parse().unwrap(),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
if config.cors.allow_credentials == Some(true) {
|
|
|
|
|
headers.insert(
|
|
|
|
|
"access-control-allow-credentials",
|
|
|
|
|
"true".parse().unwrap(),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|