//! Restore pipeline: reads a snapshot manifest, looks up chunks in the global
//! index, reads from pack files, decrypts, decompresses, and writes to a Unix socket.

use std::collections::HashMap;

use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;

use crate::compression;
use crate::encryption;
use crate::error::ArchiveError;
use crate::hasher;
use crate::pack_reader;
use crate::pack_writer::{IdxEntry, FLAG_ENCRYPTED};
use crate::repository::Repository;
use crate::snapshot;

/// Restore a snapshot (or a specific item) to a Unix socket.
|
|
pub async fn restore(
|
|
repo: &Repository,
|
|
snapshot_id: &str,
|
|
socket_path: &str,
|
|
item_name: Option<&str>,
|
|
) -> Result<(), ArchiveError> {
|
|
// Load snapshot manifest
|
|
let snap = snapshot::load_snapshot(&repo.path, snapshot_id).await?;
|
|
|
|
// Determine which items to restore
|
|
let items_to_restore: Vec<&snapshot::SnapshotItem> = if let Some(name) = item_name {
|
|
snap.items.iter()
|
|
.filter(|i| i.name == name)
|
|
.collect()
|
|
} else {
|
|
snap.items.iter().collect()
|
|
};
|
|
|
|
if items_to_restore.is_empty() {
|
|
return Err(ArchiveError::NotFound(format!(
|
|
"No items found in snapshot {}{}",
|
|
snapshot_id,
|
|
item_name.map(|n| format!(" with name '{}'", n)).unwrap_or_default()
|
|
)));
|
|
}
|
|
|
|
// Connect to the Unix socket where TypeScript will read the restored data
|
|
let mut stream = UnixStream::connect(socket_path).await
|
|
.map_err(|e| ArchiveError::Io(e))?;
|
|
|
|
tracing::info!("Connected to restore socket: {}", socket_path);
|
|
|
|
// Cache loaded IDX entries per pack to avoid re-reading
|
|
let mut idx_cache: HashMap<String, Vec<IdxEntry>> = HashMap::new();
|
|
|
|
let mut restored_bytes: u64 = 0;
|
|
let mut chunks_read: u64 = 0;
|
|
|
|
for item in items_to_restore {
|
|
for hash_hex in &item.chunks {
|
|
// Look up chunk in global index
|
|
let index_entry = repo.index.get(hash_hex)
|
|
.ok_or_else(|| ArchiveError::NotFound(format!(
|
|
"Chunk {} not found in index", hash_hex
|
|
)))?;
|
|
|
|
// Determine pack file path
|
|
let shard = &index_entry.pack_id[..2];
|
|
let pack_path = std::path::Path::new(&repo.path)
|
|
.join("packs")
|
|
.join("data")
|
|
.join(shard)
|
|
.join(format!("{}.pack", index_entry.pack_id));
|
|
|
|
// Read chunk data from pack
|
|
let stored_data = pack_reader::read_chunk(
|
|
&pack_path,
|
|
index_entry.offset,
|
|
index_entry.compressed_size,
|
|
).await?;
|
|
|
|
// Get flags for this chunk (determines compression algorithm)
|
|
let chunk_flags = index_entry.flags;
|
|
|
|
// Decrypt if encrypted
|
|
let compressed = if chunk_flags & FLAG_ENCRYPTED != 0 {
|
|
let key = repo.master_key.as_ref().ok_or_else(|| {
|
|
ArchiveError::Encryption("Chunk is encrypted but no key available".to_string())
|
|
})?;
|
|
|
|
// Try to get nonce from the global index first (fast path)
|
|
let nonce = if let Some(ref nonce_hex) = index_entry.nonce {
|
|
let nonce_bytes = hex::decode(nonce_hex)
|
|
.map_err(|_| ArchiveError::Corruption(format!("Invalid nonce hex: {}", nonce_hex)))?;
|
|
let mut n = [0u8; 12];
|
|
if nonce_bytes.len() >= 12 {
|
|
n.copy_from_slice(&nonce_bytes[..12]);
|
|
}
|
|
n
|
|
} else {
|
|
// Fallback: read from IDX file (cached)
|
|
let entries = if let Some(cached) = idx_cache.get(&index_entry.pack_id) {
|
|
cached
|
|
} else {
|
|
let idx_path = std::path::Path::new(&repo.path)
|
|
.join("packs")
|
|
.join("data")
|
|
.join(shard)
|
|
.join(format!("{}.idx", index_entry.pack_id));
|
|
let loaded = pack_reader::load_idx(&idx_path).await?;
|
|
idx_cache.insert(index_entry.pack_id.clone(), loaded);
|
|
idx_cache.get(&index_entry.pack_id).unwrap()
|
|
};
|
|
|
|
let hash_bytes = hasher::hex_to_hash(hash_hex)
|
|
.map_err(|_| ArchiveError::Corruption(format!("Invalid hash: {}", hash_hex)))?;
|
|
|
|
let idx_entry = pack_reader::find_in_idx(entries, &hash_bytes)
|
|
.ok_or_else(|| ArchiveError::NotFound(format!(
|
|
"Chunk {} not found in pack index {}", hash_hex, index_entry.pack_id
|
|
)))?;
|
|
idx_entry.nonce
|
|
};
|
|
|
|
encryption::decrypt_chunk(&stored_data, key, &nonce)?
|
|
} else {
|
|
stored_data
|
|
};
|
|
|
|
// Decompress using flags to determine algorithm
|
|
let plaintext = compression::decompress_by_flags(&compressed, chunk_flags)?;
|
|
|
|
// Verify hash
|
|
let actual_hash = hasher::hash_chunk(&plaintext);
|
|
let expected_hash = hasher::hex_to_hash(hash_hex)
|
|
.map_err(|_| ArchiveError::Corruption(format!("Invalid hash: {}", hash_hex)))?;
|
|
|
|
if actual_hash != expected_hash {
|
|
return Err(ArchiveError::Corruption(format!(
|
|
"Hash mismatch for chunk {}: expected {}, got {}",
|
|
hash_hex,
|
|
hash_hex,
|
|
hasher::hash_to_hex(&actual_hash)
|
|
)));
|
|
}
|
|
|
|
// Write to output socket
|
|
stream.write_all(&plaintext).await?;
|
|
|
|
restored_bytes += plaintext.len() as u64;
|
|
chunks_read += 1;
|
|
}
|
|
}
|
|
|
|
// Close the write side
|
|
stream.shutdown().await?;
|
|
|
|
tracing::info!(
|
|
"Restore complete: {} bytes, {} chunks from snapshot {}",
|
|
restored_bytes, chunks_read, snapshot_id
|
|
);
|
|
|
|
Ok(())
|
|
}
|