feat: add multi-item ingest and Reed-Solomon parity

- Multi-item ingest: each item gets its own Unix socket, Rust processes
  them sequentially into a single snapshot with separate chunk lists
- Reed-Solomon parity: rs(20,1) erasure coding for pack file groups,
  enabling single-pack-loss recovery via parity reconstruction
- Repair now attempts parity-based recovery for missing pack files
- 16 integration tests + 12 Rust unit tests all pass
This commit is contained in:
2026-03-21 23:46:29 +00:00
parent a5849791d2
commit ca510f4578
10 changed files with 830 additions and 115 deletions

View File

@@ -1,5 +1,8 @@
/// Ingest pipeline: reads data from a Unix socket, chunks it with FastCDC,
/// Ingest pipeline: reads data from Unix socket(s), chunks with FastCDC,
/// deduplicates, compresses, optionally encrypts, and writes to pack files.
///
/// Supports single-item ingest (one socket) and multi-item ingest
/// (multiple sockets, one per item) producing a single snapshot.
use std::collections::HashMap;
use tokio::io::AsyncReadExt;
@@ -22,6 +25,8 @@ pub struct IngestItemOptions {
pub name: String,
#[serde(rename = "type", default = "default_item_type")]
pub item_type: String,
#[serde(default)]
pub socket_path: Option<String>,
}
fn default_item_type() -> String {
@@ -36,7 +41,7 @@ struct PendingChunk {
plaintext_size: u32,
}
/// Run the ingest pipeline.
/// Run the single-item ingest pipeline.
pub async fn ingest(
repo: &mut Repository,
socket_path: &str,
@@ -54,79 +59,115 @@ pub async fn ingest(
result
}
/// Run the multi-item ingest pipeline.
///
/// Takes the repository write lock for the duration of the ingest and
/// releases it no matter how the ingest itself went.
pub async fn ingest_multi(
    repo: &mut Repository,
    tags: HashMap<String, String>,
    items: Vec<IngestItemOptions>,
) -> Result<Snapshot, ArchiveError> {
    // Hold the write lock while all item streams are ingested.
    repo.acquire_lock("ingest").await?;
    let outcome = do_ingest_multi(repo, tags, items).await;
    // Unconditionally release; a failed unlock surfaces as the returned
    // error (same pattern as the single-item `ingest` wrapper).
    repo.release_lock().await?;
    outcome
}
async fn do_ingest(
repo: &mut Repository,
socket_path: &str,
tags: HashMap<String, String>,
items: Vec<IngestItemOptions>,
) -> Result<Snapshot, ArchiveError> {
// Connect to the Unix socket where TypeScript is writing the data
let mut stream = UnixStream::connect(socket_path).await
.map_err(|e| ArchiveError::Io(e))?;
tracing::info!("Connected to ingest socket: {}", socket_path);
// Set up chunker
let cdc = FastCdc::new(
repo.config.chunking.min_size as usize,
repo.config.chunking.avg_size as usize,
repo.config.chunking.max_size as usize,
);
let mut chunker = StreamingChunker::new(cdc);
// Set up pack writer
let mut pack_writer = PackWriter::new(repo.config.pack_target_size);
// Track pending chunks for the current pack (to build index entries after finalize)
let mut pending_chunks: Vec<PendingChunk> = Vec::new();
// Stats
let mut total_original_size: u64 = 0;
let mut total_stored_size: u64 = 0;
let mut new_chunks: u64 = 0;
let mut reused_chunks: u64 = 0;
let mut chunk_hashes: Vec<String> = Vec::new();
let mut total_new_chunks: u64 = 0;
let mut total_reused_chunks: u64 = 0;
// Read data from socket in chunks
let mut read_buf = vec![0u8; 256 * 1024]; // 256KB read buffer
// Ingest the single stream
let item_name = items.first()
.map(|i| i.name.clone())
.unwrap_or_else(|| "data".to_string());
let item_type = items.first()
.map(|i| i.item_type.clone())
.unwrap_or_else(|| "data".to_string());
loop {
let n = stream.read(&mut read_buf).await?;
if n == 0 {
break; // EOF
}
let snapshot_item = ingest_stream(
repo,
&mut pack_writer,
&mut pending_chunks,
socket_path,
&item_name,
&item_type,
&mut total_stored_size,
&mut total_new_chunks,
&mut total_reused_chunks,
).await?;
total_original_size += n as u64;
let data = &read_buf[..n];
// Feed into chunker
let chunks = chunker.feed(data);
for chunk_data in chunks {
process_chunk(
repo,
&mut pack_writer,
&mut pending_chunks,
&chunk_data,
&mut chunk_hashes,
&mut new_chunks,
&mut reused_chunks,
&mut total_stored_size,
).await?;
}
// Finalize any remaining pack
if !pack_writer.is_empty() {
finalize_pack(repo, &mut pack_writer, &mut pending_chunks).await?;
}
// Finalize chunker — get any remaining data
if let Some(final_chunk) = chunker.finalize() {
process_chunk(
// Save index
repo.index.save_segment(&repo.path).await?;
let total_original_size = snapshot_item.size;
let snapshot = Snapshot::new(
vec![snapshot_item],
tags,
total_original_size,
total_stored_size,
total_new_chunks,
total_reused_chunks,
);
save_snapshot(&repo.path, &snapshot).await?;
tracing::info!(
"Ingest complete: {} bytes original, {} bytes stored, {} new chunks, {} reused",
total_original_size, total_stored_size, total_new_chunks, total_reused_chunks
);
Ok(snapshot)
}
async fn do_ingest_multi(
repo: &mut Repository,
tags: HashMap<String, String>,
items: Vec<IngestItemOptions>,
) -> Result<Snapshot, ArchiveError> {
let mut pack_writer = PackWriter::new(repo.config.pack_target_size);
let mut pending_chunks: Vec<PendingChunk> = Vec::new();
let mut total_original_size: u64 = 0;
let mut total_stored_size: u64 = 0;
let mut total_new_chunks: u64 = 0;
let mut total_reused_chunks: u64 = 0;
let mut snapshot_items: Vec<SnapshotItem> = Vec::new();
for item in &items {
let socket_path = item.socket_path.as_deref().ok_or_else(|| {
ArchiveError::Config(format!("Item '{}' missing socketPath for multi-item ingest", item.name))
})?;
let snapshot_item = ingest_stream(
repo,
&mut pack_writer,
&mut pending_chunks,
&final_chunk,
&mut chunk_hashes,
&mut new_chunks,
&mut reused_chunks,
socket_path,
&item.name,
&item.item_type,
&mut total_stored_size,
&mut total_new_chunks,
&mut total_reused_chunks,
).await?;
total_original_size += snapshot_item.size;
snapshot_items.push(snapshot_item);
}
// Finalize any remaining pack
@@ -137,40 +178,98 @@ async fn do_ingest(
// Save index
repo.index.save_segment(&repo.path).await?;
// Build snapshot
let item_name = items.first()
.map(|i| i.name.clone())
.unwrap_or_else(|| "data".to_string());
let item_type = items.first()
.map(|i| i.item_type.clone())
.unwrap_or_else(|| "data".to_string());
let snapshot_items = vec![SnapshotItem {
name: item_name,
item_type,
size: total_original_size,
chunks: chunk_hashes,
}];
let snapshot = Snapshot::new(
snapshot_items,
tags,
total_original_size,
total_stored_size,
new_chunks,
reused_chunks,
total_new_chunks,
total_reused_chunks,
);
save_snapshot(&repo.path, &snapshot).await?;
tracing::info!(
"Ingest complete: {} bytes original, {} bytes stored, {} new chunks, {} reused",
total_original_size, total_stored_size, new_chunks, reused_chunks
"Multi-item ingest complete: {} items, {} bytes original, {} bytes stored",
items.len(), total_original_size, total_stored_size
);
Ok(snapshot)
}
/// Ingest a single stream from a socket path, returning a SnapshotItem.
///
/// Reads the socket to EOF, runs the bytes through the content-defined
/// chunker, and routes every emitted chunk through `process_chunk` (which
/// handles dedup, compression, optional encryption, and pack writing).
/// Pack state and running totals are shared across items via the `&mut`
/// parameters so several streams can feed one snapshot.
async fn ingest_stream(
    repo: &mut Repository,
    pack_writer: &mut PackWriter,
    pending_chunks: &mut Vec<PendingChunk>,
    socket_path: &str,
    item_name: &str,
    item_type: &str,
    total_stored_size: &mut u64,
    total_new_chunks: &mut u64,
    total_reused_chunks: &mut u64,
) -> Result<SnapshotItem, ArchiveError> {
    // Connect to the socket the producer side is writing into.
    let mut socket = UnixStream::connect(socket_path).await
        .map_err(ArchiveError::Io)?;
    tracing::info!("Connected to ingest socket for item '{}': {}", item_name, socket_path);

    // Chunker configured from the repository's chunking settings.
    let chunker_cfg = FastCdc::new(
        repo.config.chunking.min_size as usize,
        repo.config.chunking.avg_size as usize,
        repo.config.chunking.max_size as usize,
    );
    let mut stream_chunker = StreamingChunker::new(chunker_cfg);

    let mut bytes_read: u64 = 0;
    let mut item_chunk_hashes: Vec<String> = Vec::new();
    let mut buf = vec![0u8; 256 * 1024]; // 256 KiB socket read buffer

    // Drain the socket until EOF, feeding everything through the chunker.
    loop {
        let n = socket.read(&mut buf).await?;
        if n == 0 {
            break; // EOF: producer closed its end of the socket
        }
        bytes_read += n as u64;
        for piece in stream_chunker.feed(&buf[..n]) {
            process_chunk(
                repo,
                pack_writer,
                pending_chunks,
                &piece,
                &mut item_chunk_hashes,
                total_new_chunks,
                total_reused_chunks,
                total_stored_size,
            ).await?;
        }
    }

    // Flush whatever tail bytes the chunker is still holding.
    if let Some(tail) = stream_chunker.finalize() {
        process_chunk(
            repo,
            pack_writer,
            pending_chunks,
            &tail,
            &mut item_chunk_hashes,
            total_new_chunks,
            total_reused_chunks,
            total_stored_size,
        ).await?;
    }

    Ok(SnapshotItem {
        name: item_name.to_string(),
        item_type: item_type.to_string(),
        size: bytes_read,
        chunks: item_chunk_hashes,
    })
}
async fn process_chunk(
repo: &mut Repository,
pack_writer: &mut PackWriter,
@@ -181,24 +280,20 @@ async fn process_chunk(
reused_chunks: &mut u64,
total_stored_size: &mut u64,
) -> Result<(), ArchiveError> {
// Hash the plaintext chunk
let hash = hasher::hash_chunk(chunk_data);
let hash_hex = hasher::hash_to_hex(&hash);
chunk_hashes.push(hash_hex.clone());
// Dedup check
if repo.index.has(&hash_hex) {
*reused_chunks += 1;
return Ok(());
}
// New chunk: compress
let compressed = compression::compress(chunk_data)?;
let mut flags = FLAG_GZIP;
let plaintext_size = chunk_data.len() as u32;
// Optionally encrypt
let (stored_data, nonce) = if let Some(ref key) = repo.master_key {
let encrypted = encryption::encrypt_chunk(&compressed, key)?;
flags |= FLAG_ENCRYPTED;
@@ -211,8 +306,6 @@ async fn process_chunk(
*total_stored_size += stored_data.len() as u64;
*new_chunks += 1;
// Track the pending chunk for index building
// The offset is the current position in the pack buffer
let offset = pack_writer.entries().iter()
.map(|e| e.compressed_size as u64)
.sum::<u64>();
@@ -224,10 +317,8 @@ async fn process_chunk(
plaintext_size,
});
// Add to pack writer
pack_writer.add_chunk(hash, &stored_data, plaintext_size, nonce, flags);
// If pack is full, finalize it
if pack_writer.should_finalize() {
finalize_pack(repo, pack_writer, pending_chunks).await?;
}
@@ -235,7 +326,6 @@ async fn process_chunk(
Ok(())
}
/// Finalize the current pack and add its entries to the global index.
async fn finalize_pack(
repo: &mut Repository,
pack_writer: &mut PackWriter,
@@ -243,7 +333,6 @@ async fn finalize_pack(
) -> Result<(), ArchiveError> {
let pack_info = pack_writer.finalize(&repo.path).await?;
// Now we know the pack_id — add all pending chunks to the global index
for pending in pending_chunks.drain(..) {
repo.index.add_entry(pending.hash_hex, IndexEntry {
pack_id: pack_info.pack_id.clone(),

View File

@@ -17,6 +17,7 @@ mod ingest;
mod restore;
mod verify;
mod prune;
mod parity;
mod repair;
#[derive(Parser, Debug)]

View File

@@ -90,6 +90,7 @@ async fn handle_request(req: &Request, repo: &mut Option<Repository>) -> Respons
"open" => handle_open(req, repo).await,
"close" => handle_close(req, repo).await,
"ingest" => handle_ingest(req, repo).await,
"ingestMulti" => handle_ingest_multi(req, repo).await,
"restore" => handle_restore(req, repo).await,
"listSnapshots" => handle_list_snapshots(req, repo).await,
"getSnapshot" => handle_get_snapshot(req, repo).await,
@@ -219,6 +220,43 @@ async fn handle_ingest(req: &Request, repo: &mut Option<Repository>) -> Response
}
}
/// Handle the "ingestMulti" RPC: ingest several item streams into one snapshot.
///
/// `tags` is optional (absent/malformed tags degrade to empty metadata);
/// `items` is mandatory. Previously a missing, malformed, or empty `items`
/// array silently became an empty Vec and produced an empty snapshot — now
/// those cases are reported back to the caller as request errors.
async fn handle_ingest_multi(req: &Request, repo: &mut Option<Repository>) -> Response {
    // Require an open repository.
    let repo = match repo.as_mut() {
        Some(r) => r,
        None => return Response {
            id: req.id.clone(),
            success: false,
            result: None,
            error: Some("No repository open".to_string()),
        },
    };
    // Optional best-effort metadata.
    let tags: std::collections::HashMap<String, String> = req.params.get("tags")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_default();
    // Mandatory item list — surface deserialization problems instead of
    // swallowing them into an empty default.
    let items: Vec<crate::ingest::IngestItemOptions> = match req.params.get("items") {
        Some(v) => match serde_json::from_value(v.clone()) {
            Ok(items) => items,
            Err(e) => return Response {
                id: req.id.clone(),
                success: false,
                result: None,
                error: Some(format!("Invalid 'items' parameter: {}", e)),
            },
        },
        None => return Response {
            id: req.id.clone(),
            success: false,
            result: None,
            error: Some("Missing 'items' parameter".to_string()),
        },
    };
    if items.is_empty() {
        return Response {
            id: req.id.clone(),
            success: false,
            result: None,
            error: Some("'items' must contain at least one item".to_string()),
        };
    }
    match crate::ingest::ingest_multi(repo, tags, items).await {
        Ok(snapshot) => {
            let result = serde_json::to_value(&snapshot).unwrap_or(Value::Null);
            Response {
                id: req.id.clone(),
                success: true,
                result: Some(serde_json::json!({ "snapshot": result })),
                error: None,
            }
        }
        Err(e) => Response {
            id: req.id.clone(),
            success: false,
            result: None,
            error: Some(e.to_string()),
        },
    }
}
async fn handle_restore(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_ref() {
Some(r) => r,

335
rust/src/parity.rs Normal file
View File

@@ -0,0 +1,335 @@
/// Reed-Solomon parity for pack files.
///
/// Every group of N data packs generates M parity packs using RS erasure coding.
/// Default RS(20,1): any single lost or corrupted pack in a group of 20 can be
/// fully reconstructed from the remaining 19 data packs + 1 parity pack.
use std::path::{Path, PathBuf};
use reed_solomon_erasure::galois_8::ReedSolomon;
use serde::{Deserialize, Serialize};
use crate::error::ArchiveError;
/// Default parity configuration.
// RS(20,1): groups of 20 data packs protected by 1 parity shard, so any
// single lost pack per group can be rebuilt from the remaining 19 + parity.
pub const DEFAULT_DATA_SHARDS: usize = 20;
pub const DEFAULT_PARITY_SHARDS: usize = 1;

/// Reed-Solomon group geometry: how many data packs form a group and how
/// many parity shards are generated for it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParityConfig {
    /// Number of data packs per parity group.
    pub data_count: usize,
    /// Number of parity shards generated per group.
    pub parity_count: usize,
}

impl Default for ParityConfig {
    fn default() -> Self {
        Self {
            data_count: DEFAULT_DATA_SHARDS,
            parity_count: DEFAULT_PARITY_SHARDS,
        }
    }
}
/// Manifest for a parity group.
///
/// Persisted as `<group_id>.parx` JSON next to the group's parity shard(s);
/// records which data packs the group covers and how the shards were sized.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParityGroupManifest {
    /// Group identifier (UUID v4 rendered without dashes).
    pub group_id: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
    /// Erasure-coding algorithm name ("reed-solomon").
    pub algorithm: String,
    /// Number of data shards in this group.
    pub data_count: usize,
    /// Number of parity shards in this group.
    pub parity_count: usize,
    /// The data packs protected by this group, in shard order.
    pub data_packs: Vec<ParityPackRef>,
    /// Size in bytes of each written parity file (equals `shard_size`).
    pub parity_size: u64,
    /// Common shard size; data packs are zero-padded to this length at encode time.
    pub shard_size: u64,
}

/// Reference to one data pack inside a parity group.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ParityPackRef {
    /// Pack ID (filename stem under `packs/data/<shard>/`).
    pub id: String,
    /// Original (unpadded) pack size in bytes; used to trim padding after
    /// reconstruction.
    pub size: u64,
}
/// Manager for Reed-Solomon parity operations.
///
/// Accumulates newly written pack references and emits a parity group once
/// `config.data_count` packs are pending.
pub struct ParityManager {
    /// Group geometry (data/parity shard counts).
    config: ParityConfig,
    /// Pack IDs that haven't been grouped yet
    pending_packs: Vec<ParityPackRef>,
}
impl ParityManager {
    /// Create a manager with the given RS configuration.
    pub fn new(config: ParityConfig) -> Self {
        Self {
            config,
            pending_packs: Vec::new(),
        }
    }

    /// Register a newly written pack. When enough packs accumulate, generates parity.
    ///
    /// Returns `Ok(Some(group_id))` when this registration completed a group
    /// and parity was generated, `Ok(None)` otherwise.
    pub async fn register_pack(
        &mut self,
        repo_path: &str,
        pack_id: &str,
        pack_size: u64,
    ) -> Result<Option<String>, ArchiveError> {
        self.pending_packs.push(ParityPackRef {
            id: pack_id.to_string(),
            size: pack_size,
        });
        if self.pending_packs.len() >= self.config.data_count {
            let group_id = self.generate_parity(repo_path).await?;
            Ok(Some(group_id))
        } else {
            Ok(None)
        }
    }

    /// Generate parity for the current batch of pending packs.
    ///
    /// Shard sizing and the manifest use the *actual* on-disk pack sizes, not
    /// the sizes recorded at `register_pack` time: if the two ever disagreed,
    /// padding to a stale maximum could truncate pack bytes during encoding,
    /// and reconstruction would later trim to the wrong length.
    async fn generate_parity(&mut self, repo_path: &str) -> Result<String, ArchiveError> {
        let mut data_packs: Vec<_> = self.pending_packs.drain(..self.config.data_count).collect();
        let rs = ReedSolomon::new(self.config.data_count, self.config.parity_count)
            .map_err(|e| ArchiveError::Other(format!("RS init failed: {:?}", e)))?;
        // Read every data pack first and refresh the recorded sizes from disk.
        let mut raw: Vec<Vec<u8>> = Vec::with_capacity(self.config.data_count);
        for pack_ref in &mut data_packs {
            let pack_path = resolve_pack_path(repo_path, &pack_ref.id);
            let data = tokio::fs::read(&pack_path).await
                .map_err(ArchiveError::Io)?;
            pack_ref.size = data.len() as u64; // keep the manifest consistent with disk
            raw.push(data);
        }
        let max_size = raw.iter().map(|d| d.len()).max().unwrap_or(0);
        // Zero-pad every data shard to the common shard size.
        let mut shards: Vec<Vec<u8>> = Vec::with_capacity(
            self.config.data_count + self.config.parity_count,
        );
        for mut data in raw {
            data.resize(max_size, 0);
            shards.push(data);
        }
        // Add empty parity shards; `encode` fills them in-place.
        for _ in 0..self.config.parity_count {
            shards.push(vec![0u8; max_size]);
        }
        let mut shard_refs: Vec<&mut [u8]> = shards.iter_mut()
            .map(|s| s.as_mut_slice())
            .collect();
        rs.encode(&mut shard_refs)
            .map_err(|e| ArchiveError::Other(format!("RS encode failed: {:?}", e)))?;
        // Write parity pack(s) under packs/parity/<first-two-chars-of-group-id>/.
        let group_id = uuid::Uuid::new_v4().to_string().replace('-', "");
        let shard_prefix = &group_id[..2];
        let parity_dir = Path::new(repo_path).join("packs").join("parity").join(shard_prefix);
        tokio::fs::create_dir_all(&parity_dir).await?;
        // Parity data is the last parity_count shards.
        let parity_data = &shards[self.config.data_count..];
        for (i, parity_shard) in parity_data.iter().enumerate() {
            // Single-parity groups use a plain ".par" suffix; multi-parity
            // groups number each shard (".0.par", ".1.par", ...).
            let suffix = if self.config.parity_count > 1 {
                format!(".{}.par", i)
            } else {
                ".par".to_string()
            };
            let par_path = parity_dir.join(format!("{}{}", group_id, suffix));
            tokio::fs::write(&par_path, parity_shard).await?;
        }
        // Write the parity group manifest alongside the shards.
        let manifest = ParityGroupManifest {
            group_id: group_id.clone(),
            created_at: chrono::Utc::now().to_rfc3339(),
            algorithm: "reed-solomon".to_string(),
            data_count: self.config.data_count,
            parity_count: self.config.parity_count,
            data_packs,
            parity_size: max_size as u64,
            shard_size: max_size as u64,
        };
        let manifest_path = parity_dir.join(format!("{}.parx", group_id));
        let json = serde_json::to_string_pretty(&manifest)?;
        tokio::fs::write(&manifest_path, json).await?;
        tracing::info!(
            "Generated RS({},{}) parity group {} ({} bytes per shard)",
            self.config.data_count, self.config.parity_count,
            group_id, max_size
        );
        Ok(group_id)
    }

    /// Get the number of packs pending parity generation.
    pub fn pending_count(&self) -> usize {
        self.pending_packs.len()
    }
}
/// Attempt to reconstruct a missing/corrupted pack from its parity group.
///
/// Returns `Ok(true)` when a parity group covering `pack_id` was found and
/// all missing shards in the group were reconstructed: data packs are
/// rewritten to disk, and reconstructed parity shards are re-persisted too
/// (so the group regains full protection). Returns `Ok(false)` when no
/// parity group covers the pack.
///
/// # Errors
/// Fails when more shards are missing than the group's parity count can
/// recover, or on I/O / RS decoding errors.
pub async fn repair_from_parity(
    repo_path: &str,
    pack_id: &str,
) -> Result<bool, ArchiveError> {
    // Find the parity group containing this pack.
    let manifest = match find_parity_group(repo_path, pack_id).await? {
        Some(m) => m,
        None => {
            tracing::warn!("No parity group found for pack {}", pack_id);
            return Ok(false);
        }
    };
    let rs = ReedSolomon::new(manifest.data_count, manifest.parity_count)
        .map_err(|e| ArchiveError::Other(format!("RS init failed: {:?}", e)))?;
    let shard_size = manifest.shard_size as usize;
    let total_shards = manifest.data_count + manifest.parity_count;
    // Load all shards (data packs + parity packs). `None` marks a shard the
    // RS decoder must reconstruct.
    let mut shards: Vec<Option<Vec<u8>>> = Vec::with_capacity(total_shards);
    let mut missing_indices = Vec::new();
    // Data shards: zero-pad each present pack back to the common shard size
    // used at encode time.
    for (i, pack_ref) in manifest.data_packs.iter().enumerate() {
        let pack_path = resolve_pack_path(repo_path, &pack_ref.id);
        if pack_path.exists() {
            let mut data = tokio::fs::read(&pack_path).await?;
            data.resize(shard_size, 0);
            shards.push(Some(data));
        } else {
            shards.push(None);
            missing_indices.push(i);
        }
    }
    // Parity shards. A parity file whose length doesn't match the manifest's
    // shard size is corrupt; feeding it to the decoder as valid would produce
    // garbage output, so count it as missing instead.
    let shard_prefix = &manifest.group_id[..2];
    for i in 0..manifest.parity_count {
        let suffix = if manifest.parity_count > 1 {
            format!(".{}.par", i)
        } else {
            ".par".to_string()
        };
        let par_path = Path::new(repo_path)
            .join("packs").join("parity").join(shard_prefix)
            .join(format!("{}{}", manifest.group_id, suffix));
        let shard = if par_path.exists() {
            let data = tokio::fs::read(&par_path).await?;
            if data.len() == shard_size {
                Some(data)
            } else {
                tracing::warn!(
                    "Parity shard {} of group {} has wrong size ({} != {}); treating as missing",
                    i, manifest.group_id, data.len(), shard_size
                );
                None
            }
        } else {
            None
        };
        if shard.is_none() {
            missing_indices.push(manifest.data_count + i);
        }
        shards.push(shard);
    }
    if missing_indices.len() > manifest.parity_count {
        return Err(ArchiveError::Corruption(format!(
            "Too many missing shards ({}) to reconstruct with {} parity shards",
            missing_indices.len(), manifest.parity_count
        )));
    }
    // Reconstruct every `None` shard in place.
    rs.reconstruct(&mut shards)
        .map_err(|e| ArchiveError::Other(format!("RS reconstruct failed: {:?}", e)))?;
    // Persist everything that was reconstructed.
    for &idx in &missing_indices {
        let data = match shards[idx].as_ref() {
            Some(d) => d,
            None => continue, // decoder should have filled it; stay defensive
        };
        if idx < manifest.data_count {
            // Reconstructed data pack: trim the encode-time zero padding back
            // to the original size recorded in the manifest, then write it
            // under packs/data/<shard>/. Clamp the trim and the shard-prefix
            // slice so inconsistent manifest data cannot panic.
            let pack_ref = &manifest.data_packs[idx];
            let original_size = (pack_ref.size as usize).min(data.len());
            let trimmed = &data[..original_size];
            let shard = &pack_ref.id[..pack_ref.id.len().min(2)];
            let dir = Path::new(repo_path).join("packs").join("data").join(shard);
            tokio::fs::create_dir_all(&dir).await?;
            let pack_path = resolve_pack_path(repo_path, &pack_ref.id);
            tokio::fs::write(&pack_path, trimmed).await?;
            tracing::info!("Reconstructed pack {} from parity", pack_ref.id);
        } else {
            // Reconstructed parity shard: write it back so a future pack loss
            // in this group is still recoverable.
            let i = idx - manifest.data_count;
            let suffix = if manifest.parity_count > 1 {
                format!(".{}.par", i)
            } else {
                ".par".to_string()
            };
            let par_path = Path::new(repo_path)
                .join("packs").join("parity").join(shard_prefix)
                .join(format!("{}{}", manifest.group_id, suffix));
            tokio::fs::write(&par_path, data).await?;
            tracing::info!("Rewrote parity shard {} of group {}", i, manifest.group_id);
        }
    }
    Ok(true)
}
/// Find the parity group manifest containing a given pack ID.
///
/// Scans every `.parx` manifest under `packs/parity/` and returns the first
/// group whose data-pack list mentions `pack_id`, or `None` when no group
/// covers it.
async fn find_parity_group(
    repo_path: &str,
    pack_id: &str,
) -> Result<Option<ParityGroupManifest>, ArchiveError> {
    let parity_dir = Path::new(repo_path).join("packs").join("parity");
    if !parity_dir.exists() {
        return Ok(None);
    }
    for manifest_path in find_parx_files(&parity_dir).await? {
        let raw = tokio::fs::read_to_string(&manifest_path).await?;
        // Unparseable manifests are skipped rather than aborting the search.
        if let Ok(parsed) = serde_json::from_str::<ParityGroupManifest>(&raw) {
            if parsed.data_packs.iter().any(|p| p.id == pack_id) {
                return Ok(Some(parsed));
            }
        }
    }
    Ok(None)
}
/// Public helper for resolving pack paths (used by repair).
///
/// Thin public wrapper around the private `resolve_pack_path`; keeps the
/// path-layout logic in one place while exposing it to other modules.
pub fn resolve_pack_path_pub(repo_path: &str, pack_id: &str) -> PathBuf {
    resolve_pack_path(repo_path, pack_id)
}
/// Resolve a pack file path from its ID.
///
/// Packs live under `packs/data/<first-two-id-chars>/<id>.pack`; an ID
/// shorter than two characters shards into a directory named after the
/// whole ID.
fn resolve_pack_path(repo_path: &str, pack_id: &str) -> PathBuf {
    let prefix_len = pack_id.len().min(2);
    let shard_dir = &pack_id[..prefix_len];
    let mut path = PathBuf::from(repo_path);
    path.push("packs");
    path.push("data");
    path.push(shard_dir);
    path.push(format!("{}.pack", pack_id));
    path
}
/// Find all .parx files recursively.
///
/// Iterative depth-first walk (avoids async recursion); directories that
/// disappear mid-walk are silently skipped.
async fn find_parx_files(dir: &Path) -> Result<Vec<PathBuf>, ArchiveError> {
    let mut found = Vec::new();
    let mut to_visit = vec![dir.to_path_buf()];
    while let Some(next_dir) = to_visit.pop() {
        if !next_dir.exists() {
            continue;
        }
        let mut dir_entries = tokio::fs::read_dir(&next_dir).await?;
        while let Some(entry) = dir_entries.next_entry().await? {
            let entry_path = entry.path();
            if entry_path.is_dir() {
                // Descend later; keeps the walk allocation-light.
                to_visit.push(entry_path);
            } else if entry_path.extension().and_then(|e| e.to_str()) == Some("parx") {
                found.push(entry_path);
            }
        }
    }
    Ok(found)
}

View File

@@ -10,18 +10,62 @@ pub struct RepairResult {
pub index_rebuilt: bool,
pub indexed_chunks: u64,
pub stale_locks_removed: u32,
pub packs_repaired: u32,
pub errors: Vec<String>,
}
/// Repair a repository: rebuild index, remove stale locks.
/// Repair a repository: rebuild index, remove stale locks, repair packs from parity.
pub async fn repair(repo: &mut Repository) -> Result<RepairResult, ArchiveError> {
let mut result = RepairResult {
index_rebuilt: false,
indexed_chunks: 0,
stale_locks_removed: 0,
packs_repaired: 0,
errors: Vec::new(),
};
// Remove stale locks first
match crate::lock::check_and_break_stale(&repo.path).await {
Ok(count) => {
result.stale_locks_removed = count;
if count > 0 {
tracing::info!("Removed {} stale locks", count);
}
}
Err(e) => {
result.errors.push(format!("Lock cleanup failed: {}", e));
}
}
// Check for missing pack files referenced by snapshots and attempt parity repair
let snapshots = crate::snapshot::list_snapshots(&repo.path, None).await.unwrap_or_default();
let referenced_chunks = crate::snapshot::referenced_chunks(&snapshots);
let mut missing_packs = std::collections::HashSet::new();
for hash_hex in &referenced_chunks {
if let Some(entry) = repo.index.get(hash_hex) {
let pack_path = crate::parity::resolve_pack_path_pub(&repo.path, &entry.pack_id);
if !pack_path.exists() {
missing_packs.insert(entry.pack_id.clone());
}
}
}
for pack_id in &missing_packs {
match crate::parity::repair_from_parity(&repo.path, pack_id).await {
Ok(true) => {
result.packs_repaired += 1;
tracing::info!("Repaired pack {} from parity", pack_id);
}
Ok(false) => {
result.errors.push(format!("Pack {} missing, no parity available", pack_id));
}
Err(e) => {
result.errors.push(format!("Pack {} repair failed: {}", pack_id, e));
}
}
}
// Rebuild global index from pack .idx files
match repo.reindex().await {
Ok(count) => {
@@ -35,18 +79,5 @@ pub async fn repair(repo: &mut Repository) -> Result<RepairResult, ArchiveError>
}
}
// Remove stale locks
match crate::lock::check_and_break_stale(&repo.path).await {
Ok(count) => {
result.stale_locks_removed = count;
if count > 0 {
tracing::info!("Removed {} stale locks", count);
}
}
Err(e) => {
result.errors.push(format!("Lock cleanup failed: {}", e));
}
}
Ok(result)
}