feat(auth,policy): add AWS SigV4 authentication and S3 bucket policy support

This commit is contained in:
2026-02-17 16:28:50 +00:00
parent 0b9d8c4a72
commit eb232b6e8e
18 changed files with 2616 additions and 55 deletions

View File

@@ -18,7 +18,10 @@ use tokio::sync::watch;
use tokio_util::io::ReaderStream;
use uuid::Uuid;
use crate::action::{self, RequestContext, S3Action};
use crate::auth::{self, AuthenticatedIdentity};
use crate::config::S3Config;
use crate::policy::{self, PolicyDecision, PolicyStore};
use crate::s3_error::S3Error;
use crate::storage::FileStore;
use crate::xml_response;
@@ -41,6 +44,10 @@ impl S3Server {
store.initialize().await?;
}
// Initialize policy store
let policy_store = Arc::new(PolicyStore::new(store.policies_dir()));
policy_store.load_from_disk().await?;
let addr: SocketAddr = format!("{}:{}", config.address(), config.server.port)
.parse()?;
@@ -49,6 +56,7 @@ impl S3Server {
let server_store = store.clone();
let server_config = config.clone();
let server_policy_store = policy_store.clone();
let server_handle = tokio::spawn(async move {
loop {
@@ -61,13 +69,15 @@ impl S3Server {
let io = TokioIo::new(stream);
let store = server_store.clone();
let cfg = server_config.clone();
let ps = server_policy_store.clone();
tokio::spawn(async move {
let svc = service_fn(move |req: Request<Incoming>| {
let store = store.clone();
let cfg = cfg.clone();
let ps = ps.clone();
async move {
handle_request(req, store, cfg).await
handle_request(req, store, cfg, ps).await
}
});
@@ -198,6 +208,7 @@ async fn handle_request(
req: Request<Incoming>,
store: Arc<FileStore>,
config: S3Config,
policy_store: Arc<PolicyStore>,
) -> Result<Response<BoxBody>, std::convert::Infallible> {
let request_id = Uuid::new_v4().to_string();
let method = req.method().clone();
@@ -210,16 +221,41 @@ async fn handle_request(
return Ok(resp);
}
// Auth check
// Step 1: Resolve S3 action from request
let request_ctx = action::resolve_action(&req);
// Step 2: Auth + policy pipeline
if config.auth.enabled {
if let Err(e) = check_auth(&req, &config) {
tracing::warn!("Auth failed: {}", e.message);
// Attempt authentication
let identity = {
let has_auth_header = req
.headers()
.get("authorization")
.and_then(|v| v.to_str().ok())
.map(|s| !s.is_empty())
.unwrap_or(false);
if has_auth_header {
match auth::verify_request(&req, &config) {
Ok(id) => Some(id),
Err(e) => {
tracing::warn!("Auth failed: {}", e.message);
return Ok(s3_error_response(&e, &request_id));
}
}
} else {
None // Anonymous request
}
};
// Step 3: Authorization (policy evaluation)
if let Err(e) = authorize_request(&request_ctx, identity.as_ref(), &policy_store).await {
return Ok(s3_error_response(&e, &request_id));
}
}
// Route and handle
let mut response = match route_request(req, store, &config, &request_id).await {
let mut response = match route_request(req, store, &config, &request_id, &policy_store).await {
Ok(resp) => resp,
Err(err) => {
if let Some(s3err) = err.downcast_ref::<S3Error>() {
@@ -249,6 +285,42 @@ async fn handle_request(
Ok(response)
}
/// Authorize a request based on bucket policies and authentication state.
async fn authorize_request(
ctx: &RequestContext,
identity: Option<&AuthenticatedIdentity>,
policy_store: &PolicyStore,
) -> Result<(), S3Error> {
// ListAllMyBuckets requires authentication (no bucket to apply policy to)
if ctx.action == S3Action::ListAllMyBuckets {
if identity.is_none() {
return Err(S3Error::access_denied());
}
return Ok(());
}
// If there's a bucket, check its policy
if let Some(ref bucket) = ctx.bucket {
if let Some(bucket_policy) = policy_store.get_policy(bucket).await {
let decision = policy::evaluate_policy(&bucket_policy, ctx, identity);
match decision {
PolicyDecision::Deny => return Err(S3Error::access_denied()),
PolicyDecision::Allow => return Ok(()),
PolicyDecision::NoOpinion => {
// Fall through to default behavior
}
}
}
}
// Default: authenticated users get full access, anonymous denied
if identity.is_none() {
return Err(S3Error::access_denied());
}
Ok(())
}
// ============================
// Routing
// ============================
@@ -258,6 +330,7 @@ async fn route_request(
store: Arc<FileStore>,
_config: &S3Config,
request_id: &str,
policy_store: &Arc<PolicyStore>,
) -> Result<Response<BoxBody>> {
let method = req.method().clone();
let path = req.uri().path().to_string();
@@ -282,6 +355,17 @@ async fn route_request(
1 => {
// Bucket level: /{bucket}
let bucket = percent_decode(segments[0]);
// Check for ?policy query parameter
if query.contains_key("policy") {
return match method {
Method::GET => handle_get_bucket_policy(policy_store, &bucket, request_id).await,
Method::PUT => handle_put_bucket_policy(req, &store, policy_store, &bucket, request_id).await,
Method::DELETE => handle_delete_bucket_policy(policy_store, &bucket, request_id).await,
_ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
};
}
match method {
Method::GET => {
if query.contains_key("uploads") {
@@ -291,7 +375,7 @@ async fn route_request(
}
}
Method::PUT => handle_create_bucket(store, &bucket, request_id).await,
Method::DELETE => handle_delete_bucket(store, &bucket, request_id).await,
Method::DELETE => handle_delete_bucket(store, &bucket, request_id, policy_store).await,
Method::HEAD => handle_head_bucket(store, &bucket, request_id).await,
_ => Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED, request_id)),
}
@@ -369,8 +453,11 @@ async fn handle_delete_bucket(
store: Arc<FileStore>,
bucket: &str,
request_id: &str,
policy_store: &Arc<PolicyStore>,
) -> Result<Response<BoxBody>> {
store.delete_bucket(bucket).await?;
// Clean up bucket policy on deletion
let _ = policy_store.delete_policy(bucket).await;
Ok(empty_response(StatusCode::NO_CONTENT, request_id))
}
@@ -577,6 +664,70 @@ async fn handle_copy_object(
Ok(xml_response(StatusCode::OK, xml, request_id))
}
// ============================
// Policy handlers
// ============================
async fn handle_get_bucket_policy(
policy_store: &Arc<PolicyStore>,
bucket: &str,
request_id: &str,
) -> Result<Response<BoxBody>> {
match policy_store.get_policy(bucket).await {
Some(p) => {
let json = serde_json::to_string_pretty(&p)?;
let resp = Response::builder()
.status(StatusCode::OK)
.header("content-type", "application/json")
.header("x-amz-request-id", request_id)
.body(full_body(json))
.unwrap();
Ok(resp)
}
None => Err(S3Error::no_such_bucket_policy().into()),
}
}
async fn handle_put_bucket_policy(
req: Request<Incoming>,
store: &Arc<FileStore>,
policy_store: &Arc<PolicyStore>,
bucket: &str,
request_id: &str,
) -> Result<Response<BoxBody>> {
// Verify bucket exists
if !store.bucket_exists(bucket).await {
return Err(S3Error::no_such_bucket().into());
}
// Read body
let body_bytes = req.collect().await.map_err(|e| anyhow::anyhow!("Body error: {}", e))?.to_bytes();
let body_str = String::from_utf8_lossy(&body_bytes);
// Validate and parse
let validated_policy = policy::validate_policy(&body_str)?;
// Store
policy_store
.put_policy(bucket, validated_policy)
.await
.map_err(|e| S3Error::internal_error(&e.to_string()))?;
Ok(empty_response(StatusCode::NO_CONTENT, request_id))
}
async fn handle_delete_bucket_policy(
policy_store: &Arc<PolicyStore>,
bucket: &str,
request_id: &str,
) -> Result<Response<BoxBody>> {
policy_store
.delete_policy(bucket)
.await
.map_err(|e| S3Error::internal_error(&e.to_string()))?;
Ok(empty_response(StatusCode::NO_CONTENT, request_id))
}
// ============================
// Multipart handlers
// ============================
@@ -820,46 +971,3 @@ fn add_cors_headers(headers: &mut hyper::HeaderMap, config: &S3Config) {
);
}
}
// ============================
// Auth
// ============================
fn check_auth(req: &Request<Incoming>, config: &S3Config) -> Result<(), S3Error> {
let auth_header = req
.headers()
.get("authorization")
.and_then(|v| v.to_str().ok())
.unwrap_or("");
if auth_header.is_empty() {
return Err(S3Error::access_denied());
}
// Extract access key from AWS v2 or v4 signature
let access_key = if auth_header.starts_with("AWS4-HMAC-SHA256") {
// v4: AWS4-HMAC-SHA256 Credential=KEY/date/region/s3/aws4_request, ...
auth_header
.split("Credential=")
.nth(1)
.and_then(|s| s.split('/').next())
} else if auth_header.starts_with("AWS ") {
// v2: AWS KEY:signature
auth_header
.strip_prefix("AWS ")
.and_then(|s| s.split(':').next())
} else {
None
};
let access_key = access_key.unwrap_or("");
// Check against configured credentials
for cred in &config.auth.credentials {
if cred.access_key_id == access_key {
return Ok(());
}
}
Err(S3Error::access_denied())
}