feat: wire parity into ingest pipeline, optimize restore with nonce caching

- Parity generation auto-triggers after every N packs during ingest
- ParityConfig stored in repository config.json
- Nonce stored in global index entries, eliminating IDX re-reads during
  encrypted restore (fast path) with IDX cache fallback
- Repair now attempts parity-based pack reconstruction before reindexing
This commit is contained in:
2026-03-22 00:14:17 +00:00
parent ca510f4578
commit 66aa43494e
4 changed files with 101 additions and 22 deletions

View File

@@ -11,6 +11,16 @@ pub struct RepositoryConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub encryption: Option<EncryptionConfig>,
pub pack_target_size: u64,
#[serde(default)]
pub parity: Option<ParityConfigSpec>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParityConfigSpec {
    // Parity algorithm identifier; "reed-solomon" is the default elsewhere in
    // this file. NOTE(review): nothing visible here validates or branches on
    // this string — confirm consumers check it before trusting other values.
    pub algorithm: String,
    // Number of data packs per parity group (serialized as "dataCount").
    pub data_count: usize,
    // Number of parity shards generated per group (serialized as "parityCount").
    pub parity_count: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -92,6 +102,11 @@ impl RepositoryConfig {
compression: "gzip".to_string(),
encryption,
pack_target_size: 8 * 1024 * 1024, // 8 MB
parity: Some(ParityConfigSpec {
algorithm: "reed-solomon".to_string(),
data_count: 20,
parity_count: 1,
}),
}
}
}

View File

@@ -18,6 +18,8 @@ pub struct IndexEntry {
pub offset: u64,
pub compressed_size: u32,
pub plaintext_size: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nonce: Option<String>,
}
/// An index segment stored on disk.
@@ -185,11 +187,17 @@ impl GlobalIndex {
for entry in entries {
let hash_hex = hasher::hash_to_hex(&entry.content_hash);
let nonce = if entry.nonce != [0u8; 12] {
Some(hex::encode(entry.nonce))
} else {
None
};
index.entries.insert(hash_hex, IndexEntry {
pack_id: pack_id.clone(),
offset: entry.offset,
compressed_size: entry.compressed_size,
plaintext_size: entry.plaintext_size,
nonce,
});
}
}

View File

@@ -13,6 +13,7 @@ use crate::chunker::{FastCdc, StreamingChunker};
use crate::compression;
use crate::encryption;
use crate::error::ArchiveError;
use crate::parity::{ParityManager, ParityConfig};
use crate::global_index::IndexEntry;
use crate::hasher;
use crate::pack_writer::{PackWriter, FLAG_GZIP, FLAG_ENCRYPTED};
@@ -39,6 +40,7 @@ struct PendingChunk {
offset: u64,
compressed_size: u32,
plaintext_size: u32,
nonce: Option<String>,
}
/// Run the single-item ingest pipeline.
@@ -84,11 +86,11 @@ async fn do_ingest(
) -> Result<Snapshot, ArchiveError> {
let mut pack_writer = PackWriter::new(repo.config.pack_target_size);
let mut pending_chunks: Vec<PendingChunk> = Vec::new();
let mut parity_mgr = create_parity_manager(repo);
let mut total_stored_size: u64 = 0;
let mut total_new_chunks: u64 = 0;
let mut total_reused_chunks: u64 = 0;
// Ingest the single stream
let item_name = items.first()
.map(|i| i.name.clone())
.unwrap_or_else(|| "data".to_string());
@@ -100,6 +102,7 @@ async fn do_ingest(
repo,
&mut pack_writer,
&mut pending_chunks,
&mut parity_mgr,
socket_path,
&item_name,
&item_type,
@@ -108,12 +111,10 @@ async fn do_ingest(
&mut total_reused_chunks,
).await?;
// Finalize any remaining pack
if !pack_writer.is_empty() {
finalize_pack(repo, &mut pack_writer, &mut pending_chunks).await?;
finalize_pack(repo, &mut pack_writer, &mut pending_chunks, &mut parity_mgr).await?;
}
// Save index
repo.index.save_segment(&repo.path).await?;
let total_original_size = snapshot_item.size;
@@ -143,6 +144,7 @@ async fn do_ingest_multi(
) -> Result<Snapshot, ArchiveError> {
let mut pack_writer = PackWriter::new(repo.config.pack_target_size);
let mut pending_chunks: Vec<PendingChunk> = Vec::new();
let mut parity_mgr = create_parity_manager(repo);
let mut total_original_size: u64 = 0;
let mut total_stored_size: u64 = 0;
let mut total_new_chunks: u64 = 0;
@@ -158,6 +160,7 @@ async fn do_ingest_multi(
repo,
&mut pack_writer,
&mut pending_chunks,
&mut parity_mgr,
socket_path,
&item.name,
&item.item_type,
@@ -170,12 +173,10 @@ async fn do_ingest_multi(
snapshot_items.push(snapshot_item);
}
// Finalize any remaining pack
if !pack_writer.is_empty() {
finalize_pack(repo, &mut pack_writer, &mut pending_chunks).await?;
finalize_pack(repo, &mut pack_writer, &mut pending_chunks, &mut parity_mgr).await?;
}
// Save index
repo.index.save_segment(&repo.path).await?;
let snapshot = Snapshot::new(
@@ -202,6 +203,7 @@ async fn ingest_stream(
repo: &mut Repository,
pack_writer: &mut PackWriter,
pending_chunks: &mut Vec<PendingChunk>,
parity_mgr: &mut Option<ParityManager>,
socket_path: &str,
item_name: &str,
item_type: &str,
@@ -240,6 +242,7 @@ async fn ingest_stream(
repo,
pack_writer,
pending_chunks,
parity_mgr,
&chunk_data,
&mut chunk_hashes,
total_new_chunks,
@@ -254,6 +257,7 @@ async fn ingest_stream(
repo,
pack_writer,
pending_chunks,
parity_mgr,
&final_chunk,
&mut chunk_hashes,
total_new_chunks,
@@ -274,6 +278,7 @@ async fn process_chunk(
repo: &mut Repository,
pack_writer: &mut PackWriter,
pending_chunks: &mut Vec<PendingChunk>,
parity_mgr: &mut Option<ParityManager>,
chunk_data: &[u8],
chunk_hashes: &mut Vec<String>,
new_chunks: &mut u64,
@@ -310,17 +315,24 @@ async fn process_chunk(
.map(|e| e.compressed_size as u64)
.sum::<u64>();
let nonce_hex = if nonce != [0u8; 12] {
Some(hex::encode(nonce))
} else {
None
};
pending_chunks.push(PendingChunk {
hash_hex: hash_hex.clone(),
offset,
compressed_size,
plaintext_size,
nonce: nonce_hex,
});
pack_writer.add_chunk(hash, &stored_data, plaintext_size, nonce, flags);
if pack_writer.should_finalize() {
finalize_pack(repo, pack_writer, pending_chunks).await?;
finalize_pack(repo, pack_writer, pending_chunks, parity_mgr).await?;
}
Ok(())
@@ -330,6 +342,7 @@ async fn finalize_pack(
repo: &mut Repository,
pack_writer: &mut PackWriter,
pending_chunks: &mut Vec<PendingChunk>,
parity_mgr: &mut Option<ParityManager>,
) -> Result<(), ArchiveError> {
let pack_info = pack_writer.finalize(&repo.path).await?;
@@ -339,9 +352,19 @@ async fn finalize_pack(
offset: pending.offset,
compressed_size: pending.compressed_size,
plaintext_size: pending.plaintext_size,
nonce: pending.nonce,
});
}
// Register pack with parity manager for RS parity generation
if let Some(ref mut mgr) = parity_mgr {
if let Some(group_id) = mgr.register_pack(
&repo.path, &pack_info.pack_id, pack_info.data_size,
).await? {
tracing::info!("Generated parity group {}", group_id);
}
}
tracing::info!(
"Finalized pack {} ({} chunks, {} bytes)",
pack_info.pack_id, pack_info.chunk_count, pack_info.data_size
@@ -349,3 +372,13 @@ async fn finalize_pack(
Ok(())
}
/// Create a ParityManager from the repository's parity config, if configured.
///
/// Returns `None` when the repository has no `parity` section in its config.
/// NOTE(review): the spec's `algorithm` string is not consulted here —
/// presumably ParityManager is Reed-Solomon only; confirm.
fn create_parity_manager(repo: &Repository) -> Option<ParityManager> {
    let spec = repo.config.parity.as_ref()?;
    let config = ParityConfig {
        data_count: spec.data_count,
        parity_count: spec.parity_count,
    };
    Some(ParityManager::new(config))
}

View File

@@ -1,6 +1,7 @@
/// Restore pipeline: reads a snapshot manifest, looks up chunks in the global
/// index, reads from pack files, decrypts, decompresses, and writes to a Unix socket.
use std::collections::HashMap;
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
@@ -9,6 +10,7 @@ use crate::encryption;
use crate::error::ArchiveError;
use crate::hasher;
use crate::pack_reader;
use crate::pack_writer::IdxEntry;
use crate::repository::Repository;
use crate::snapshot;
@@ -45,6 +47,9 @@ pub async fn restore(
tracing::info!("Connected to restore socket: {}", socket_path);
// Cache loaded IDX entries per pack to avoid re-reading
let mut idx_cache: HashMap<String, Vec<IdxEntry>> = HashMap::new();
let mut restored_bytes: u64 = 0;
let mut chunks_read: u64 = 0;
@@ -73,23 +78,41 @@ pub async fn restore(
// Decrypt if encrypted
let compressed = if let Some(ref key) = repo.master_key {
// We need the nonce. Read it from the IDX file.
let idx_path = std::path::Path::new(&repo.path)
.join("packs")
.join("data")
.join(shard)
.join(format!("{}.idx", index_entry.pack_id));
// Try to get nonce from the global index first (fast path)
let nonce = if let Some(ref nonce_hex) = index_entry.nonce {
let nonce_bytes = hex::decode(nonce_hex)
.map_err(|_| ArchiveError::Corruption(format!("Invalid nonce hex: {}", nonce_hex)))?;
let mut n = [0u8; 12];
if nonce_bytes.len() >= 12 {
n.copy_from_slice(&nonce_bytes[..12]);
}
n
} else {
// Fallback: read from IDX file (cached)
let entries = if let Some(cached) = idx_cache.get(&index_entry.pack_id) {
cached
} else {
let idx_path = std::path::Path::new(&repo.path)
.join("packs")
.join("data")
.join(shard)
.join(format!("{}.idx", index_entry.pack_id));
let loaded = pack_reader::load_idx(&idx_path).await?;
idx_cache.insert(index_entry.pack_id.clone(), loaded);
idx_cache.get(&index_entry.pack_id).unwrap()
};
let entries = pack_reader::load_idx(&idx_path).await?;
let hash_bytes = hasher::hex_to_hash(hash_hex)
.map_err(|_| ArchiveError::Corruption(format!("Invalid hash: {}", hash_hex)))?;
let hash_bytes = hasher::hex_to_hash(hash_hex)
.map_err(|_| ArchiveError::Corruption(format!("Invalid hash: {}", hash_hex)))?;
let idx_entry = pack_reader::find_in_idx(&entries, &hash_bytes)
.ok_or_else(|| ArchiveError::NotFound(format!(
"Chunk {} not found in pack index {}", hash_hex, index_entry.pack_id
)))?;
let idx_entry = pack_reader::find_in_idx(entries, &hash_bytes)
.ok_or_else(|| ArchiveError::NotFound(format!(
"Chunk {} not found in pack index {}", hash_hex, index_entry.pack_id
)))?;
idx_entry.nonce
};
encryption::decrypt_chunk(&stored_data, key, &idx_entry.nonce)?
encryption::decrypt_chunk(&stored_data, key, &nonce)?
} else {
stored_data
};