use anyhow::Result;
use chrono::{DateTime, Utc};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use md5::{Digest, Md5};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
use uuid::Uuid;

use crate::s3_error::S3Error;

// ============================
// Result types
// ============================

pub struct PutResult {
    pub size: u64,
    pub md5: String,
}

pub struct GetResult {
    pub key: String,
    pub size: u64,
    pub last_modified: DateTime<Utc>,
    pub md5: String,
    pub metadata: HashMap<String, String>,
    pub body: tokio::fs::File,
    pub content_length: u64,
}

pub struct HeadResult {
    pub key: String,
    pub size: u64,
    pub last_modified: DateTime<Utc>,
    pub md5: String,
    pub metadata: HashMap<String, String>,
}

pub struct CopyResult {
    pub size: u64,
    pub md5: String,
    pub last_modified: DateTime<Utc>,
}

pub struct ListObjectEntry {
    pub key: String,
    pub size: u64,
    pub last_modified: DateTime<Utc>,
    pub md5: String,
}

pub struct ListObjectsResult {
    pub contents: Vec<ListObjectEntry>,
    pub common_prefixes: Vec<String>,
    pub is_truncated: bool,
    pub next_continuation_token: Option<String>,
    pub prefix: String,
    pub delimiter: String,
    pub max_keys: usize,
}

pub struct BucketInfo {
    pub name: String,
    pub creation_date: DateTime<Utc>,
}

pub struct MultipartUploadInfo {
    pub upload_id: String,
    pub bucket: String,
    pub key: String,
    pub initiated: DateTime<Utc>,
}

pub struct CompleteMultipartResult {
    pub etag: String,
    pub size: u64,
}

// ============================
// Multipart metadata (disk format, compatible with TS)
// ============================

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct MultipartMetadata {
    upload_id: String,
    bucket: String,
    key: String,
    initiated: String,
    metadata: HashMap<String, String>,
    parts: Vec<PartMetadata>,
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PartMetadata {
    part_number: u32,
    etag: String,
    size: u64,
    last_modified: String,
}

// ============================
// FileStore
// ============================

pub struct FileStore {
    root_dir: PathBuf,
}

impl FileStore {
    pub fn new(root_dir: PathBuf) -> Self {
        Self { root_dir }
    }

    pub async fn initialize(&self) -> Result<()> {
        fs::create_dir_all(&self.root_dir).await?;
        Ok(())
    }

    pub async fn reset(&self) -> Result<()> {
        if self.root_dir.exists() {
            fs::remove_dir_all(&self.root_dir).await?;
        }
        fs::create_dir_all(&self.root_dir).await?;
        Ok(())
    }

    // ============================
    // Bucket operations
    // ============================

    pub async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
        let mut buckets = Vec::new();
        let mut entries = fs::read_dir(&self.root_dir).await?;
        while let Some(entry) = entries.next_entry().await?
        {
            let meta = entry.metadata().await?;
            if meta.is_dir() {
                let name = entry.file_name().to_string_lossy().to_string();
                // Skip hidden dirs like .multipart
                if name.starts_with('.') {
                    continue;
                }
                let creation_date: DateTime<Utc> = meta
                    .created()
                    .unwrap_or(meta.modified().unwrap_or(std::time::SystemTime::UNIX_EPOCH))
                    .into();
                buckets.push(BucketInfo {
                    name,
                    creation_date,
                });
            }
        }
        buckets.sort_by(|a, b| a.name.cmp(&b.name));
        Ok(buckets)
    }

    pub async fn bucket_exists(&self, bucket: &str) -> bool {
        self.root_dir.join(bucket).is_dir()
    }

    pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
        let bucket_path = self.root_dir.join(bucket);
        fs::create_dir_all(&bucket_path).await?;
        Ok(())
    }

    pub async fn delete_bucket(&self, bucket: &str) -> Result<()> {
        let bucket_path = self.root_dir.join(bucket);
        if !bucket_path.is_dir() {
            return Err(S3Error::no_such_bucket().into());
        }
        // Check that the bucket is empty; any entry (including sidecar files) counts
        let mut entries = fs::read_dir(&bucket_path).await?;
        while let Some(_entry) = entries.next_entry().await? {
            return Err(S3Error::bucket_not_empty().into());
        }
        fs::remove_dir_all(&bucket_path).await?;
        Ok(())
    }

    // ============================
    // Object operations
    // ============================

    pub async fn put_object(
        &self,
        bucket: &str,
        key: &str,
        body: Incoming,
        metadata: HashMap<String, String>,
    ) -> Result<PutResult> {
        if !self.bucket_exists(bucket).await {
            return Err(S3Error::no_such_bucket().into());
        }
        let object_path = self.object_path(bucket, key);
        if let Some(parent) = object_path.parent() {
            fs::create_dir_all(parent).await?;
        }

        let file = fs::File::create(&object_path).await?;
        let mut writer = BufWriter::new(file);
        let mut hasher = Md5::new();
        let mut total_size: u64 = 0;

        // Stream body frames directly to file
        let mut body = body;
        loop {
            match body.frame().await {
                Some(Ok(frame)) => {
                    if let Ok(data) = frame.into_data() {
                        hasher.update(&data);
                        total_size += data.len() as u64;
                        writer.write_all(&data).await?;
                    }
                }
                Some(Err(e)) => {
                    return Err(anyhow::anyhow!("Body read error: {}", e));
                }
                None => break,
            }
        }
        writer.flush().await?;
        drop(writer);

        let md5_hex = format!("{:x}", hasher.finalize());

        // Write MD5 sidecar
        let md5_path = format!("{}.md5", object_path.display());
        fs::write(&md5_path, &md5_hex).await?;

        // Write metadata sidecar
        let metadata_path = format!("{}.metadata.json", object_path.display());
        let metadata_json = serde_json::to_string_pretty(&metadata)?;
        fs::write(&metadata_path, metadata_json).await?;

        Ok(PutResult {
            size: total_size,
            md5: md5_hex,
        })
    }

    pub async fn put_object_bytes(
        &self,
        bucket: &str,
        key: &str,
        data: &[u8],
        metadata: HashMap<String, String>,
    ) -> Result<PutResult> {
        if !self.bucket_exists(bucket).await {
            return Err(S3Error::no_such_bucket().into());
        }
        let object_path = self.object_path(bucket, key);
        if let Some(parent) = object_path.parent() {
            fs::create_dir_all(parent).await?;
        }

        let mut hasher = Md5::new();
        hasher.update(data);
        let md5_hex = format!("{:x}", hasher.finalize());

        fs::write(&object_path, data).await?;

        // Write MD5 sidecar
        let md5_path = format!("{}.md5", object_path.display());
        fs::write(&md5_path, &md5_hex).await?;

        // Write metadata sidecar
        let metadata_path = format!("{}.metadata.json", object_path.display());
        let metadata_json = serde_json::to_string_pretty(&metadata)?;
        fs::write(&metadata_path, metadata_json).await?;

        Ok(PutResult {
            size: data.len() as u64,
            md5: md5_hex,
        })
    }

    pub async fn get_object(
        &self,
        bucket: &str,
        key: &str,
        range: Option<(u64, u64)>,
    ) -> Result<GetResult> {
        let object_path = self.object_path(bucket, key);
        if !object_path.exists() {
            return
                Err(S3Error::no_such_key().into());
        }
        let file_meta = fs::metadata(&object_path).await?;
        let size = file_meta.len();
        let last_modified: DateTime<Utc> = file_meta.modified()?.into();
        let md5 = self.read_md5(&object_path).await;
        let metadata = self.read_metadata(&object_path).await;

        let mut file = fs::File::open(&object_path).await?;
        let content_length = if let Some((start, end)) = range {
            file.seek(std::io::SeekFrom::Start(start)).await?;
            end - start + 1
        } else {
            size
        };

        Ok(GetResult {
            key: key.to_string(),
            size,
            last_modified,
            md5,
            metadata,
            body: file,
            content_length,
        })
    }

    pub async fn head_object(&self, bucket: &str, key: &str) -> Result<HeadResult> {
        let object_path = self.object_path(bucket, key);
        if !object_path.exists() {
            return Err(S3Error::no_such_key().into());
        }
        // Only stat the file, don't open it
        let file_meta = fs::metadata(&object_path).await?;
        let size = file_meta.len();
        let last_modified: DateTime<Utc> = file_meta.modified()?.into();
        let md5 = self.read_md5(&object_path).await;
        let metadata = self.read_metadata(&object_path).await;

        Ok(HeadResult {
            key: key.to_string(),
            size,
            last_modified,
            md5,
            metadata,
        })
    }

    pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
        let object_path = self.object_path(bucket, key);
        let md5_path = format!("{}.md5", object_path.display());
        let metadata_path = format!("{}.metadata.json", object_path.display());

        // S3 doesn't error if object doesn't exist
        let _ = fs::remove_file(&object_path).await;
        let _ = fs::remove_file(&md5_path).await;
        let _ = fs::remove_file(&metadata_path).await;

        // Clean up empty parent directories up to bucket level
        let bucket_path = self.root_dir.join(bucket);
        let mut current = object_path.parent().map(|p| p.to_path_buf());
        while let Some(dir) = current {
            if dir == bucket_path {
                break;
            }
            if let Ok(mut entries) = fs::read_dir(&dir).await {
                if entries.next_entry().await?.is_none() {
                    let _ = fs::remove_dir(&dir).await;
                } else {
                    break;
                }
            }
            current = dir.parent().map(|p| p.to_path_buf());
        }
        Ok(())
    }

    pub async fn copy_object(
        &self,
        src_bucket: &str,
        src_key: &str,
        dest_bucket: &str,
        dest_key: &str,
        metadata_directive: &str,
        new_metadata: Option<HashMap<String, String>>,
    ) -> Result<CopyResult> {
        let src_path = self.object_path(src_bucket, src_key);
        let dest_path = self.object_path(dest_bucket, dest_key);

        if !src_path.exists() {
            return Err(S3Error::no_such_key().into());
        }
        if !self.bucket_exists(dest_bucket).await {
            return Err(S3Error::no_such_bucket().into());
        }
        if let Some(parent) = dest_path.parent() {
            fs::create_dir_all(parent).await?;
        }

        // Copy object file
        fs::copy(&src_path, &dest_path).await?;

        // Handle metadata
        if metadata_directive == "COPY" {
            let src_meta_path = format!("{}.metadata.json", src_path.display());
            let dest_meta_path = format!("{}.metadata.json", dest_path.display());
            let _ = fs::copy(&src_meta_path, &dest_meta_path).await;
        } else if let Some(meta) = new_metadata {
            let dest_meta_path = format!("{}.metadata.json", dest_path.display());
            let json = serde_json::to_string_pretty(&meta)?;
            fs::write(&dest_meta_path, json).await?;
        }

        // Copy MD5 sidecar
        let src_md5_path = format!("{}.md5", src_path.display());
        let dest_md5_path = format!("{}.md5", dest_path.display());
        let _ = fs::copy(&src_md5_path, &dest_md5_path).await;

        let file_meta = fs::metadata(&dest_path).await?;
        let md5 = self.read_md5(&dest_path).await;
        let last_modified: DateTime<Utc> = file_meta.modified()?.into();

        Ok(CopyResult {
            size: file_meta.len(),
            md5,
            last_modified,
        })
    }

    pub async fn list_objects(
        &self,
        bucket: &str,
        prefix: &str,
        delimiter: &str,
        max_keys: usize,
        continuation_token: Option<&str>,
    ) -> Result<ListObjectsResult> {
        let bucket_path = self.root_dir.join(bucket);
        if !bucket_path.is_dir() {
            return Err(S3Error::no_such_bucket().into());
        }

        // Collect all object keys recursively
        let mut keys = Vec::new();
        self.collect_keys(&bucket_path, &bucket_path, &mut keys)
            .await?;

        // Apply prefix filter
        if !prefix.is_empty() {
            keys.retain(|k| k.starts_with(prefix));
        }
        keys.sort();

        // Handle continuation token
        if let Some(token) = continuation_token {
            if let Some(pos) = keys.iter().position(|k| k.as_str() > token) {
                keys = keys[pos..].to_vec();
            } else {
                keys.clear();
            }
        }

        // Handle delimiter and pagination
        let mut common_prefixes: Vec<String> = Vec::new();
        let mut common_prefix_set = std::collections::HashSet::new();
        let mut contents: Vec<ListObjectEntry> = Vec::new();
        let mut is_truncated = false;

        for key in &keys {
            if !delimiter.is_empty() {
                let remaining = &key[prefix.len()..];
                if let Some(delim_idx) = remaining.find(delimiter) {
                    let cp = format!(
                        "{}{}",
                        prefix,
                        &remaining[..delim_idx + delimiter.len()]
                    );
                    if common_prefix_set.insert(cp.clone()) {
                        common_prefixes.push(cp);
                    }
                    continue;
                }
            }
            if contents.len() >= max_keys {
                is_truncated = true;
                break;
            }
            let object_path = self.object_path(bucket, key);
            if let Ok(meta) = fs::metadata(&object_path).await {
                let md5 = self.read_md5(&object_path).await;
                let last_modified: DateTime<Utc> = meta
                    .modified()
                    .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
                    .into();
                contents.push(ListObjectEntry {
                    key: key.clone(),
                    size: meta.len(),
                    last_modified,
                    md5,
                });
            }
        }

        let next_continuation_token = if is_truncated {
            contents.last().map(|e| e.key.clone())
        } else {
            None
        };
        common_prefixes.sort();

        Ok(ListObjectsResult {
            contents,
            common_prefixes,
            is_truncated,
            next_continuation_token,
            prefix: prefix.to_string(),
            delimiter: delimiter.to_string(),
            max_keys,
        })
    }

    // ============================
    // Multipart operations
    // ============================

    fn multipart_dir(&self) -> PathBuf {
        self.root_dir.join(".multipart")
    }

    pub async fn initiate_multipart(
        &self,
        bucket: &str,
        key: &str,
        metadata: HashMap<String, String>,
    ) -> Result<String> {
        let upload_id = Uuid::new_v4().to_string().replace('-', "");
        let upload_dir = self.multipart_dir().join(&upload_id);
        fs::create_dir_all(&upload_dir).await?;

        let meta = MultipartMetadata {
            upload_id: upload_id.clone(),
            bucket: bucket.to_string(),
            key: key.to_string(),
            initiated: Utc::now().to_rfc3339(),
            metadata,
            parts: Vec::new(),
        };
        let meta_path = upload_dir.join("metadata.json");
        let json = serde_json::to_string_pretty(&meta)?;
        fs::write(&meta_path, json).await?;

        Ok(upload_id)
    }

    pub async fn upload_part(
        &self,
        upload_id: &str,
        part_number: u32,
        body: Incoming,
    ) -> Result<(String, u64)> {
        let upload_dir = self.multipart_dir().join(upload_id);
        if !upload_dir.is_dir() {
            return Err(S3Error::no_such_upload().into());
        }

        let part_path = upload_dir.join(format!("part-{}", part_number));
        let file = fs::File::create(&part_path).await?;
        let mut writer = BufWriter::new(file);
        let mut hasher = Md5::new();
        let mut size: u64 = 0;

        let mut body = body;
        loop {
            match body.frame().await {
                Some(Ok(frame)) => {
                    if let Ok(data) = frame.into_data() {
                        hasher.update(&data);
                        size += data.len() as u64;
                        writer.write_all(&data).await?;
                    }
                }
                Some(Err(e)) => {
                    return Err(anyhow::anyhow!("Body read error: {}", e));
                }
                None => break,
            }
        }
        writer.flush().await?;
        drop(writer);

        let etag = format!("{:x}", hasher.finalize());

        // Update metadata
        self.update_multipart_metadata(upload_id, part_number, &etag, size)
            .await?;

        Ok((etag, size))
    }

    async fn update_multipart_metadata(
        &self,
        upload_id: &str,
        part_number: u32,
        etag: &str,
        size: u64,
    ) -> Result<()> {
        let meta_path = self.multipart_dir().join(upload_id).join("metadata.json");
        let content = fs::read_to_string(&meta_path).await?;
        let mut meta: MultipartMetadata = serde_json::from_str(&content)?;

        // Remove existing part with same number
        meta.parts.retain(|p| p.part_number != part_number);
        meta.parts.push(PartMetadata {
            part_number,
            etag: etag.to_string(),
            size,
            last_modified: Utc::now().to_rfc3339(),
        });
        meta.parts.sort_by_key(|p| p.part_number);

        let json = serde_json::to_string_pretty(&meta)?;
        fs::write(&meta_path, json).await?;
        Ok(())
    }

    pub async fn complete_multipart(
        &self,
        upload_id: &str,
        parts: &[(u32, String)],
    ) -> Result<CompleteMultipartResult> {
        let upload_dir = self.multipart_dir().join(upload_id);
        if !upload_dir.is_dir() {
            return Err(S3Error::no_such_upload().into());
        }

        // Read metadata to get bucket/key
        let meta_path = upload_dir.join("metadata.json");
        let content = fs::read_to_string(&meta_path).await?;
        let meta: MultipartMetadata = serde_json::from_str(&content)?;

        let object_path = self.object_path(&meta.bucket, &meta.key);
        if let Some(parent) = object_path.parent() {
            fs::create_dir_all(parent).await?;
        }

        // Concatenate parts into the final object, streaming each part
        let dest_file = fs::File::create(&object_path).await?;
        let mut writer = BufWriter::new(dest_file);
        let mut hasher = Md5::new();
        let mut total_size: u64 = 0;

        for (part_number, _etag) in parts {
            let part_path = upload_dir.join(format!("part-{}", part_number));
            if !part_path.exists() {
                return Err(anyhow::anyhow!("Part {} not found", part_number));
            }
            let mut part_file = fs::File::open(&part_path).await?;
            let mut buf = vec![0u8; 64 * 1024]; // 64 KiB buffer
            loop {
                let n = part_file.read(&mut buf).await?;
                if n == 0 {
                    break;
                }
                hasher.update(&buf[..n]);
                writer.write_all(&buf[..n]).await?;
                total_size += n as u64;
            }
        }
        writer.flush().await?;
        drop(writer);

        let etag = format!("{:x}", hasher.finalize());

        // Write MD5 sidecar
        let md5_path = format!("{}.md5", object_path.display());
        fs::write(&md5_path, &etag).await?;

        // Write metadata sidecar
        let metadata_path = format!("{}.metadata.json", object_path.display());
        let metadata_json = serde_json::to_string_pretty(&meta.metadata)?;
        fs::write(&metadata_path, metadata_json).await?;

        // Clean up multipart directory
        let _ = fs::remove_dir_all(&upload_dir).await;

        Ok(CompleteMultipartResult {
            etag,
            size: total_size,
        })
    }

    pub async fn abort_multipart(&self, upload_id: &str) -> Result<()> {
        let upload_dir = self.multipart_dir().join(upload_id);
        if !upload_dir.is_dir() {
            return Err(S3Error::no_such_upload().into());
        }
        fs::remove_dir_all(&upload_dir).await?;
        Ok(())
    }

    pub async fn list_multipart_uploads(
        &self,
        bucket: &str,
    ) -> Result<Vec<MultipartUploadInfo>> {
        let multipart_dir = self.multipart_dir();
        if !multipart_dir.is_dir() {
            return Ok(Vec::new());
        }

        let mut uploads = Vec::new();
        let mut entries = fs::read_dir(&multipart_dir).await?;
        while let Some(entry) = entries.next_entry().await?
        {
            if !entry.metadata().await?.is_dir() {
                continue;
            }
            let meta_path = entry.path().join("metadata.json");
            if let Ok(content) = fs::read_to_string(&meta_path).await {
                if let Ok(meta) = serde_json::from_str::<MultipartMetadata>(&content) {
                    if meta.bucket == bucket {
                        let initiated = DateTime::parse_from_rfc3339(&meta.initiated)
                            .map(|dt| dt.with_timezone(&Utc))
                            .unwrap_or_else(|_| Utc::now());
                        uploads.push(MultipartUploadInfo {
                            upload_id: meta.upload_id,
                            bucket: meta.bucket,
                            key: meta.key,
                            initiated,
                        });
                    }
                }
            }
        }
        Ok(uploads)
    }

    // ============================
    // Helpers
    // ============================

    fn object_path(&self, bucket: &str, key: &str) -> PathBuf {
        let encoded = encode_key(key);
        self.root_dir
            .join(bucket)
            .join(format!("{}._S3_object", encoded))
    }

    async fn read_md5(&self, object_path: &Path) -> String {
        let md5_path = format!("{}.md5", object_path.display());
        match fs::read_to_string(&md5_path).await {
            Ok(s) => s.trim().to_string(),
            Err(_) => {
                // Calculate MD5 if sidecar missing
                match self.calculate_md5(object_path).await {
                    Ok(hash) => {
                        let _ = fs::write(&md5_path, &hash).await;
                        hash
                    }
                    Err(_) => String::new(),
                }
            }
        }
    }

    async fn calculate_md5(&self, path: &Path) -> Result<String> {
        let mut file = fs::File::open(path).await?;
        let mut hasher = Md5::new();
        let mut buf = vec![0u8; 64 * 1024];
        loop {
            let n = file.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            hasher.update(&buf[..n]);
        }
        Ok(format!("{:x}", hasher.finalize()))
    }

    async fn read_metadata(&self, object_path: &Path) -> HashMap<String, String> {
        let meta_path = format!("{}.metadata.json", object_path.display());
        match fs::read_to_string(&meta_path).await {
            Ok(s) => serde_json::from_str(&s).unwrap_or_default(),
            Err(_) => HashMap::new(),
        }
    }

    fn collect_keys<'a>(
        &'a self,
        bucket_path: &'a Path,
        dir: &'a Path,
        keys: &'a mut Vec<String>,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
        Box::pin(async move {
            let mut entries = match fs::read_dir(dir).await {
                Ok(e) => e,
                Err(_) => return Ok(()),
            };
            while let Some(entry) = entries.next_entry().await? {
                let meta = entry.metadata().await?;
                let name = entry.file_name().to_string_lossy().to_string();
                if meta.is_dir() {
                    self.collect_keys(bucket_path, &entry.path(), keys).await?;
                } else if name.ends_with("._S3_object")
                    && !name.ends_with(".metadata.json")
                    && !name.ends_with(".md5")
                {
                    let relative = entry
                        .path()
                        .strip_prefix(bucket_path)
                        .unwrap_or(Path::new(""))
                        .to_string_lossy()
                        .to_string();
                    let key = decode_key(relative.trim_end_matches("._S3_object"));
                    keys.push(key);
                }
            }
            Ok(())
        })
    }
}

// ============================
// Key encoding (identity on Linux)
// ============================

fn encode_key(key: &str) -> String {
    if cfg!(windows) {
        key.chars()
            .map(|c| match c {
                '<' | '>' | ':' | '"' | '\\' | '|' | '?' | '*' => {
                    format!("&{:02x}", c as u32)
                }
                _ => c.to_string(),
            })
            .collect()
    } else {
        key.to_string()
    }
}

fn decode_key(encoded: &str) -> String {
    if cfg!(windows) {
        let mut result = String::new();
        let mut chars = encoded.chars();
        while let Some(c) = chars.next() {
            if c == '&' {
                let hex: String = chars.by_ref().take(2).collect();
                if let Ok(byte) = u8::from_str_radix(&hex, 16) {
                    result.push(byte as char);
                } else {
                    result.push('&');
                    result.push_str(&hex);
                }
            } else {
                result.push(c);
            }
        }
        result
    } else {
        encoded.to_string()
    }
}
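
// ============================
// Example usage (illustrative sketch)
// ============================
//
// A minimal round-trip sketch of how FileStore is exercised, assuming a tokio
// test runtime (`#[tokio::test]`, i.e. the "macros" and "rt" features). The
// scratch directory "./filestore-example" and the bucket/key names used below
// are hypothetical placeholders, not part of the store's API.

#[cfg(test)]
mod example {
    use super::*;

    #[tokio::test]
    async fn put_get_list_roundtrip() -> Result<()> {
        // Hypothetical scratch location; reset() wipes and recreates it.
        let store = FileStore::new(PathBuf::from("./filestore-example"));
        store.reset().await?;

        // Create a bucket and write a small object from in-memory bytes.
        store.create_bucket("demo-bucket").await?;
        let put = store
            .put_object_bytes("demo-bucket", "docs/hello.txt", b"hello world", HashMap::new())
            .await?;
        assert_eq!(put.size, 11);

        // HEAD reads size, MD5 and metadata from the sidecar files without opening the object.
        let head = store.head_object("demo-bucket", "docs/hello.txt").await?;
        assert_eq!(head.md5, put.md5);

        // Listing with a "/" delimiter rolls "docs/" up into a common prefix.
        let listed = store.list_objects("demo-bucket", "", "/", 1000, None).await?;
        assert_eq!(listed.common_prefixes, vec!["docs/".to_string()]);

        // Delete the object (and its now-empty parent dir), then the bucket.
        store.delete_object("demo-bucket", "docs/hello.txt").await?;
        store.delete_bucket("demo-bucket").await?;
        Ok(())
    }
}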