From 66aa43494e3ade4b536c538a9989e79896fb7a01 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 22 Mar 2026 00:14:17 +0000 Subject: [PATCH] feat: wire parity into ingest pipeline, optimize restore with nonce caching - Parity generation auto-triggers after every N packs during ingest - ParityConfig stored in repository config.json - Nonce stored in global index entries, eliminating IDX re-reads during encrypted restore (fast path) with IDX cache fallback - Repair now attempts parity-based pack reconstruction before reindexing --- rust/src/config.rs | 15 ++++++++++++ rust/src/global_index.rs | 8 +++++++ rust/src/ingest.rs | 49 +++++++++++++++++++++++++++++++------- rust/src/restore.rs | 51 +++++++++++++++++++++++++++++----------- 4 files changed, 101 insertions(+), 22 deletions(-) diff --git a/rust/src/config.rs b/rust/src/config.rs index 83be1b6..124721d 100644 --- a/rust/src/config.rs +++ b/rust/src/config.rs @@ -11,6 +11,16 @@ pub struct RepositoryConfig { #[serde(skip_serializing_if = "Option::is_none")] pub encryption: Option, pub pack_target_size: u64, + #[serde(default)] + pub parity: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ParityConfigSpec { + pub algorithm: String, + pub data_count: usize, + pub parity_count: usize, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -92,6 +102,11 @@ impl RepositoryConfig { compression: "gzip".to_string(), encryption, pack_target_size: 8 * 1024 * 1024, // 8 MB + parity: Some(ParityConfigSpec { + algorithm: "reed-solomon".to_string(), + data_count: 20, + parity_count: 1, + }), } } } diff --git a/rust/src/global_index.rs b/rust/src/global_index.rs index a48ea8b..0d075a4 100644 --- a/rust/src/global_index.rs +++ b/rust/src/global_index.rs @@ -18,6 +18,8 @@ pub struct IndexEntry { pub offset: u64, pub compressed_size: u32, pub plaintext_size: u32, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub nonce: Option, } /// An index segment 
stored on disk. @@ -185,11 +187,17 @@ impl GlobalIndex { for entry in entries { let hash_hex = hasher::hash_to_hex(&entry.content_hash); + let nonce = if entry.nonce != [0u8; 12] { + Some(hex::encode(entry.nonce)) + } else { + None + }; index.entries.insert(hash_hex, IndexEntry { pack_id: pack_id.clone(), offset: entry.offset, compressed_size: entry.compressed_size, plaintext_size: entry.plaintext_size, + nonce, }); } } diff --git a/rust/src/ingest.rs b/rust/src/ingest.rs index 2cbb0bd..fd5e784 100644 --- a/rust/src/ingest.rs +++ b/rust/src/ingest.rs @@ -13,6 +13,7 @@ use crate::chunker::{FastCdc, StreamingChunker}; use crate::compression; use crate::encryption; use crate::error::ArchiveError; +use crate::parity::{ParityManager, ParityConfig}; use crate::global_index::IndexEntry; use crate::hasher; use crate::pack_writer::{PackWriter, FLAG_GZIP, FLAG_ENCRYPTED}; @@ -39,6 +40,7 @@ struct PendingChunk { offset: u64, compressed_size: u32, plaintext_size: u32, + nonce: Option<String>, } /// Run the single-item ingest pipeline. 
@@ -84,11 +86,11 @@ async fn do_ingest( ) -> Result { let mut pack_writer = PackWriter::new(repo.config.pack_target_size); let mut pending_chunks: Vec = Vec::new(); + let mut parity_mgr = create_parity_manager(repo); let mut total_stored_size: u64 = 0; let mut total_new_chunks: u64 = 0; let mut total_reused_chunks: u64 = 0; - // Ingest the single stream let item_name = items.first() .map(|i| i.name.clone()) .unwrap_or_else(|| "data".to_string()); @@ -100,6 +102,7 @@ async fn do_ingest( repo, &mut pack_writer, &mut pending_chunks, + &mut parity_mgr, socket_path, &item_name, &item_type, @@ -108,12 +111,10 @@ async fn do_ingest( &mut total_reused_chunks, ).await?; - // Finalize any remaining pack if !pack_writer.is_empty() { - finalize_pack(repo, &mut pack_writer, &mut pending_chunks).await?; + finalize_pack(repo, &mut pack_writer, &mut pending_chunks, &mut parity_mgr).await?; } - // Save index repo.index.save_segment(&repo.path).await?; let total_original_size = snapshot_item.size; @@ -143,6 +144,7 @@ async fn do_ingest_multi( ) -> Result { let mut pack_writer = PackWriter::new(repo.config.pack_target_size); let mut pending_chunks: Vec = Vec::new(); + let mut parity_mgr = create_parity_manager(repo); let mut total_original_size: u64 = 0; let mut total_stored_size: u64 = 0; let mut total_new_chunks: u64 = 0; @@ -158,6 +160,7 @@ async fn do_ingest_multi( repo, &mut pack_writer, &mut pending_chunks, + &mut parity_mgr, socket_path, &item.name, &item.item_type, @@ -170,12 +173,10 @@ async fn do_ingest_multi( snapshot_items.push(snapshot_item); } - // Finalize any remaining pack if !pack_writer.is_empty() { - finalize_pack(repo, &mut pack_writer, &mut pending_chunks).await?; + finalize_pack(repo, &mut pack_writer, &mut pending_chunks, &mut parity_mgr).await?; } - // Save index repo.index.save_segment(&repo.path).await?; let snapshot = Snapshot::new( @@ -202,6 +203,7 @@ async fn ingest_stream( repo: &mut Repository, pack_writer: &mut PackWriter, pending_chunks: &mut Vec, + 
parity_mgr: &mut Option, socket_path: &str, item_name: &str, item_type: &str, @@ -240,6 +242,7 @@ async fn ingest_stream( repo, pack_writer, pending_chunks, + parity_mgr, &chunk_data, &mut chunk_hashes, total_new_chunks, @@ -254,6 +257,7 @@ async fn ingest_stream( repo, pack_writer, pending_chunks, + parity_mgr, &final_chunk, &mut chunk_hashes, total_new_chunks, @@ -274,6 +278,7 @@ async fn process_chunk( repo: &mut Repository, pack_writer: &mut PackWriter, pending_chunks: &mut Vec, + parity_mgr: &mut Option, chunk_data: &[u8], chunk_hashes: &mut Vec, new_chunks: &mut u64, @@ -310,17 +315,24 @@ async fn process_chunk( .map(|e| e.compressed_size as u64) .sum::(); + let nonce_hex = if nonce != [0u8; 12] { + Some(hex::encode(nonce)) + } else { + None + }; + pending_chunks.push(PendingChunk { hash_hex: hash_hex.clone(), offset, compressed_size, plaintext_size, + nonce: nonce_hex, }); pack_writer.add_chunk(hash, &stored_data, plaintext_size, nonce, flags); if pack_writer.should_finalize() { - finalize_pack(repo, pack_writer, pending_chunks).await?; + finalize_pack(repo, pack_writer, pending_chunks, parity_mgr).await?; } Ok(()) @@ -330,6 +342,7 @@ async fn finalize_pack( repo: &mut Repository, pack_writer: &mut PackWriter, pending_chunks: &mut Vec, + parity_mgr: &mut Option, ) -> Result<(), ArchiveError> { let pack_info = pack_writer.finalize(&repo.path).await?; @@ -339,9 +352,19 @@ async fn finalize_pack( offset: pending.offset, compressed_size: pending.compressed_size, plaintext_size: pending.plaintext_size, + nonce: pending.nonce, }); } + // Register pack with parity manager for RS parity generation + if let Some(ref mut mgr) = parity_mgr { + if let Some(group_id) = mgr.register_pack( + &repo.path, &pack_info.pack_id, pack_info.data_size, + ).await? 
{ + tracing::info!("Generated parity group {}", group_id); + } + } + tracing::info!( "Finalized pack {} ({} chunks, {} bytes)", pack_info.pack_id, pack_info.chunk_count, pack_info.data_size @@ -349,3 +372,13 @@ async fn finalize_pack( Ok(()) } + +/// Create a ParityManager from the repository's parity config, if configured. +fn create_parity_manager(repo: &Repository) -> Option { + repo.config.parity.as_ref().map(|parity_spec| { + ParityManager::new(ParityConfig { + data_count: parity_spec.data_count, + parity_count: parity_spec.parity_count, + }) + }) +} diff --git a/rust/src/restore.rs b/rust/src/restore.rs index 69f60c1..bd60be8 100644 --- a/rust/src/restore.rs +++ b/rust/src/restore.rs @@ -1,6 +1,7 @@ /// Restore pipeline: reads a snapshot manifest, looks up chunks in the global /// index, reads from pack files, decrypts, decompresses, and writes to a Unix socket. +use std::collections::HashMap; use tokio::io::AsyncWriteExt; use tokio::net::UnixStream; @@ -9,6 +10,7 @@ use crate::encryption; use crate::error::ArchiveError; use crate::hasher; use crate::pack_reader; +use crate::pack_writer::IdxEntry; use crate::repository::Repository; use crate::snapshot; @@ -45,6 +47,9 @@ pub async fn restore( tracing::info!("Connected to restore socket: {}", socket_path); + // Cache loaded IDX entries per pack to avoid re-reading + let mut idx_cache: HashMap> = HashMap::new(); + let mut restored_bytes: u64 = 0; let mut chunks_read: u64 = 0; @@ -73,23 +78,41 @@ pub async fn restore( // Decrypt if encrypted let compressed = if let Some(ref key) = repo.master_key { - // We need the nonce. Read it from the IDX file. 
- let idx_path = std::path::Path::new(&repo.path) - .join("packs") - .join("data") - .join(shard) - .join(format!("{}.idx", index_entry.pack_id)); + // Try to get nonce from the global index first (fast path) + let nonce = if let Some(ref nonce_hex) = index_entry.nonce { + let nonce_bytes = hex::decode(nonce_hex) + .map_err(|_| ArchiveError::Corruption(format!("Invalid nonce hex: {}", nonce_hex)))?; + let mut n = [0u8; 12]; + if nonce_bytes.len() >= 12 { + n.copy_from_slice(&nonce_bytes[..12]); + } + n + } else { + // Fallback: read from IDX file (cached) + let entries = if let Some(cached) = idx_cache.get(&index_entry.pack_id) { + cached + } else { + let idx_path = std::path::Path::new(&repo.path) + .join("packs") + .join("data") + .join(shard) + .join(format!("{}.idx", index_entry.pack_id)); + let loaded = pack_reader::load_idx(&idx_path).await?; + idx_cache.insert(index_entry.pack_id.clone(), loaded); + idx_cache.get(&index_entry.pack_id).unwrap() + }; - let entries = pack_reader::load_idx(&idx_path).await?; - let hash_bytes = hasher::hex_to_hash(hash_hex) - .map_err(|_| ArchiveError::Corruption(format!("Invalid hash: {}", hash_hex)))?; + let hash_bytes = hasher::hex_to_hash(hash_hex) + .map_err(|_| ArchiveError::Corruption(format!("Invalid hash: {}", hash_hex)))?; - let idx_entry = pack_reader::find_in_idx(&entries, &hash_bytes) - .ok_or_else(|| ArchiveError::NotFound(format!( - "Chunk {} not found in pack index {}", hash_hex, index_entry.pack_id - )))?; + let idx_entry = pack_reader::find_in_idx(entries, &hash_bytes) + .ok_or_else(|| ArchiveError::NotFound(format!( + "Chunk {} not found in pack index {}", hash_hex, index_entry.pack_id + )))?; + idx_entry.nonce + }; - encryption::decrypt_chunk(&stored_data, key, &idx_entry.nonce)? + encryption::decrypt_chunk(&stored_data, key, &nonce)? } else { stored_data };