use anyhow::Result; use bytes::Bytes; use futures_core::Stream; use http_body_util::BodyExt; use hyper::body::Incoming; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::{Method, Request, Response, StatusCode}; use hyper_util::rt::TokioIo; use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use tokio::io::AsyncReadExt; use tokio::net::TcpListener; use tokio::sync::watch; use tokio_util::io::ReaderStream; use uuid::Uuid; use crate::action::{self, RequestContext, S3Action}; use crate::auth::{self, AuthenticatedIdentity}; use crate::config::S3Config; use crate::policy::{self, PolicyDecision, PolicyStore}; use crate::s3_error::S3Error; use crate::storage::FileStore; use crate::xml_response; pub struct S3Server { store: Arc, config: S3Config, shutdown_tx: watch::Sender, server_handle: tokio::task::JoinHandle<()>, } impl S3Server { pub async fn start(config: S3Config) -> Result { let store = Arc::new(FileStore::new(config.storage.directory.clone().into())); // Initialize or reset storage if config.storage.clean_slate { store.reset().await?; } else { store.initialize().await?; } // Initialize policy store let policy_store = Arc::new(PolicyStore::new(store.policies_dir())); policy_store.load_from_disk().await?; let addr: SocketAddr = format!("{}:{}", config.address(), config.server.port) .parse()?; let listener = TcpListener::bind(addr).await?; let (shutdown_tx, shutdown_rx) = watch::channel(false); let server_store = store.clone(); let server_config = config.clone(); let server_policy_store = policy_store.clone(); let server_handle = tokio::spawn(async move { loop { let mut rx = shutdown_rx.clone(); tokio::select! { result = listener.accept() => { match result { Ok((stream, _remote_addr)) => { let io = TokioIo::new(stream); let store = server_store.clone(); let cfg = server_config.clone(); let ps = server_policy_store.clone(); tokio::spawn(async move { let svc = service_fn(move |req: Request| { let store = store.clone(); let cfg = cfg.clone(); let ps = ps.clone(); async move { handle_request(req, store, cfg, ps).await } }); if let Err(e) = http1::Builder::new() .keep_alive(true) .serve_connection(io, svc) .await { if !e.is_incomplete_message() { tracing::error!("Connection error: {}", e); } } }); } Err(e) => { tracing::error!("Accept error: {}", e); } } } _ = rx.changed() => { break; } } } }); if !config.server.silent { tracing::info!("S3 server listening on {}", addr); } Ok(Self { store, config, shutdown_tx, server_handle, }) } pub async fn stop(self) { let _ = self.shutdown_tx.send(true); let _ = self.server_handle.await; } pub fn store(&self) -> &FileStore { &self.store } } impl S3Config { fn address(&self) -> &str { &self.server.address } } // ============================ // Request handling // ============================ type BoxBody = http_body_util::combinators::BoxBody>; fn full_body(data: impl Into) -> BoxBody { http_body_util::Full::new(data.into()) .map_err(|never: std::convert::Infallible| -> Box { match never {} }) .boxed() } fn empty_body() -> BoxBody { http_body_util::Empty::new() .map_err(|never: std::convert::Infallible| -> Box { match never {} }) .boxed() } fn stream_body(reader: tokio::fs::File, content_length: u64) -> BoxBody { let stream = ReaderStream::with_capacity(reader.take(content_length), 64 * 1024); let mapped = FrameStream { inner: stream }; http_body_util::StreamBody::new(mapped).boxed() } /// Adapter that converts ReaderStream into a Stream of Frame struct FrameStream { inner: ReaderStream>, } impl Stream for FrameStream { type Item = Result, Box>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let inner = unsafe { self.map_unchecked_mut(|s| &mut s.inner) }; match inner.poll_next(cx) { Poll::Ready(Some(Ok(bytes))) => { Poll::Ready(Some(Ok(hyper::body::Frame::data(bytes)))) } Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e) as Box))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } } } fn xml_response(status: StatusCode, xml: String, request_id: &str) -> Response { Response::builder() .status(status) .header("content-type", "application/xml") .header("x-amz-request-id", request_id) .body(full_body(xml)) .unwrap() } fn empty_response(status: StatusCode, request_id: &str) -> Response { Response::builder() .status(status) .header("x-amz-request-id", request_id) .body(empty_body()) .unwrap() } fn s3_error_response(err: &S3Error, request_id: &str) -> Response { let xml = err.to_xml(); Response::builder() .status(err.status) .header("content-type", "application/xml") .header("x-amz-request-id", request_id) .body(full_body(xml)) .unwrap() } async fn handle_request( req: Request, store: Arc, config: S3Config, policy_store: Arc, ) -> Result, std::convert::Infallible> { let request_id = Uuid::new_v4().to_string(); let method = req.method().clone(); let uri = req.uri().clone(); let start = std::time::Instant::now(); // Handle CORS preflight if config.cors.enabled && method == Method::OPTIONS { let resp = build_cors_preflight(&config, &request_id); return Ok(resp); } // Step 1: Resolve S3 action from request let request_ctx = action::resolve_action(&req); // Step 2: Auth + policy pipeline if config.auth.enabled { // Attempt authentication let identity = { let has_auth_header = req .headers() .get("authorization") .and_then(|v| v.to_str().ok()) .map(|s| !s.is_empty()) .unwrap_or(false); if has_auth_header { match auth::verify_request(&req, &config) { Ok(id) => Some(id), Err(e) => { tracing::warn!("Auth failed: {}", e.message); return Ok(s3_error_response(&e, &request_id)); } } } else { None // Anonymous request } }; // Step 3: Authorization (policy evaluation) if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await { return Ok(s3_error_response(&e, &request_id)); } } // Route and handle let mut response = match route_request(req, store, &config, &request_id, &policy_store).await { Ok(resp) => resp, Err(err) => { if let Some(s3err) = err.downcast_ref::() { s3_error_response(s3err, &request_id) } else { tracing::error!("Internal error: {}", err); let s3err = S3Error::internal_error(&err.to_string()); s3_error_response(&s3err, &request_id) } } }; // Add CORS headers if enabled if config.cors.enabled { add_cors_headers(response.headers_mut(), &config); } let duration = start.elapsed(); tracing::info!( method = %method, path = %uri.path(), status = %response.status().as_u16(), duration_ms = %duration.as_millis(), "request" ); Ok(response) } /// Authorize a request based on bucket policies and authentication state. async fn authorize_request( ctx: &RequestContext, identity: Option<&AuthenticatedIdentity>, policy_store: &PolicyStore, ) -> Result<(), S3Error> { // ListAllMyBuckets requires authentication (no bucket to apply policy to) if ctx.action == S3Action::ListAllMyBuckets { if identity.is_none() { return Err(S3Error::access_denied()); } return Ok(()); } // If there's a bucket, check its policy if let Some(ref bucket) = ctx.bucket { if let Some(bucket_policy) = policy_store.get_policy(bucket).await { let decision = policy::evaluate_policy(&bucket_policy, ctx, identity); match decision { PolicyDecision::Deny => return Err(S3Error::access_denied()), PolicyDecision::Allow => return Ok(()), PolicyDecision::NoOpinion => { // Fall through to default behavior } } } } // Default: authenticated users get full access, anonymous denied if identity.is_none() { return Err(S3Error::access_denied()); } Ok(()) } // ============================ // Routing // ============================ async fn route_request( req: Request, store: Arc, _config: &S3Config, request_id: &str, policy_store: &Arc, ) -> Result> { let method = req.method().clone(); let path = req.uri().path().to_string(); let query_string = req.uri().query().unwrap_or("").to_string(); let query = parse_query(&query_string); // Parse path: /, /{bucket}, /{bucket}/{key...} let segments: Vec<&str> = path .trim_start_matches('/') .splitn(2, '/') .filter(|s| !s.is_empty()) .collect(); match segments.len() { 0 => { // Root: GET / -> ListBuckets match method { Method::GET => handle_list_buckets(store, request_id).await, _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)), } } 1 => { // Bucket level: /{bucket} let bucket = percent_decode(segments[0]); // Check for ?policy query parameter if query.contains_key("policy") { return match method { Method::GET => handle_get_bucket_policy(policy_store, &bucket, request_id).await, Method::PUT => handle_put_bucket_policy(req, &store, policy_store, &bucket, request_id).await, Method::DELETE => handle_delete_bucket_policy(policy_store, &bucket, request_id).await, _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)), }; } match method { Method::GET => { if query.contains_key("uploads") { handle_list_multipart_uploads(store, &bucket, request_id).await } else { handle_list_objects(store, &bucket, &query, request_id).await } } Method::PUT => handle_create_bucket(store, &bucket, request_id).await, Method::DELETE => handle_delete_bucket(store, &bucket, request_id, policy_store).await, Method::HEAD => handle_head_bucket(store, &bucket, request_id).await, _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)), } } 2 => { // Object level: /{bucket}/{key...} let bucket = percent_decode(segments[0]); let key = percent_decode(segments[1]); match method { Method::PUT => { if query.contains_key("partNumber") && query.contains_key("uploadId") { handle_upload_part(req, store, &query, request_id).await } else if req.headers().contains_key("x-amz-copy-source") { handle_copy_object(req, store, &bucket, &key, request_id).await } else { handle_put_object(req, store, &bucket, &key, request_id).await } } Method::GET => { handle_get_object(req, store, &bucket, &key, request_id).await } Method::HEAD => { handle_head_object(store, &bucket, &key, request_id).await } Method::DELETE => { if query.contains_key("uploadId") { let upload_id = query.get("uploadId").unwrap(); handle_abort_multipart(store, upload_id, request_id).await } else { handle_delete_object(store, &bucket, &key, request_id).await } } Method::POST => { if query.contains_key("uploads") { handle_initiate_multipart(req, store, &bucket, &key, request_id).await } else if query.contains_key("uploadId") { let upload_id = query.get("uploadId").unwrap().clone(); handle_complete_multipart(req, store, &bucket, &key, &upload_id, request_id).await } else { let err = S3Error::invalid_request("Invalid POST request"); Ok(s3_error_response(&err, request_id)) } } _ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)), } } _ => Ok(empty_response(StatusCode::BAD_REQUEST, request_id)), } } // ============================ // Handlers // ============================ async fn handle_list_buckets( store: Arc, request_id: &str, ) -> Result> { let buckets = store.list_buckets().await?; let xml = xml_response::list_buckets_xml(&buckets); Ok(xml_response(StatusCode::OK, xml, request_id)) } async fn handle_create_bucket( store: Arc, bucket: &str, request_id: &str, ) -> Result> { store.create_bucket(bucket).await?; Ok(empty_response(StatusCode::OK, request_id)) } async fn handle_delete_bucket( store: Arc, bucket: &str, request_id: &str, policy_store: &Arc, ) -> Result> { store.delete_bucket(bucket).await?; // Clean up bucket policy on deletion let _ = policy_store.delete_policy(bucket).await; Ok(empty_response(StatusCode::NO_CONTENT, request_id)) } async fn handle_head_bucket( store: Arc, bucket: &str, request_id: &str, ) -> Result> { if store.bucket_exists(bucket).await { Ok(empty_response(StatusCode::OK, request_id)) } else { Err(S3Error::no_such_bucket().into()) } } async fn handle_list_objects( store: Arc, bucket: &str, query: &HashMap, request_id: &str, ) -> Result> { let prefix = query.get("prefix").map(|s| s.as_str()).unwrap_or(""); let delimiter = query.get("delimiter").map(|s| s.as_str()).unwrap_or(""); let max_keys = query .get("max-keys") .and_then(|s| s.parse().ok()) .unwrap_or(1000usize); let continuation_token = query.get("continuation-token").map(|s| s.as_str()); let is_v2 = query.get("list-type").map(|s| s.as_str()) == Some("2"); let result = store .list_objects(bucket, prefix, delimiter, max_keys, continuation_token) .await?; let xml = if is_v2 { xml_response::list_objects_v2_xml(bucket, &result) } else { xml_response::list_objects_v1_xml(bucket, &result) }; Ok(xml_response(StatusCode::OK, xml, request_id)) } async fn handle_put_object( req: Request, store: Arc, bucket: &str, key: &str, request_id: &str, ) -> Result> { let metadata = extract_metadata(req.headers()); let body = req.into_body(); let result = store.put_object(bucket, key, body, metadata).await?; let resp = Response::builder() .status(StatusCode::OK) .header("ETag", format!("\"{}\"", result.md5)) .header("x-amz-request-id", request_id) .body(empty_body()) .unwrap(); Ok(resp) } async fn handle_get_object( req: Request, store: Arc, bucket: &str, key: &str, request_id: &str, ) -> Result> { // Parse Range header let range = parse_range_header(req.headers()); let result = store.get_object(bucket, key, range).await?; let content_type = result .metadata .get("content-type") .cloned() .unwrap_or_else(|| "binary/octet-stream".to_string()); let mut builder = Response::builder() .header("ETag", format!("\"{}\"", result.md5)) .header("Last-Modified", result.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string()) .header("Content-Type", &content_type) .header("Accept-Ranges", "bytes") .header("x-amz-request-id", request_id); // Add custom metadata headers for (k, v) in &result.metadata { if k.starts_with("x-amz-meta-") { builder = builder.header(k.as_str(), v.as_str()); } } if let Some((start, end)) = range { let content_length = end - start + 1; let resp = builder .status(StatusCode::PARTIAL_CONTENT) .header("Content-Length", content_length.to_string()) .header( "Content-Range", format!("bytes {}-{}/{}", start, end, result.size), ) .body(stream_body(result.body, content_length)) .unwrap(); Ok(resp) } else { let resp = builder .status(StatusCode::OK) .header("Content-Length", result.size.to_string()) .body(stream_body(result.body, result.content_length)) .unwrap(); Ok(resp) } } async fn handle_head_object( store: Arc, bucket: &str, key: &str, request_id: &str, ) -> Result> { let result = store.head_object(bucket, key).await?; let content_type = result .metadata .get("content-type") .cloned() .unwrap_or_else(|| "binary/octet-stream".to_string()); let mut builder = Response::builder() .status(StatusCode::OK) .header("ETag", format!("\"{}\"", result.md5)) .header("Last-Modified", result.last_modified.format("%a, %d %b %Y %H:%M:%S GMT").to_string()) .header("Content-Type", &content_type) .header("Content-Length", result.size.to_string()) .header("Accept-Ranges", "bytes") .header("x-amz-request-id", request_id); for (k, v) in &result.metadata { if k.starts_with("x-amz-meta-") { builder = builder.header(k.as_str(), v.as_str()); } } Ok(builder.body(empty_body()).unwrap()) } async fn handle_delete_object( store: Arc, bucket: &str, key: &str, request_id: &str, ) -> Result> { store.delete_object(bucket, key).await?; Ok(empty_response(StatusCode::NO_CONTENT, request_id)) } async fn handle_copy_object( req: Request, store: Arc, dest_bucket: &str, dest_key: &str, request_id: &str, ) -> Result> { let copy_source = req .headers() .get("x-amz-copy-source") .and_then(|v| v.to_str().ok()) .unwrap_or("") .to_string(); let metadata_directive = req .headers() .get("x-amz-metadata-directive") .and_then(|v| v.to_str().ok()) .unwrap_or("COPY") .to_uppercase(); // Parse source: /bucket/key or bucket/key let source = copy_source.trim_start_matches('/'); let first_slash = source.find('/').unwrap_or(source.len()); let src_bucket = percent_decode(&source[..first_slash]); let src_key = if first_slash < source.len() { percent_decode(&source[first_slash + 1..]) } else { String::new() }; let new_metadata = if metadata_directive == "REPLACE" { Some(extract_metadata(req.headers())) } else { None }; let result = store .copy_object(&src_bucket, &src_key, dest_bucket, dest_key, &metadata_directive, new_metadata) .await?; let xml = xml_response::copy_object_result_xml(&result.md5, &result.last_modified.to_rfc3339()); Ok(xml_response(StatusCode::OK, xml, request_id)) } // ============================ // Policy handlers // ============================ async fn handle_get_bucket_policy( policy_store: &Arc, bucket: &str, request_id: &str, ) -> Result> { match policy_store.get_policy(bucket).await { Some(p) => { let json = serde_json::to_string_pretty(&p)?; let resp = Response::builder() .status(StatusCode::OK) .header("content-type", "application/json") .header("x-amz-request-id", request_id) .body(full_body(json)) .unwrap(); Ok(resp) } None => Err(S3Error::no_such_bucket_policy().into()), } } async fn handle_put_bucket_policy( req: Request, store: &Arc, policy_store: &Arc, bucket: &str, request_id: &str, ) -> Result> { // Verify bucket exists if !store.bucket_exists(bucket).await { return Err(S3Error::no_such_bucket().into()); } // Read body let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes(); let body_str = String::from_utf8_lossy(&body_bytes); // Validate and parse let validated_policy = policy::validate_policy(&body_str)?; // Store policy_store .put_policy(bucket, validated_policy) .await .map_err(|e| S3Error::internal_error(&e.to_string()))?; Ok(empty_response(StatusCode::NO_CONTENT, request_id)) } async fn handle_delete_bucket_policy( policy_store: &Arc, bucket: &str, request_id: &str, ) -> Result> { policy_store .delete_policy(bucket) .await .map_err(|e| S3Error::internal_error(&e.to_string()))?; Ok(empty_response(StatusCode::NO_CONTENT, request_id)) } // ============================ // Multipart handlers // ============================ async fn handle_initiate_multipart( req: Request, store: Arc, bucket: &str, key: &str, request_id: &str, ) -> Result> { let metadata = extract_metadata(req.headers()); let upload_id = store.initiate_multipart(bucket, key, metadata).await?; let xml = xml_response::initiate_multipart_xml(bucket, key, &upload_id); Ok(xml_response(StatusCode::OK, xml, request_id)) } async fn handle_upload_part( req: Request, store: Arc, query: &HashMap, request_id: &str, ) -> Result> { let upload_id = query.get("uploadId").unwrap(); let part_number: u32 = query .get("partNumber") .and_then(|s| s.parse().ok()) .unwrap_or(0); if part_number < 1 || part_number > 10000 { return Err(S3Error::invalid_part_number().into()); } let body = req.into_body(); let (etag, _size) = store.upload_part(upload_id, part_number, body).await?; let resp = Response::builder() .status(StatusCode::OK) .header("ETag", format!("\"{}\"", etag)) .header("x-amz-request-id", request_id) .body(empty_body()) .unwrap(); Ok(resp) } async fn handle_complete_multipart( req: Request, store: Arc, bucket: &str, key: &str, upload_id: &str, request_id: &str, ) -> Result> { // Read request body (XML) let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes(); let body_str = String::from_utf8_lossy(&body_bytes); // Parse parts from XML using regex-like approach let parts = parse_complete_multipart_xml(&body_str); let result = store.complete_multipart(upload_id, &parts).await?; let xml = xml_response::complete_multipart_xml(bucket, key, &result.etag); Ok(xml_response(StatusCode::OK, xml, request_id)) } async fn handle_abort_multipart( store: Arc, upload_id: &str, request_id: &str, ) -> Result> { store.abort_multipart(upload_id).await?; Ok(empty_response(StatusCode::NO_CONTENT, request_id)) } async fn handle_list_multipart_uploads( store: Arc, bucket: &str, request_id: &str, ) -> Result> { let uploads = store.list_multipart_uploads(bucket).await?; let xml = xml_response::list_multipart_uploads_xml(bucket, &uploads); Ok(xml_response(StatusCode::OK, xml, request_id)) } // ============================ // Helpers // ============================ fn parse_query(query_string: &str) -> HashMap { let mut map = HashMap::new(); if query_string.is_empty() { return map; } for pair in query_string.split('&') { let mut parts = pair.splitn(2, '='); let key = parts.next().unwrap_or(""); let value = parts.next().unwrap_or(""); let key = percent_decode(key); let value = percent_decode(value); map.insert(key, value); } map } fn percent_decode(s: &str) -> String { percent_encoding::percent_decode_str(s) .decode_utf8_lossy() .to_string() } fn extract_metadata(headers: &hyper::HeaderMap) -> HashMap { let mut metadata = HashMap::new(); for (name, value) in headers { let name_str = name.as_str().to_lowercase(); if let Ok(val) = value.to_str() { match name_str.as_str() { "content-type" | "cache-control" | "content-disposition" | "content-encoding" | "content-language" | "expires" => { metadata.insert(name_str, val.to_string()); } _ if name_str.starts_with("x-amz-meta-") => { metadata.insert(name_str, val.to_string()); } _ => {} } } } // Default content-type if !metadata.contains_key("content-type") { metadata.insert("content-type".to_string(), "binary/octet-stream".to_string()); } metadata } fn parse_range_header(headers: &hyper::HeaderMap) -> Option<(u64, u64)> { let range_val = headers.get("range")?.to_str().ok()?; let bytes_prefix = "bytes="; if !range_val.starts_with(bytes_prefix) { return None; } let range_spec = &range_val[bytes_prefix.len()..]; let mut parts = range_spec.splitn(2, '-'); let start: u64 = parts.next()?.parse().ok()?; let end_str = parts.next()?; let end: u64 = if end_str.is_empty() { // If no end specified, we'll handle this later based on file size u64::MAX } else { end_str.parse().ok()? }; Some((start, end)) } fn parse_complete_multipart_xml(xml: &str) -> Vec<(u32, String)> { let mut parts = Vec::new(); // Simple XML parsing for N... let mut remaining = xml; while let Some(part_start) = remaining.find("") { let after_part = &remaining[part_start + 6..]; if let Some(part_end) = after_part.find("") { let part_content = &after_part[..part_end]; let part_number = extract_xml_value(part_content, "PartNumber") .and_then(|s| s.parse::().ok()); let etag = extract_xml_value(part_content, "ETag") .map(|s| s.replace('"', "")); if let (Some(pn), Some(et)) = (part_number, etag) { parts.push((pn, et)); } remaining = &after_part[part_end + 7..]; } else { break; } } parts.sort_by_key(|(pn, _)| *pn); parts } fn extract_xml_value<'a>(xml: &'a str, tag: &str) -> Option { let open = format!("<{}>", tag); let close = format!("", tag); let start = xml.find(&open)? + open.len(); let end = xml.find(&close)?; Some(xml[start..end].to_string()) } // ============================ // CORS // ============================ fn build_cors_preflight(config: &S3Config, request_id: &str) -> Response { let mut builder = Response::builder() .status(StatusCode::NO_CONTENT) .header("x-amz-request-id", request_id); if let Some(ref origins) = config.cors.allowed_origins { builder = builder.header("Access-Control-Allow-Origin", origins.join(", ")); } if let Some(ref methods) = config.cors.allowed_methods { builder = builder.header("Access-Control-Allow-Methods", methods.join(", ")); } if let Some(ref headers) = config.cors.allowed_headers { builder = builder.header("Access-Control-Allow-Headers", headers.join(", ")); } if let Some(max_age) = config.cors.max_age { builder = builder.header("Access-Control-Max-Age", max_age.to_string()); } if config.cors.allow_credentials == Some(true) { builder = builder.header("Access-Control-Allow-Credentials", "true"); } builder.body(empty_body()).unwrap() } fn add_cors_headers(headers: &mut hyper::HeaderMap, config: &S3Config) { if let Some(ref origins) = config.cors.allowed_origins { headers.insert( "access-control-allow-origin", origins.join(", ").parse().unwrap(), ); } if let Some(ref exposed) = config.cors.exposed_headers { headers.insert( "access-control-expose-headers", exposed.join(", ").parse().unwrap(), ); } if config.cors.allow_credentials == Some(true) { headers.insert( "access-control-allow-credentials", "true".parse().unwrap(), ); } }