// smartstorage/rust/src/cluster/coordinator.rs
use anyhow::Result;
use chrono::{DateTime, Utc};
use http_body_util::BodyExt;
use hyper::body::Incoming;
use md5::{Digest, Md5};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use super::config::ErasureConfig;
use super::erasure::ErasureCoder;
use super::metadata::{ChunkManifest, ObjectManifest, ShardPlacement};
use super::placement::ErasureSet;
use super::protocol::ShardWriteRequest;
use super::quic_transport::QuicTransport;
use super::shard_store::{ShardId, ShardStore};
use super::state::ClusterState;
use crate::storage::{
BucketInfo, CompleteMultipartResult, CopyResult, GetResult, HeadResult, ListObjectEntry,
ListObjectsResult, MultipartUploadInfo, PutResult,
};
/// Distributed storage coordinator.
///
/// Handles S3 operations by distributing erasure-coded shards across
/// the cluster via QUIC, with quorum-based consistency.
pub struct DistributedStore {
    /// Shared cluster membership and placement state.
    state: Arc<ClusterState>,
    /// QUIC transport used to reach remote nodes for shard writes.
    transport: Arc<QuicTransport>,
    /// Encoder/decoder that splits chunks into shards and reconstructs them.
    erasure_coder: ErasureCoder,
    /// Local shard stores, one per drive. Index = drive index.
    local_shard_stores: Vec<Arc<ShardStore>>,
    /// Root directory for manifests on this node
    manifest_dir: PathBuf,
    /// Root directory for buckets metadata
    buckets_dir: PathBuf,
    /// Erasure parameters: shard counts, chunk size, and quorum thresholds.
    erasure_config: ErasureConfig,
}
impl DistributedStore {
/// Builds a coordinator for this node.
///
/// Creates one local `ShardStore` per entry in `drive_paths` (vector
/// position doubles as the drive index). Fails if the erasure
/// configuration is rejected by the coder.
pub fn new(
    state: Arc<ClusterState>,
    transport: Arc<QuicTransport>,
    erasure_config: ErasureConfig,
    drive_paths: Vec<PathBuf>,
    manifest_dir: PathBuf,
    buckets_dir: PathBuf,
) -> Result<Self> {
    // Validate the erasure parameters first; no stores are built on failure.
    let erasure_coder = ErasureCoder::new(&erasure_config)?;
    let mut local_shard_stores = Vec::with_capacity(drive_paths.len());
    for path in &drive_paths {
        local_shard_stores.push(Arc::new(ShardStore::new(path.clone())));
    }
    Ok(Self {
        state,
        transport,
        erasure_coder,
        local_shard_stores,
        manifest_dir,
        buckets_dir,
        erasure_config,
    })
}
// ============================
// Object operations
// ============================
pub async fn put_object(
&self,
bucket: &str,
key: &str,
body: Incoming,
metadata: HashMap<String, String>,
) -> Result<PutResult> {
if !self.bucket_exists(bucket).await {
return Err(crate::error::StorageError::no_such_bucket().into());
}
let erasure_set = self
.state
.get_erasure_set_for_object(bucket, key)
.await
.ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?;
let chunk_size = self.erasure_config.chunk_size_bytes;
let mut chunk_buffer = Vec::with_capacity(chunk_size);
let mut chunk_index: u32 = 0;
let mut chunks = Vec::new();
let mut total_size: u64 = 0;
let mut full_hasher = Md5::new();
// Stream body, processing one chunk at a time
let mut body = body;
loop {
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data() {
full_hasher.update(&data);
total_size += data.len() as u64;
chunk_buffer.extend_from_slice(&data);
// Process complete chunks
while chunk_buffer.len() >= chunk_size {
let chunk_data: Vec<u8> =
chunk_buffer.drain(..chunk_size).collect();
let chunk_manifest = self
.encode_and_distribute_chunk(
&erasure_set,
bucket,
key,
chunk_index,
&chunk_data,
)
.await?;
chunks.push(chunk_manifest);
chunk_index += 1;
}
}
}
Some(Err(e)) => return Err(anyhow::anyhow!("Body read error: {}", e)),
None => break,
}
}
// Process final partial chunk
if !chunk_buffer.is_empty() {
let chunk_manifest = self
.encode_and_distribute_chunk(&erasure_set, bucket, key, chunk_index, &chunk_buffer)
.await?;
chunks.push(chunk_manifest);
}
let md5_hex = format!("{:x}", full_hasher.finalize());
// Build and store manifest
let manifest = ObjectManifest {
bucket: bucket.to_string(),
key: key.to_string(),
version_id: uuid::Uuid::new_v4().to_string(),
size: total_size,
content_md5: md5_hex.clone(),
content_type: metadata
.get("content-type")
.cloned()
.unwrap_or_else(|| "binary/octet-stream".to_string()),
metadata,
created_at: Utc::now().to_rfc3339(),
last_modified: Utc::now().to_rfc3339(),
data_shards: self.erasure_config.data_shards,
parity_shards: self.erasure_config.parity_shards,
chunk_size: self.erasure_config.chunk_size_bytes,
chunks,
};
self.store_manifest(&manifest).await?;
Ok(PutResult { md5: md5_hex })
}
/// Reads an object, or an inclusive (start, end) byte range of it, by
/// reconstructing only the chunks that cover the request.
///
/// Fixes vs. the previous version: a zero-byte object (no chunks) no
/// longer panics on `len() - 1` underflow, and an out-of-bounds range is
/// clamped to the available data instead of panicking on the slice.
/// Range validity (start <= end, within object size) is presumably
/// enforced by the HTTP layer — TODO confirm; this function only refuses
/// to panic on bad input.
///
/// The reconstructed bytes are staged in a temp file so the returned
/// `GetResult` can stream from a file handle (matches FileStore's
/// interface).
pub async fn get_object(
    &self,
    bucket: &str,
    key: &str,
    range: Option<(u64, u64)>,
) -> Result<GetResult> {
    let manifest = self.load_manifest(bucket, key).await?;
    let chunk_size = manifest.chunk_size as u64;
    let num_chunks = manifest.chunks.len();

    // Determine which chunks to fetch and the offset of `start` within
    // the first fetched chunk.
    let (first_chunk, last_chunk, byte_offset_in_first) = match range {
        Some((start, end)) => (
            (start / chunk_size) as usize,
            (end / chunk_size) as usize,
            (start % chunk_size) as usize,
        ),
        // saturating_sub: an empty object has zero chunks, and a plain
        // `len() - 1` would underflow usize and panic.
        None => (0, num_chunks.saturating_sub(1), 0),
    };

    // Reconstruct the needed chunks in order. The guard keeps the
    // `min(num_chunks - 1)` clamp from underflowing when there are none;
    // an empty or fully-out-of-range request just yields no bytes.
    let mut full_data = Vec::new();
    if num_chunks > 0 {
        for chunk_idx in first_chunk..=last_chunk.min(num_chunks - 1) {
            let chunk = &manifest.chunks[chunk_idx];
            let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?;
            full_data.extend_from_slice(&reconstructed);
        }
    }

    // Slice out the requested range, clamped to what was reconstructed so
    // a range past EOF returns a short body instead of panicking.
    let (response_data, content_length) = if let Some((start, end)) = range {
        let requested = end.saturating_sub(start) as usize + 1;
        let lo = byte_offset_in_first.min(full_data.len());
        let hi = lo.saturating_add(requested).min(full_data.len());
        let sliced = full_data[lo..hi].to_vec();
        let len = sliced.len() as u64;
        (sliced, len)
    } else {
        let len = full_data.len() as u64;
        (full_data, len)
    };

    // Write to a temp file for streaming (matches FileStore's GetResult interface)
    let temp_path = self.manifest_dir.join(format!(
        ".tmp_get_{}_{}",
        uuid::Uuid::new_v4(),
        key.replace('/', "_")
    ));
    fs::write(&temp_path, &response_data).await?;
    let file = fs::File::open(&temp_path).await?;
    // Clean up temp file after opening (Unix: file stays accessible via fd)
    let _ = fs::remove_file(&temp_path).await;

    let last_modified: DateTime<Utc> = manifest
        .last_modified
        .parse()
        .unwrap_or_else(|_| Utc::now());
    Ok(GetResult {
        size: manifest.size,
        last_modified,
        md5: manifest.content_md5.clone(),
        metadata: manifest.metadata.clone(),
        body: file,
        content_length,
    })
}
/// Returns object metadata (size, mtime, MD5, user metadata) from the
/// manifest alone — no shard data is touched.
pub async fn head_object(&self, bucket: &str, key: &str) -> Result<HeadResult> {
    let manifest = self.load_manifest(bucket, key).await?;
    // Unparsable stored timestamps degrade to "now" rather than erroring.
    let last_modified = manifest
        .last_modified
        .parse::<DateTime<Utc>>()
        .unwrap_or_else(|_| Utc::now());
    Ok(HeadResult {
        size: manifest.size,
        last_modified,
        md5: manifest.content_md5,
        metadata: manifest.metadata,
    })
}
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
// Load manifest to find all shards
if let Ok(manifest) = self.load_manifest(bucket, key).await {
// Delete shards from all drives
for chunk in &manifest.chunks {
for placement in &chunk.shard_placements {
let shard_id = ShardId {
bucket: bucket.to_string(),
key: key.to_string(),
chunk_index: chunk.chunk_index,
shard_index: placement.shard_index,
};
if placement.node_id == self.state.local_node_id() {
// Local delete
if let Some(store) = self
.local_shard_stores
.get(placement.drive_id.parse::<usize>().unwrap_or(0))
{
let _ = store.delete_shard(&shard_id).await;
}
}
// TODO: send delete to remote nodes via QUIC
}
}
}
// Delete manifest
self.delete_manifest(bucket, key).await?;
Ok(())
}
/// Server-side copy: reconstructs the source object, then re-encodes and
/// redistributes it under the destination key with a fresh manifest.
///
/// `_metadata_directive` is currently ignored; the effective rule is:
/// use `new_metadata` when provided, otherwise copy the source metadata.
///
/// NOTE(review): the entire source object is buffered in memory before
/// re-encoding — acceptable for small objects, but large copies should be
/// streamed chunk-by-chunk like `put_object`.
pub async fn copy_object(
    &self,
    src_bucket: &str,
    src_key: &str,
    dest_bucket: &str,
    dest_key: &str,
    _metadata_directive: &str,
    new_metadata: Option<HashMap<String, String>>,
) -> Result<CopyResult> {
    // Load source manifest (yields NoSuchKey if the source is absent).
    let src_manifest = self.load_manifest(src_bucket, src_key).await?;
    // Determine metadata: explicit replacement wins over the source's.
    let metadata = if let Some(meta) = new_metadata {
        meta
    } else {
        src_manifest.metadata.clone()
    };
    // Read source object fully, then reconstruct chunk by chunk.
    let mut full_data = Vec::new();
    for chunk in &src_manifest.chunks {
        let reconstructed = self.fetch_and_reconstruct_chunk(chunk).await?;
        full_data.extend_from_slice(&reconstructed);
    }
    // Compute MD5 of the full data; recorded as the destination's
    // content_md5 and returned to the caller.
    let mut hasher = Md5::new();
    hasher.update(&full_data);
    let md5_hex = format!("{:x}", hasher.finalize());
    // Get erasure set for the destination (may differ from the source's).
    let erasure_set = self
        .state
        .get_erasure_set_for_object(dest_bucket, dest_key)
        .await
        .ok_or_else(|| anyhow::anyhow!("No erasure sets available"))?;
    // Re-encode and distribute in fixed-size chunks.
    let chunk_size = self.erasure_config.chunk_size_bytes;
    let mut chunks = Vec::new();
    let mut chunk_index = 0u32;
    for chunk_data in full_data.chunks(chunk_size) {
        let chunk_manifest = self
            .encode_and_distribute_chunk(
                &erasure_set,
                dest_bucket,
                dest_key,
                chunk_index,
                chunk_data,
            )
            .await?;
        chunks.push(chunk_manifest);
        chunk_index += 1;
    }
    let last_modified = Utc::now();
    // Build and store the destination manifest; the same timestamp is
    // used for both created_at and last_modified.
    let manifest = ObjectManifest {
        bucket: dest_bucket.to_string(),
        key: dest_key.to_string(),
        version_id: uuid::Uuid::new_v4().to_string(),
        size: full_data.len() as u64,
        content_md5: md5_hex.clone(),
        content_type: metadata
            .get("content-type")
            .cloned()
            .unwrap_or_else(|| "binary/octet-stream".to_string()),
        metadata,
        created_at: last_modified.to_rfc3339(),
        last_modified: last_modified.to_rfc3339(),
        data_shards: self.erasure_config.data_shards,
        parity_shards: self.erasure_config.parity_shards,
        chunk_size: self.erasure_config.chunk_size_bytes,
        chunks,
    };
    self.store_manifest(&manifest).await?;
    Ok(CopyResult {
        md5: md5_hex,
        last_modified,
    })
}
/// Lists object keys in `bucket` with prefix filtering, delimiter
/// grouping into common prefixes, and continuation-token pagination over
/// lexicographically sorted keys.
///
/// NOTE(review): unlike S3's ListObjectsV2, common prefixes are not
/// counted toward `max_keys` here — confirm that difference is intended.
pub async fn list_objects(
    &self,
    bucket: &str,
    prefix: &str,
    delimiter: &str,
    max_keys: usize,
    continuation_token: Option<&str>,
) -> Result<ListObjectsResult> {
    if !self.bucket_exists(bucket).await {
        return Err(crate::error::StorageError::no_such_bucket().into());
    }
    // Enumerate every manifest under this bucket's manifest tree.
    let manifest_bucket_dir = self.manifest_dir.join(bucket);
    let mut keys = Vec::new();
    if manifest_bucket_dir.is_dir() {
        self.collect_manifest_keys(&manifest_bucket_dir, &manifest_bucket_dir, &mut keys)
            .await?;
    }
    // Apply prefix filter
    if !prefix.is_empty() {
        keys.retain(|k| k.starts_with(prefix));
    }
    keys.sort();
    // Resume strictly after the continuation token (exclusive bound);
    // requires the sort above.
    if let Some(token) = continuation_token {
        if let Some(pos) = keys.iter().position(|k| k.as_str() > token) {
            keys = keys[pos..].to_vec();
        } else {
            keys.clear();
        }
    }
    // Handle delimiter grouping and pagination.
    let mut common_prefixes: Vec<String> = Vec::new();
    let mut common_prefix_set = std::collections::HashSet::new();
    let mut contents: Vec<ListObjectEntry> = Vec::new();
    let mut is_truncated = false;
    for key in &keys {
        if !delimiter.is_empty() {
            // Safe slice: every key here survived the prefix filter above.
            let remaining = &key[prefix.len()..];
            if let Some(delim_idx) = remaining.find(delimiter) {
                // Key is grouped under a common prefix; set dedupes,
                // vec preserves first-seen order until the final sort.
                let cp = format!(
                    "{}{}",
                    prefix,
                    &remaining[..delim_idx + delimiter.len()]
                );
                if common_prefix_set.insert(cp.clone()) {
                    common_prefixes.push(cp);
                }
                continue;
            }
        }
        if contents.len() >= max_keys {
            is_truncated = true;
            break;
        }
        // Keys whose manifest fails to load are silently skipped.
        if let Ok(manifest) = self.load_manifest(bucket, key).await {
            let last_modified: DateTime<Utc> = manifest
                .last_modified
                .parse()
                .unwrap_or_else(|_| Utc::now());
            contents.push(ListObjectEntry {
                key: key.clone(),
                size: manifest.size,
                last_modified,
                md5: manifest.content_md5,
            });
        }
    }
    // The token is the last returned key; the next page resumes after it.
    // NOTE(review): if truncation occurs while `contents` is empty
    // (max_keys == 0), no token is produced — verify callers guard this.
    let next_continuation_token = if is_truncated {
        contents.last().map(|e| e.key.clone())
    } else {
        None
    };
    common_prefixes.sort();
    Ok(ListObjectsResult {
        contents,
        common_prefixes,
        is_truncated,
        next_continuation_token,
        prefix: prefix.to_string(),
        delimiter: delimiter.to_string(),
        max_keys,
    })
}
// ============================
// Bucket operations
// ============================
/// Lists all buckets, sorted by name.
///
/// Dot-prefixed directories are treated as internal and skipped. The
/// creation date falls back to mtime, then the Unix epoch, on platforms
/// that don't report a birth time.
pub async fn list_buckets(&self) -> Result<Vec<BucketInfo>> {
    if !self.buckets_dir.is_dir() {
        return Ok(Vec::new());
    }
    let mut buckets = Vec::new();
    let mut entries = fs::read_dir(&self.buckets_dir).await?;
    while let Some(entry) = entries.next_entry().await? {
        let meta = entry.metadata().await?;
        if !meta.is_dir() {
            continue;
        }
        let name = entry.file_name().to_string_lossy().to_string();
        if name.starts_with('.') {
            continue;
        }
        let created = meta
            .created()
            .or_else(|_| meta.modified())
            .unwrap_or(std::time::SystemTime::UNIX_EPOCH);
        buckets.push(BucketInfo {
            name,
            creation_date: created.into(),
        });
    }
    buckets.sort_by(|a, b| a.name.cmp(&b.name));
    Ok(buckets)
}
/// True if a directory for `bucket` exists under the buckets root.
pub async fn bucket_exists(&self, bucket: &str) -> bool {
    let path = self.buckets_dir.join(bucket);
    path.is_dir()
}
/// Creates the bucket directory and its matching manifest directory.
/// Idempotent: creating an already-existing bucket succeeds.
pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
    // Bucket dir first, then the manifest dir that mirrors it.
    for root in [&self.buckets_dir, &self.manifest_dir] {
        fs::create_dir_all(root.join(bucket)).await?;
    }
    Ok(())
}
/// Deletes a bucket.
///
/// Errors with NoSuchBucket when missing, and BucketNotEmpty when any
/// entry remains in the bucket's manifest tree. Directory removal itself
/// is best-effort.
pub async fn delete_bucket(&self, bucket: &str) -> Result<()> {
    let bucket_path = self.buckets_dir.join(bucket);
    if !bucket_path.is_dir() {
        return Err(crate::error::StorageError::no_such_bucket().into());
    }
    // Emptiness is judged by the manifest tree: a single entry means at
    // least one object still exists.
    let manifest_bucket = self.manifest_dir.join(bucket);
    if manifest_bucket.is_dir() {
        let mut entries = fs::read_dir(&manifest_bucket).await?;
        let has_entry = entries.next_entry().await?.is_some();
        if has_entry {
            return Err(crate::error::StorageError::bucket_not_empty().into());
        }
    }
    // Best-effort removal of both trees; failures are ignored.
    let _ = fs::remove_dir_all(&bucket_path).await;
    let _ = fs::remove_dir_all(&manifest_bucket).await;
    Ok(())
}
// ============================
// Multipart (delegated to local temp storage for now)
// ============================
/// Starts a multipart upload. Unsupported in cluster mode for now.
pub async fn initiate_multipart(
    &self,
    _bucket: &str,
    _key: &str,
    _metadata: HashMap<String, String>,
) -> Result<String> {
    // TODO: Implement distributed multipart
    Err(anyhow::anyhow!("Multipart uploads not yet supported in cluster mode"))
}
/// Uploads one part of a multipart upload. Unsupported in cluster mode.
pub async fn upload_part(
    &self,
    _upload_id: &str,
    _part_number: u32,
    _body: Incoming,
) -> Result<(String, u64)> {
    Err(anyhow::anyhow!("Multipart uploads not yet supported in cluster mode"))
}
/// Completes a multipart upload. Unsupported in cluster mode.
pub async fn complete_multipart(
    &self,
    _upload_id: &str,
    _parts: &[(u32, String)],
) -> Result<CompleteMultipartResult> {
    Err(anyhow::anyhow!("Multipart uploads not yet supported in cluster mode"))
}
/// Aborts a multipart upload. Unsupported in cluster mode.
pub async fn abort_multipart(&self, _upload_id: &str) -> Result<()> {
    Err(anyhow::anyhow!("Multipart uploads not yet supported in cluster mode"))
}
/// Lists in-progress multipart uploads. Always empty: multipart is not
/// supported in cluster mode, so none can exist.
pub async fn list_multipart_uploads(
    &self,
    _bucket: &str,
) -> Result<Vec<MultipartUploadInfo>> {
    Ok(vec![])
}
// ============================
// Internal: erasure encode + distribute
// ============================
/// Erasure-encodes one chunk and writes shard i to drive i of the
/// erasure set, locally or over QUIC.
///
/// Aborts early once enough writes have failed that the write quorum can
/// no longer be reached, and re-checks the quorum at the end. Only
/// successful writes are recorded in the returned `ChunkManifest`.
async fn encode_and_distribute_chunk(
    &self,
    erasure_set: &ErasureSet,
    bucket: &str,
    key: &str,
    chunk_index: u32,
    chunk_data: &[u8],
) -> Result<ChunkManifest> {
    let shards = self.erasure_coder.encode_chunk(chunk_data)?;
    let quorum = self.erasure_config.write_quorum();
    let total = shards.len();
    let mut shard_placements = Vec::with_capacity(total);
    let mut successes = 0u32;
    let mut failures = 0u32;
    // Distribute shards to drives in the erasure set. Shard index selects
    // the drive, so the set must contain at least `total` drives.
    for (shard_idx, shard_data) in shards.iter().enumerate() {
        let drive = erasure_set
            .drives
            .get(shard_idx)
            .ok_or_else(|| anyhow::anyhow!("Not enough drives in erasure set"))?;
        // CRC32C stored alongside each shard for integrity verification.
        let checksum = crc32c::crc32c(shard_data);
        let shard_id = ShardId {
            bucket: bucket.to_string(),
            key: key.to_string(),
            chunk_index,
            shard_index: shard_idx as u32,
        };
        let result = if drive.node_id == self.state.local_node_id() {
            // Local write straight into this drive's shard store.
            if let Some(store) =
                self.local_shard_stores.get(drive.drive_index as usize)
            {
                store.write_shard(&shard_id, shard_data, checksum).await
            } else {
                Err(anyhow::anyhow!("Local drive {} not found", drive.drive_index))
            }
        } else {
            // Remote write via QUIC
            self.write_shard_remote(
                &drive.node_id,
                bucket,
                key,
                chunk_index,
                shard_idx as u32,
                shard_data,
                checksum,
            )
            .await
        };
        match result {
            Ok(()) => {
                successes += 1;
                shard_placements.push(ShardPlacement {
                    shard_index: shard_idx as u32,
                    node_id: drive.node_id.clone(),
                    // Stringified drive index; readers parse it back (see
                    // fetch_and_reconstruct_chunk / delete_object).
                    drive_id: drive.drive_index.to_string(),
                    checksum,
                    shard_size: shard_data.len(),
                });
            }
            Err(e) => {
                failures += 1;
                tracing::warn!(
                    shard_index = shard_idx,
                    node = %drive.node_id,
                    error = %e,
                    "Shard write failed"
                );
                // Abort once even all remaining writes succeeding could
                // not reach the quorum: failures exceed total - quorum.
                if failures as usize > total - quorum {
                    anyhow::bail!(
                        "Write quorum not achievable: {}/{} failures",
                        failures,
                        total
                    );
                }
            }
        }
    }
    // Final quorum check (defensive; the early abort above should make a
    // failure here unreachable).
    if (successes as usize) < quorum {
        anyhow::bail!(
            "Write quorum not met: only {}/{} succeeded (need {})",
            successes,
            total,
            quorum
        );
    }
    Ok(ChunkManifest {
        chunk_index,
        data_size: chunk_data.len(),
        shard_placements,
    })
}
/// Sends one shard to a remote node over QUIC and waits for its ack.
///
/// Resolves the node's QUIC address from cluster state, reuses a pooled
/// connection from the transport, and treats a negative ack as an error.
async fn write_shard_remote(
    &self,
    node_id: &str,
    bucket: &str,
    key: &str,
    chunk_index: u32,
    shard_index: u32,
    data: &[u8],
    checksum: u32,
) -> Result<()> {
    let node_info = self
        .state
        .get_node(node_id)
        .await
        .ok_or_else(|| anyhow::anyhow!("Node {} not found", node_id))?;
    let addr: SocketAddr = node_info.quic_addr.parse()?;
    let conn = self.transport.get_connection(node_id, addr).await?;
    let request = ShardWriteRequest {
        request_id: uuid::Uuid::new_v4().to_string(),
        bucket: bucket.to_string(),
        key: key.to_string(),
        chunk_index,
        shard_index,
        shard_data_length: data.len() as u64,
        checksum,
        object_metadata: HashMap::new(),
    };
    let ack = self.transport.send_shard_write(&conn, request, data).await?;
    if !ack.success {
        anyhow::bail!(
            "Remote shard write failed: {}",
            ack.error.unwrap_or_default()
        );
    }
    Ok(())
}
// ============================
// Internal: fetch + reconstruct
// ============================
/// Gathers at least `data_shards` shards for a chunk and decodes them
/// back into the original chunk bytes. Stops fetching as soon as enough
/// shards are available; fails with a read-quorum error otherwise.
async fn fetch_and_reconstruct_chunk(&self, chunk: &ChunkManifest) -> Result<Vec<u8>> {
    // k data shards are sufficient to decode the chunk.
    let k = self.erasure_config.data_shards;
    let total = self.erasure_config.total_shards();
    let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
    let mut succeeded = 0usize;
    // Try to fetch shards (local first, then remote)
    for placement in &chunk.shard_placements {
        let shard_id = ShardId {
            bucket: String::new(), // Not needed for read
            key: String::new(),
            chunk_index: chunk.chunk_index,
            shard_index: placement.shard_index,
        };
        let result = if placement.node_id == self.state.local_node_id() {
            // Local read. drive_id was stored as the drive index; an
            // unparsable value silently falls back to drive 0.
            let store_idx = placement.drive_id.parse::<usize>().unwrap_or(0);
            if let Some(store) = self.local_shard_stores.get(store_idx) {
                // KNOWN ISSUE: shard_id carries empty bucket/key here. If
                // ShardStore addresses shards by bucket/key (as the write
                // path's populated ShardId suggests), local reads would
                // miss — confirm against ShardStore's on-disk layout.
                store.read_shard(&shard_id).await.ok()
            } else {
                None
            }
        } else {
            // Remote read via QUIC
            // TODO: implement remote shard read
            None
        };
        if let Some((data, _checksum)) = result {
            // NOTE(review): shard_index is trusted to be < total; an
            // out-of-range index from a corrupt manifest would panic here.
            shards[placement.shard_index as usize] = Some(data);
            succeeded += 1;
            if succeeded >= k {
                break; // Have enough shards
            }
        }
    }
    if succeeded < k {
        anyhow::bail!(
            "Read quorum not met: only {}/{} shards available for chunk {}",
            succeeded,
            k,
            chunk.chunk_index
        );
    }
    // Decode from the gathered shards; `data_size` presumably lets the
    // coder trim padding back to the chunk's true length — confirm.
    self.erasure_coder
        .decode_chunk(&mut shards, chunk.data_size)
}
// ============================
// Manifest storage (local filesystem)
// ============================
async fn store_manifest(&self, manifest: &ObjectManifest) -> Result<()> {
let path = self.manifest_path(&manifest.bucket, &manifest.key);
if let Some(parent) = path.parent() {
fs::create_dir_all(parent).await?;
}
let json = serde_json::to_string_pretty(manifest)?;
// Atomic write via temp + rename
let temp_path = path.with_extension("manifest.tmp");
fs::write(&temp_path, json).await?;
fs::rename(&temp_path, &path).await?;
Ok(())
}
/// Loads and deserializes the manifest for `bucket`/`key`.
/// Returns NoSuchKey when no manifest file exists at the expected path.
async fn load_manifest(&self, bucket: &str, key: &str) -> Result<ObjectManifest> {
    let path = self.manifest_path(bucket, key);
    if !path.exists() {
        return Err(crate::error::StorageError::no_such_key().into());
    }
    let json = fs::read_to_string(&path).await?;
    Ok(serde_json::from_str::<ObjectManifest>(&json)?)
}
/// Removes the manifest file and opportunistically removes its parent
/// directory if that left it empty. All failures are ignored: deletion
/// here is best-effort.
async fn delete_manifest(&self, bucket: &str, key: &str) -> Result<()> {
    let path = self.manifest_path(bucket, key);
    let _ = fs::remove_file(&path).await;
    // remove_dir only succeeds on an empty directory, which is exactly
    // the cleanup wanted here — a non-empty parent is simply left alone.
    if let Some(parent) = path.parent() {
        let _ = fs::remove_dir(parent).await;
    }
    Ok(())
}
/// Filesystem location of the manifest for `bucket`/`key`.
///
/// Keys containing '/' map to nested directories under the bucket dir.
/// NOTE(review): `key` is spliced into the path verbatim, so a key
/// containing ".." could escape `manifest_dir` — confirm keys are
/// validated/sanitized at the API layer before reaching here.
fn manifest_path(&self, bucket: &str, key: &str) -> PathBuf {
    self.manifest_dir
        .join(bucket)
        .join(format!("{}.manifest.json", key))
}
/// Recursively walks `dir`, collecting object keys from `*.manifest.json`
/// files, expressed relative to `base_dir` with the suffix stripped.
///
/// Unreadable directories are skipped silently (best-effort listing).
///
/// Fix: a `strip_prefix` failure previously fell back to an empty path
/// and pushed "" as an object key; such entries are now skipped.
///
/// NOTE(review): keys are derived via `to_string_lossy`, so on Windows
/// they would contain '\\' separators — confirm only Unix is targeted.
async fn collect_manifest_keys(
    &self,
    base_dir: &std::path::Path,
    dir: &std::path::Path,
    keys: &mut Vec<String>,
) -> Result<()> {
    let mut entries = match fs::read_dir(dir).await {
        Ok(e) => e,
        Err(_) => return Ok(()),
    };
    while let Some(entry) = entries.next_entry().await? {
        let meta = entry.metadata().await?;
        let name = entry.file_name().to_string_lossy().to_string();
        if meta.is_dir() {
            // Box::pin: async recursion requires an explicitly boxed future.
            Box::pin(self.collect_manifest_keys(base_dir, &entry.path(), keys)).await?;
        } else if name.ends_with(".manifest.json") {
            let path = entry.path();
            // Entries outside base_dir (shouldn't happen, since we only
            // descend into it) are skipped instead of yielding an empty key.
            if let Ok(relative) = path.strip_prefix(base_dir) {
                let relative = relative.to_string_lossy();
                keys.push(relative.trim_end_matches(".manifest.json").to_string());
            }
        }
    }
    Ok(())
}
}