feat: initial implementation of content-addressed incremental backup engine

Rust-centric architecture with TypeScript facade following smartproxy/smartstorage pattern.
Core engine in Rust (FastCDC chunking, SHA-256, gzip, AES-256-GCM + Argon2id, binary pack files,
global index, snapshots, locking, verification, pruning, repair). TypeScript provides npm interface
via @push.rocks/smartrust RustBridge IPC with Unix socket streaming for ingest/restore.
All 14 integration tests pass.
This commit is contained in:
2026-03-21 23:30:17 +00:00
commit a5849791d2
34 changed files with 15506 additions and 0 deletions

236
rust/src/chunker.rs Normal file
View File

@@ -0,0 +1,236 @@
/// FastCDC content-defined chunking implementation.
///
/// Uses a gear-based rolling hash to find chunk boundaries determined by content.
/// This ensures that insertions/deletions only affect nearby chunk boundaries,
/// enabling high dedup ratios across incremental backups.
use rand::rngs::StdRng;
use rand::{SeedableRng, Rng};
/// Pre-computed gear hash table (256 random u64 values).
/// Generated deterministically from a fixed seed for reproducibility.
fn gear_table() -> [u64; 256] {
    // A fixed seed keeps chunk boundaries identical across runs and
    // platforms, which is what makes cross-backup deduplication work.
    let mut rng = StdRng::seed_from_u64(0x5A7BC1E3D9F04B62);
    // Fill indices 0..256 in order — same RNG draw sequence as a mutable loop.
    std::array::from_fn(|_| rng.gen())
}
/// Lazy-initialized gear table: computed once on first access, then shared
/// (thread-safe via `LazyLock`).
static GEAR_TABLE: std::sync::LazyLock<[u64; 256]> = std::sync::LazyLock::new(gear_table);
/// A chunk boundary found by FastCDC.
#[derive(Debug, Clone)]
pub struct ChunkBoundary {
    /// Byte offset of the chunk within the input buffer.
    pub offset: usize,
    /// Length of the chunk in bytes.
    pub length: usize,
}
/// FastCDC chunker with configurable min/avg/max sizes.
pub struct FastCdc {
    /// Hard lower bound: no boundary is considered before this many bytes.
    min_size: usize,
    /// Target average chunk size; the mask switches from strict to lenient here.
    avg_size: usize,
    /// Hard upper bound: a chunk is force-cut at this size if no boundary matched.
    max_size: usize,
    mask_s: u64, // "small" mask — more bits set, harder to match (used below avg)
    mask_l: u64, // "large" mask — fewer bits set, easier to match (used above avg)
}
impl FastCdc {
    /// Create a chunker with the given size bounds.
    ///
    /// # Panics
    /// Panics when the bounds are not sane (`avg_size >= 2` and
    /// `min_size <= avg_size <= max_size`). Previously an `avg_size < 2`
    /// silently underflowed `bits - 1` in the mask derivation, and unordered
    /// bounds produced degenerate chunking instead of failing fast.
    pub fn new(min_size: usize, avg_size: usize, max_size: usize) -> Self {
        assert!(avg_size >= 2, "avg_size must be >= 2");
        assert!(
            min_size <= avg_size && avg_size <= max_size,
            "require min_size <= avg_size <= max_size"
        );
        // Compute masks based on avg size.
        // mask_s has more bits set (harder to trigger) — used when chunk < avg
        // mask_l has fewer bits set (easier to trigger) — used when chunk > avg
        let bits = (avg_size as f64).log2().round() as u32;
        let mask_s = (1u64 << (bits + 1)) - 1;
        let mask_l = (1u64 << (bits - 1)) - 1;
        Self {
            min_size,
            avg_size,
            max_size,
            mask_s,
            mask_l,
        }
    }

    /// Find all chunk boundaries in the given data.
    ///
    /// The returned boundaries are contiguous and cover `data` exactly.
    pub fn chunk_data(&self, data: &[u8]) -> Vec<ChunkBoundary> {
        let mut boundaries = Vec::new();
        let mut offset = 0;
        let len = data.len();
        while offset < len {
            let remaining = len - offset;
            if remaining <= self.min_size {
                // Final chunk: whatever remains (may be smaller than min_size)
                boundaries.push(ChunkBoundary {
                    offset,
                    length: remaining,
                });
                break;
            }
            let chunk_len = self.find_boundary(&data[offset..]);
            boundaries.push(ChunkBoundary {
                offset,
                length: chunk_len,
            });
            offset += chunk_len;
        }
        boundaries
    }

    /// Find the next chunk boundary starting from the beginning of `data`.
    /// Returns the length of the chunk (never more than `max_size`).
    fn find_boundary(&self, data: &[u8]) -> usize {
        let len = data.len();
        if len <= self.min_size {
            return len;
        }
        let gear = &*GEAR_TABLE;
        let mut hash: u64 = 0;
        // Skip min_size bytes (no boundary can occur before min)
        let start = self.min_size;
        let mid = std::cmp::min(self.avg_size, len);
        let end = std::cmp::min(self.max_size, len);
        // Phase 1: from min to avg, use mask_s (harder to match) to bias
        // chunk sizes toward the average.
        for i in start..mid {
            hash = (hash << 1).wrapping_add(gear[data[i] as usize]);
            if hash & self.mask_s == 0 {
                return i + 1;
            }
        }
        // Phase 2: from avg to max, use mask_l (easier to match)
        for i in mid..end {
            hash = (hash << 1).wrapping_add(gear[data[i] as usize]);
            if hash & self.mask_l == 0 {
                return i + 1;
            }
        }
        // No boundary found before max — cut at max (or at end of data)
        end
    }
}
impl Default for FastCdc {
    /// Default bounds: 64 KiB min, 256 KiB average, 1 MiB max.
    fn default() -> Self {
        const MIN: usize = 64 * 1024;
        const AVG: usize = 256 * 1024;
        const MAX: usize = 1024 * 1024;
        Self::new(MIN, AVG, MAX)
    }
}
/// Streaming chunker that accumulates data from multiple reads
/// and yields chunks as they are found.
pub struct StreamingChunker {
    // Boundary-finding configuration (sizes + masks).
    cdc: FastCdc,
    // Bytes received but not yet emitted as a chunk.
    buffer: Vec<u8>,
}
impl StreamingChunker {
    /// Wrap a configured `FastCdc` in streaming state with an empty buffer.
    pub fn new(cdc: FastCdc) -> Self {
        Self { cdc, buffer: Vec::new() }
    }

    /// Feed data into the chunker. Returns any complete chunks found.
    pub fn feed(&mut self, data: &[u8]) -> Vec<Vec<u8>> {
        self.buffer.extend_from_slice(data);
        let mut ready = Vec::new();
        // Keep cutting while the buffered data can yield a stable boundary.
        while self.buffer.len() > self.cdc.min_size {
            let cut = self.cdc.find_boundary(&self.buffer);
            // A boundary at the very end of the buffer is only final once we
            // have reached max_size; otherwise more input could move it.
            if cut >= self.buffer.len() && self.buffer.len() < self.cdc.max_size {
                break;
            }
            ready.push(self.buffer.drain(..cut).collect());
        }
        ready
    }

    /// Finalize: return any remaining buffered data as the last chunk.
    pub fn finalize(&mut self) -> Option<Vec<u8>> {
        // `take` hands back the buffer and leaves an empty Vec in place.
        (!self.buffer.is_empty()).then(|| std::mem::take(&mut self.buffer))
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Every non-final chunk must respect the configured min/max bounds, and
    // the chunk lengths must tile the input exactly.
    #[test]
    fn test_chunk_sizes_within_bounds() {
        let cdc = FastCdc::new(1024, 4096, 16384);
        let data: Vec<u8> = (0..100_000u32).map(|i| (i % 256) as u8).collect();
        let chunks = cdc.chunk_data(&data);
        let total: usize = chunks.iter().map(|c| c.length).sum();
        assert_eq!(total, data.len());
        for (i, chunk) in chunks.iter().enumerate() {
            if i < chunks.len() - 1 {
                // Non-final chunks must be >= min and <= max
                assert!(chunk.length >= 1024, "Chunk {} too small: {}", i, chunk.length);
                assert!(chunk.length <= 16384, "Chunk {} too large: {}", i, chunk.length);
            }
        }
    }

    // Chunking the same data twice must produce identical boundaries
    // (required for deduplication).
    #[test]
    fn test_deterministic() {
        let cdc = FastCdc::new(1024, 4096, 16384);
        let data: Vec<u8> = (0..50_000u32).map(|i| (i % 256) as u8).collect();
        let chunks1 = cdc.chunk_data(&data);
        let chunks2 = cdc.chunk_data(&data);
        assert_eq!(chunks1.len(), chunks2.len());
        for (a, b) in chunks1.iter().zip(chunks2.iter()) {
            assert_eq!(a.offset, b.offset);
            assert_eq!(a.length, b.length);
        }
    }

    // Streaming (8 KiB reads) and one-shot chunking must both cover the full
    // input. NOTE: only the byte totals are compared here, not the individual
    // boundaries.
    #[test]
    fn test_streaming_chunker() {
        let cdc = FastCdc::new(1024, 4096, 16384);
        let data: Vec<u8> = (0..100_000u32).map(|i| (i % 256) as u8).collect();
        // Chunk with streaming in 8KB reads
        let mut streamer = StreamingChunker::new(FastCdc::new(1024, 4096, 16384));
        let mut stream_chunks: Vec<Vec<u8>> = Vec::new();
        for chunk in data.chunks(8192) {
            stream_chunks.extend(streamer.feed(chunk));
        }
        if let Some(last) = streamer.finalize() {
            stream_chunks.push(last);
        }
        // Chunk in one shot
        let batch_boundaries = cdc.chunk_data(&data);
        // Total bytes must match
        let stream_total: usize = stream_chunks.iter().map(|c| c.len()).sum();
        let batch_total: usize = batch_boundaries.iter().map(|c| c.length).sum();
        assert_eq!(stream_total, data.len());
        assert_eq!(batch_total, data.len());
    }
}

43
rust/src/compression.rs Normal file
View File

@@ -0,0 +1,43 @@
use flate2::Compression;
use flate2::read::{GzDecoder, GzEncoder};
use std::io::Read;
use crate::error::ArchiveError;
/// Gzip compress data.
///
/// # Errors
/// Returns `ArchiveError::Io` if the underlying encoder fails.
pub fn compress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
    let mut encoder = GzEncoder::new(data, Compression::default());
    let mut compressed = Vec::new();
    // `?` uses the `#[from] std::io::Error` conversion on ArchiveError,
    // replacing the redundant explicit map_err closure.
    encoder.read_to_end(&mut compressed)?;
    Ok(compressed)
}
/// Gzip decompress data.
///
/// # Errors
/// Returns `ArchiveError::Io` if the input is not valid gzip.
pub fn decompress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
    let mut decoder = GzDecoder::new(data);
    let mut decompressed = Vec::new();
    // `#[from] std::io::Error` on ArchiveError makes `?` sufficient here.
    decoder.read_to_end(&mut decompressed)?;
    Ok(decompressed)
}
#[cfg(test)]
mod tests {
    use super::*;

    // compress → decompress must reproduce the input exactly.
    #[test]
    fn test_roundtrip() {
        let data = b"Hello, this is test data for compression!";
        let compressed = compress(data).unwrap();
        let decompressed = decompress(&compressed).unwrap();
        assert_eq!(data.as_slice(), decompressed.as_slice());
    }

    // A run of identical bytes must compress to fewer bytes than the input.
    #[test]
    fn test_compression_reduces_size() {
        // Highly compressible data
        let data = vec![b'A'; 10000];
        let compressed = compress(&data).unwrap();
        assert!(compressed.len() < data.len());
    }
}

97
rust/src/config.rs Normal file
View File

@@ -0,0 +1,97 @@
use serde::{Deserialize, Serialize};
/// Top-level repository configuration, serialized as camelCase JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RepositoryConfig {
    /// On-disk format version (currently 1 — see `RepositoryConfig::new`).
    pub version: u32,
    /// Unique repository identifier ("repo-<uuid>").
    pub id: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
    /// Chunking parameters (algorithm + size bounds).
    pub chunking: ChunkingConfig,
    /// Compression algorithm name (e.g. "gzip").
    pub compression: String,
    /// Encryption settings; omitted from JSON when the repo is unencrypted.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub encryption: Option<EncryptionConfig>,
    /// Target size in bytes at which a pack file is finalized.
    pub pack_target_size: u64,
}
/// Content-defined chunking parameters.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChunkingConfig {
    /// Chunking algorithm name (e.g. "fastcdc").
    pub algorithm: String,
    /// Minimum chunk size in bytes.
    pub min_size: u32,
    /// Target average chunk size in bytes.
    pub avg_size: u32,
    /// Maximum chunk size in bytes.
    pub max_size: u32,
}
/// Encryption settings for an encrypted repository.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EncryptionConfig {
    /// Cipher name (e.g. "aes-256-gcm").
    pub algorithm: String,
    /// Key-derivation function name (e.g. "argon2id").
    pub kdf: String,
    /// Parameters passed to the KDF.
    pub kdf_params: KdfParams,
}
/// Key-derivation function cost parameters.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct KdfParams {
    // Memory cost. NOTE(review): for the Argon2id path this is passed to
    // `argon2::Params::new` as m_cost, which is measured in KiB — confirm.
    pub memory: u32,
    /// Number of iterations (Argon2 time cost).
    pub iterations: u32,
    /// Degree of parallelism (lanes).
    pub parallelism: u32,
}
/// Serialized key file: a master key wrapped by a passphrase-derived KEK.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct KeyFile {
    /// Unique key identifier.
    pub id: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
    /// KDF name used to derive the key-encryption-key.
    pub kdf: String,
    // Encoded KDF salt. NOTE(review): the text encoding (base64/hex) is
    // handled by callers not visible here — confirm before relying on it.
    pub kdf_salt: String,
    /// KDF cost parameters used at wrap time.
    pub kdf_params: KdfParams,
    /// Encoded wrapped (encrypted) master key.
    pub encrypted_key: String,
    /// Encoded nonce used when wrapping the key.
    pub nonce: String,
    /// Encoded GCM authentication tag.
    pub auth_tag: String,
}
impl Default for ChunkingConfig {
    /// FastCDC defaults: 64 KiB min, 256 KiB average, 1 MiB max.
    fn default() -> Self {
        Self {
            algorithm: String::from("fastcdc"),
            min_size: 64 * 1024,
            avg_size: 256 * 1024,
            max_size: 1024 * 1024,
        }
    }
}
impl Default for KdfParams {
    fn default() -> Self {
        Self {
            // 262144 here equals 256 MiB if the Argon2 convention of
            // KiB-denominated memory cost applies. NOTE(review): confirm the
            // unit against the `argon2::Params::new` call in derive_key.
            memory: 262144, // 256 MB
            iterations: 3,
            parallelism: 4,
        }
    }
}
impl Default for EncryptionConfig {
    /// Defaults: AES-256-GCM chunk encryption with Argon2id key derivation.
    fn default() -> Self {
        let kdf_params = KdfParams::default();
        Self {
            algorithm: String::from("aes-256-gcm"),
            kdf: String::from("argon2id"),
            kdf_params,
        }
    }
}
impl RepositoryConfig {
    /// Build a fresh version-1 repository config: default chunking, gzip
    /// compression, an 8 MiB pack target, and the given optional encryption.
    pub fn new(encryption: Option<EncryptionConfig>) -> Self {
        let id = format!("repo-{}", uuid::Uuid::new_v4());
        let created_at = chrono::Utc::now().to_rfc3339();
        Self {
            version: 1,
            id,
            created_at,
            chunking: ChunkingConfig::default(),
            compression: String::from("gzip"),
            encryption,
            pack_target_size: 8 * 1024 * 1024, // 8 MiB
        }
    }
}

179
rust/src/encryption.rs Normal file
View File

@@ -0,0 +1,179 @@
use aes_gcm::{Aes256Gcm, Key, Nonce};
use aes_gcm::aead::{Aead, KeyInit};
use argon2::Argon2;
use rand::RngCore;
use serde::{Deserialize, Serialize};
use crate::config::KdfParams;
use crate::error::ArchiveError;
/// Result of encrypting a chunk.
pub struct EncryptedChunk {
    /// Ciphertext with the 16-byte GCM auth tag appended at the end.
    pub ciphertext: Vec<u8>,
    /// 12-byte random nonce used for this chunk (must be stored for decryption).
    pub nonce: [u8; 12],
}
/// A wrapped (encrypted) master key stored in a key file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WrappedKey {
    /// Master key encrypted under the KEK (includes the GCM auth tag).
    pub encrypted_key: Vec<u8>,
    /// Nonce used when wrapping.
    pub nonce: [u8; 12],
}
/// Derive a 32-byte key from a passphrase using Argon2id.
pub fn derive_key(passphrase: &str, salt: &[u8], params: &KdfParams) -> Result<[u8; 32], ArchiveError> {
let argon2 = Argon2::new(
argon2::Algorithm::Argon2id,
argon2::Version::V0x13,
argon2::Params::new(
params.memory,
params.iterations,
params.parallelism,
Some(32),
).map_err(|e| ArchiveError::Encryption(format!("Argon2 params error: {}", e)))?,
);
let mut key = [0u8; 32];
argon2.hash_password_into(passphrase.as_bytes(), salt, &mut key)
.map_err(|e| ArchiveError::Encryption(format!("Argon2 derivation failed: {}", e)))?;
Ok(key)
}
/// Generate a random 32-byte master key from the OS-seeded thread RNG.
pub fn generate_master_key() -> [u8; 32] {
    let mut buf = [0u8; 32];
    rand::thread_rng().fill_bytes(&mut buf);
    buf
}
/// Generate a random 16-byte KDF salt.
pub fn generate_salt() -> [u8; 16] {
    let mut buf = [0u8; 16];
    rand::thread_rng().fill_bytes(&mut buf);
    buf
}
/// Generate a random 12-byte AES-GCM nonce.
pub fn generate_nonce() -> [u8; 12] {
    let mut buf = [0u8; 12];
    rand::thread_rng().fill_bytes(&mut buf);
    buf
}
/// Wrap (encrypt) a master key with a key-encryption-key derived from a passphrase.
pub fn wrap_key(master_key: &[u8; 32], kek: &[u8; 32]) -> Result<WrappedKey, ArchiveError> {
    let nonce_bytes = generate_nonce();
    let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(kek));
    let encrypted_key = cipher
        .encrypt(Nonce::from_slice(&nonce_bytes), master_key.as_slice())
        .map_err(|e| ArchiveError::Encryption(format!("Key wrap failed: {}", e)))?;
    Ok(WrappedKey { encrypted_key, nonce: nonce_bytes })
}
/// Unwrap (decrypt) a master key with a key-encryption-key.
///
/// # Errors
/// Returns `ArchiveError::Encryption` when authentication fails (wrong KEK or
/// tampered data) or when the decrypted payload is not exactly 32 bytes.
pub fn unwrap_key(wrapped: &WrappedKey, kek: &[u8; 32]) -> Result<[u8; 32], ArchiveError> {
    let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(kek));
    let nonce = Nonce::from_slice(&wrapped.nonce);
    let plaintext = cipher.decrypt(nonce, wrapped.encrypted_key.as_slice())
        .map_err(|e| ArchiveError::Encryption(format!("Key unwrap failed: {}", e)))?;
    // try_into length-checks and converts in one step, replacing the manual
    // len() check + copy_from_slice.
    // NOTE(review): the intermediate plaintext Vec is not zeroized on drop;
    // consider the `zeroize` crate if key hygiene matters here.
    plaintext
        .try_into()
        .map_err(|_| ArchiveError::Encryption("Unwrapped key has wrong length".to_string()))
}
/// Encrypt a chunk with AES-256-GCM.
/// Returns ciphertext (which includes the 16-byte auth tag appended) and the
/// freshly generated random nonce.
pub fn encrypt_chunk(data: &[u8], key: &[u8; 32]) -> Result<EncryptedChunk, ArchiveError> {
    let nonce_bytes = generate_nonce();
    let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(key));
    let ciphertext = cipher
        .encrypt(Nonce::from_slice(&nonce_bytes), data)
        .map_err(|e| ArchiveError::Encryption(format!("Chunk encryption failed: {}", e)))?;
    Ok(EncryptedChunk { ciphertext, nonce: nonce_bytes })
}
/// Decrypt a chunk with AES-256-GCM.
/// The ciphertext includes the 16-byte auth tag at the end; authentication
/// failure (wrong key/nonce or tampering) yields an error.
pub fn decrypt_chunk(ciphertext: &[u8], key: &[u8; 32], nonce: &[u8; 12]) -> Result<Vec<u8>, ArchiveError> {
    let cipher = Aes256Gcm::new(Key::<Aes256Gcm>::from_slice(key));
    cipher
        .decrypt(Nonce::from_slice(nonce), ciphertext)
        .map_err(|e| ArchiveError::Encryption(format!("Chunk decryption failed: {}", e)))
}
#[cfg(test)]
mod tests {
    use super::*;

    // encrypt → decrypt with the same key/nonce must reproduce the plaintext.
    #[test]
    fn test_encrypt_decrypt_roundtrip() {
        let key = generate_master_key();
        let data = b"Hello, encrypted world!";
        let encrypted = encrypt_chunk(data, &key).unwrap();
        let decrypted = decrypt_chunk(&encrypted.ciphertext, &key, &encrypted.nonce).unwrap();
        assert_eq!(data.as_slice(), decrypted.as_slice());
    }

    // GCM authentication must reject decryption with a different key.
    #[test]
    fn test_wrong_key_fails() {
        let key1 = generate_master_key();
        let key2 = generate_master_key();
        let data = b"Secret data";
        let encrypted = encrypt_chunk(data, &key1).unwrap();
        let result = decrypt_chunk(&encrypted.ciphertext, &key2, &encrypted.nonce);
        assert!(result.is_err());
    }

    // wrap_key → unwrap_key must round-trip the master key exactly.
    #[test]
    fn test_key_wrap_unwrap() {
        let master = generate_master_key();
        let kek = generate_master_key();
        let wrapped = wrap_key(&master, &kek).unwrap();
        let unwrapped = unwrap_key(&wrapped, &kek).unwrap();
        assert_eq!(master, unwrapped);
    }

    // Same passphrase + salt + params → same key; different passphrase → different key.
    #[test]
    fn test_derive_key_deterministic() {
        let salt = generate_salt();
        let params = KdfParams {
            memory: 1024, // small for test speed
            iterations: 1,
            parallelism: 1,
        };
        let k1 = derive_key("password", &salt, &params).unwrap();
        let k2 = derive_key("password", &salt, &params).unwrap();
        assert_eq!(k1, k2);
        let k3 = derive_key("different", &salt, &params).unwrap();
        assert_ne!(k1, k3);
    }
}

37
rust/src/error.rs Normal file
View File

@@ -0,0 +1,37 @@
use thiserror::Error;
/// Unified error type for all archive engine operations.
/// Display strings come from the `#[error]` attributes via thiserror.
#[derive(Error, Debug)]
pub enum ArchiveError {
    /// Underlying filesystem/socket failure (auto-converted from std::io::Error).
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// Invalid or inconsistent configuration.
    #[error("Configuration error: {0}")]
    Config(String),
    /// On-disk data failed validation (bad hash, unparseable segment, ...).
    #[error("Data corruption: {0}")]
    Corruption(String),
    /// Key derivation, wrapping, or chunk cipher failure.
    #[error("Encryption error: {0}")]
    Encryption(String),
    /// A requested object (snapshot, chunk, pack) does not exist.
    #[error("Not found: {0}")]
    NotFound(String),
    /// Another process holds the repository write lock.
    #[error("Repository is locked: {0}")]
    Locked(String),
    /// The target directory is not a valid repository.
    #[error("Invalid repository: {0}")]
    InvalidRepo(String),
    /// JSON (de)serialization failure (auto-converted from serde_json::Error).
    #[error("JSON error: {0}")]
    Json(#[from] serde_json::Error),
    /// Catch-all for errors with no dedicated variant.
    #[error("{0}")]
    Other(String),
}
impl ArchiveError {
    /// Render the error as a plain string (e.g. for marshalling across an
    /// IPC boundary).
    pub fn to_error_string(&self) -> String {
        // Display is derived by thiserror; `to_string()` is the idiomatic
        // equivalent of `format!("{}", self)`.
        self.to_string()
    }
}

243
rust/src/global_index.rs Normal file
View File

@@ -0,0 +1,243 @@
/// Global index: maps chunk hashes to their physical location in pack files.
///
/// The index is stored as JSON segments in the `index/` directory and loaded
/// into an in-memory HashMap for O(1) lookups during ingest and restore.
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use crate::error::ArchiveError;
use crate::hasher;
use crate::pack_reader;
/// An entry in the global index pointing to a chunk's location in a pack file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IndexEntry {
    /// Identifier of the pack file containing the chunk.
    pub pack_id: String,
    /// Byte offset of the chunk's stored payload within the pack.
    pub offset: u64,
    /// Size of the stored (compressed, possibly encrypted) payload in bytes.
    pub compressed_size: u32,
    /// Original (uncompressed, unencrypted) chunk size in bytes.
    pub plaintext_size: u32,
}
/// An index segment stored on disk (one JSON file per save).
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct IndexSegment {
    /// Random identifier, also used as the segment's filename stem.
    segment_id: String,
    /// RFC 3339 timestamp of when the segment was written.
    created_at: String,
    /// hash (hex string) -> location of the chunk.
    entries: HashMap<String, IndexEntry>,
}
/// The global in-memory index.
pub struct GlobalIndex {
    /// hash (hex string) -> IndexEntry, loaded from disk segments
    entries: HashMap<String, IndexEntry>,
    /// New entries added since last save; merged into `entries` on save
    pending: HashMap<String, IndexEntry>,
}
impl GlobalIndex {
    /// Create an empty index with no loaded or pending entries.
    pub fn new() -> Self {
        Self {
            entries: HashMap::new(),
            pending: HashMap::new(),
        }
    }

    /// Load the index from all segment files in the index/ directory.
    /// A missing directory is treated as an empty index.
    pub async fn load(repo_path: &str) -> Result<Self, ArchiveError> {
        let index_dir = Path::new(repo_path).join("index");
        let mut index = Self::new();
        if !index_dir.exists() {
            return Ok(index);
        }
        let mut dir = tokio::fs::read_dir(&index_dir).await?;
        while let Some(entry) = dir.next_entry().await? {
            let path = entry.path();
            if path.extension().and_then(|e| e.to_str()) == Some("json") {
                let data = tokio::fs::read_to_string(&path).await?;
                let segment: IndexSegment = serde_json::from_str(&data)
                    .map_err(|e| ArchiveError::Corruption(format!(
                        "Failed to parse index segment {}: {}",
                        path.display(), e
                    )))?;
                index.entries.extend(segment.entries);
            }
        }
        tracing::info!("Loaded global index with {} entries", index.entries.len());
        Ok(index)
    }

    /// Check if a chunk hash exists in the index (loaded or pending).
    pub fn has(&self, hash_hex: &str) -> bool {
        self.entries.contains_key(hash_hex) || self.pending.contains_key(hash_hex)
    }

    /// Get an index entry by hash; pending entries take precedence.
    pub fn get(&self, hash_hex: &str) -> Option<&IndexEntry> {
        self.pending.get(hash_hex).or_else(|| self.entries.get(hash_hex))
    }

    /// Add a new entry to the pending set.
    pub fn add_entry(&mut self, hash_hex: String, entry: IndexEntry) {
        self.pending.insert(hash_hex, entry);
    }

    /// Add multiple entries to the pending set.
    pub fn add_entries(&mut self, entries: HashMap<String, IndexEntry>) {
        self.pending.extend(entries);
    }

    /// Save pending entries as a new index segment.
    ///
    /// The segment is written to a `.tmp` file and renamed into place so
    /// readers never observe a partially written segment.
    pub async fn save_segment(&mut self, repo_path: &str) -> Result<(), ArchiveError> {
        if self.pending.is_empty() {
            return Ok(());
        }
        let index_dir = Path::new(repo_path).join("index");
        tokio::fs::create_dir_all(&index_dir).await?;
        let segment_id = uuid::Uuid::new_v4().to_string().replace("-", "");
        let segment = IndexSegment {
            segment_id: segment_id.clone(),
            created_at: chrono::Utc::now().to_rfc3339(),
            entries: self.pending.clone(),
        };
        let json = serde_json::to_string_pretty(&segment)?;
        let segment_path = index_dir.join(format!("{}.json", segment_id));
        let tmp_path = index_dir.join(format!("{}.json.tmp", segment_id));
        tokio::fs::write(&tmp_path, json).await?;
        tokio::fs::rename(&tmp_path, &segment_path).await?;
        // Capture the segment size before merging so the log reports the
        // entries in THIS segment (previously it logged the whole index size,
        // which was misleading).
        let segment_entry_count = self.pending.len();
        self.entries.extend(std::mem::take(&mut self.pending));
        tracing::info!("Saved index segment {} ({} entries)", segment_id, segment_entry_count);
        Ok(())
    }

    /// Compact all segments into a single merged segment.
    ///
    /// The merged segment is written atomically (tmp + rename) BEFORE the old
    /// segments are deleted, so a crash mid-compaction never leaves the index
    /// directory empty (previously old segments were deleted first, then the
    /// merged segment written non-atomically).
    pub async fn compact(&mut self, repo_path: &str) -> Result<(), ArchiveError> {
        // First, ensure pending is merged
        self.entries.extend(std::mem::take(&mut self.pending));
        let index_dir = Path::new(repo_path).join("index");
        // Snapshot existing segment paths up front so the merged segment
        // written below is not swept up by the cleanup pass.
        let mut old_segments: Vec<PathBuf> = Vec::new();
        if index_dir.exists() {
            let mut dir = tokio::fs::read_dir(&index_dir).await?;
            while let Some(entry) = dir.next_entry().await? {
                let path = entry.path();
                if path.extension().and_then(|e| e.to_str()) == Some("json") {
                    old_segments.push(path);
                }
            }
        }
        // Write the single merged segment (if there is anything to write)
        if !self.entries.is_empty() {
            tokio::fs::create_dir_all(&index_dir).await?;
            let segment_id = uuid::Uuid::new_v4().to_string().replace("-", "");
            let segment = IndexSegment {
                segment_id: segment_id.clone(),
                created_at: chrono::Utc::now().to_rfc3339(),
                entries: self.entries.clone(),
            };
            let json = serde_json::to_string_pretty(&segment)?;
            let path = index_dir.join(format!("{}.json", segment_id));
            let tmp_path = index_dir.join(format!("{}.json.tmp", segment_id));
            tokio::fs::write(&tmp_path, json).await?;
            tokio::fs::rename(&tmp_path, &path).await?;
            tracing::info!("Compacted index into single segment with {} entries", self.entries.len());
        }
        // Remove superseded segments only after the merged one is durable.
        for path in old_segments {
            tokio::fs::remove_file(&path).await?;
        }
        Ok(())
    }

    /// Rebuild the entire index by scanning all .idx files in packs/data/.
    pub async fn rebuild_from_packs(repo_path: &str) -> Result<Self, ArchiveError> {
        let mut index = Self::new();
        let packs_dir = Path::new(repo_path).join("packs").join("data");
        if !packs_dir.exists() {
            return Ok(index);
        }
        let idx_files = find_idx_files(&packs_dir).await?;
        tracing::info!("Rebuilding index from {} pack index files", idx_files.len());
        for idx_path in &idx_files {
            let entries = pack_reader::load_idx(idx_path).await?;
            // Extract pack_id from filename (e.g., "abcdef1234.idx" -> "abcdef1234")
            let pack_id = idx_path
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("")
                .to_string();
            for entry in entries {
                let hash_hex = hasher::hash_to_hex(&entry.content_hash);
                index.entries.insert(hash_hex, IndexEntry {
                    pack_id: pack_id.clone(),
                    offset: entry.offset,
                    compressed_size: entry.compressed_size,
                    plaintext_size: entry.plaintext_size,
                });
            }
        }
        tracing::info!("Rebuilt index with {} entries", index.entries.len());
        Ok(index)
    }

    /// Get the total number of indexed chunks (loaded + pending).
    pub fn len(&self) -> usize {
        self.entries.len() + self.pending.len()
    }

    /// True when no chunks are indexed at all (companion to `len`).
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty() && self.pending.is_empty()
    }

    /// Remove entries for chunks in a specific pack (used by prune/repair).
    pub fn remove_pack_entries(&mut self, pack_id: &str) {
        self.entries.retain(|_, v| v.pack_id != pack_id);
        self.pending.retain(|_, v| v.pack_id != pack_id);
    }

    /// Get all unique pack IDs referenced by the index.
    pub fn referenced_pack_ids(&self) -> std::collections::HashSet<String> {
        let mut ids: std::collections::HashSet<String> = self.entries.values()
            .map(|e| e.pack_id.clone())
            .collect();
        ids.extend(self.pending.values().map(|e| e.pack_id.clone()));
        ids
    }
}
/// Recursively find all .idx files under a directory (iterative DFS, so deep
/// trees cannot overflow the stack).
async fn find_idx_files(dir: &Path) -> Result<Vec<PathBuf>, ArchiveError> {
    let mut found = Vec::new();
    let mut pending_dirs = vec![dir.to_path_buf()];
    while let Some(dir_path) = pending_dirs.pop() {
        if !dir_path.exists() {
            continue;
        }
        let mut reader = tokio::fs::read_dir(&dir_path).await?;
        while let Some(dir_entry) = reader.next_entry().await? {
            let entry_path = dir_entry.path();
            if entry_path.is_dir() {
                pending_dirs.push(entry_path);
            } else if entry_path.extension().and_then(|e| e.to_str()) == Some("idx") {
                found.push(entry_path);
            }
        }
    }
    Ok(found)
}

64
rust/src/hasher.rs Normal file
View File

@@ -0,0 +1,64 @@
use sha2::{Sha256, Digest};
/// Compute SHA-256 hash of a chunk, returning the raw 32-byte digest.
pub fn hash_chunk(data: &[u8]) -> [u8; 32] {
    let mut sha = Sha256::new();
    sha.update(data);
    let digest = sha.finalize();
    // Copy the generic digest output into a plain fixed-size array.
    let mut out = [0u8; 32];
    out.copy_from_slice(&digest);
    out
}
/// Convert a 32-byte hash to a lowercase hex string (64 characters).
pub fn hash_to_hex(hash: &[u8; 32]) -> String {
    hex::encode(hash)
}
/// Parse a hex string back to a 32-byte hash.
///
/// # Errors
/// Returns the decode error for malformed hex, or `InvalidStringLength` when
/// the decoded bytes are not exactly 32.
pub fn hex_to_hash(hex_str: &str) -> Result<[u8; 32], hex::FromHexError> {
    let bytes = hex::decode(hex_str)?;
    // try_into length-checks and converts in one step, replacing the manual
    // len() == 32 check + copy_from_slice.
    bytes
        .try_into()
        .map_err(|_| hex::FromHexError::InvalidStringLength)
}
/// Verify that data matches an expected hash.
/// (Not constant-time; content hashes here are not secrets.)
pub fn verify_hash(data: &[u8], expected: &[u8; 32]) -> bool {
    hash_chunk(data) == *expected
}
#[cfg(test)]
mod tests {
    use super::*;

    // Hashing the same bytes twice must produce the same digest.
    #[test]
    fn test_hash_deterministic() {
        let data = b"hello world";
        let h1 = hash_chunk(data);
        let h2 = hash_chunk(data);
        assert_eq!(h1, h2);
    }

    // hash -> hex -> hash must round-trip losslessly.
    #[test]
    fn test_hash_to_hex_roundtrip() {
        let data = b"test data";
        let hash = hash_chunk(data);
        let hex_str = hash_to_hex(&hash);
        let parsed = hex_to_hash(&hex_str).unwrap();
        assert_eq!(hash, parsed);
    }

    // verify_hash must accept matching data and reject different data.
    #[test]
    fn test_verify_hash() {
        let data = b"verify me";
        let hash = hash_chunk(data);
        assert!(verify_hash(data, &hash));
        assert!(!verify_hash(b"different data", &hash));
    }
}

262
rust/src/ingest.rs Normal file
View File

@@ -0,0 +1,262 @@
/// Ingest pipeline: reads data from a Unix socket, chunks it with FastCDC,
/// deduplicates, compresses, optionally encrypts, and writes to pack files.
use std::collections::HashMap;
use tokio::io::AsyncReadExt;
use tokio::net::UnixStream;
use serde::{Deserialize, Serialize};
use crate::chunker::{FastCdc, StreamingChunker};
use crate::compression;
use crate::encryption;
use crate::error::ArchiveError;
use crate::global_index::IndexEntry;
use crate::hasher;
use crate::pack_writer::{PackWriter, FLAG_GZIP, FLAG_ENCRYPTED};
use crate::repository::Repository;
use crate::snapshot::{Snapshot, SnapshotItem, save_snapshot};
/// Caller-supplied description of an item being ingested.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IngestItemOptions {
    /// Display name for the item recorded in the snapshot.
    pub name: String,
    /// Item kind; serialized as "type", defaults to "data" when absent.
    #[serde(rename = "type", default = "default_item_type")]
    pub item_type: String,
}
/// Serde default for `IngestItemOptions::item_type`.
fn default_item_type() -> String {
    String::from("data")
}
/// Tracks a chunk that has been added to the current pack but not yet indexed
/// (the pack_id is only known after the pack is finalized).
struct PendingChunk {
    /// Hex-encoded SHA-256 of the plaintext chunk.
    hash_hex: String,
    /// Byte offset of the stored payload within the pack's data region.
    offset: u64,
    /// Stored (compressed, possibly encrypted) payload size in bytes.
    compressed_size: u32,
    /// Original plaintext chunk size in bytes.
    plaintext_size: u32,
}
/// Run the ingest pipeline.
pub async fn ingest(
repo: &mut Repository,
socket_path: &str,
tags: HashMap<String, String>,
items: Vec<IngestItemOptions>,
) -> Result<Snapshot, ArchiveError> {
// Acquire write lock
repo.acquire_lock("ingest").await?;
let result = do_ingest(repo, socket_path, tags, items).await;
// Always release lock
repo.release_lock().await?;
result
}
/// Core ingest: stream bytes from the Unix socket, chunk, dedup,
/// compress/encrypt via `process_chunk`, write packs, then persist the index
/// segment and a snapshot describing what was ingested.
async fn do_ingest(
    repo: &mut Repository,
    socket_path: &str,
    tags: HashMap<String, String>,
    items: Vec<IngestItemOptions>,
) -> Result<Snapshot, ArchiveError> {
    // Connect to the Unix socket where TypeScript is writing the data
    let mut stream = UnixStream::connect(socket_path).await
        .map_err(|e| ArchiveError::Io(e))?;
    tracing::info!("Connected to ingest socket: {}", socket_path);
    // Set up chunker with the repository's configured FastCDC bounds
    let cdc = FastCdc::new(
        repo.config.chunking.min_size as usize,
        repo.config.chunking.avg_size as usize,
        repo.config.chunking.max_size as usize,
    );
    let mut chunker = StreamingChunker::new(cdc);
    // Set up pack writer
    let mut pack_writer = PackWriter::new(repo.config.pack_target_size);
    // Track pending chunks for the current pack (to build index entries after finalize)
    let mut pending_chunks: Vec<PendingChunk> = Vec::new();
    // Stats
    let mut total_original_size: u64 = 0;
    let mut total_stored_size: u64 = 0;
    let mut new_chunks: u64 = 0;
    let mut reused_chunks: u64 = 0;
    // Ordered list of chunk hashes — this is what makes restore possible
    let mut chunk_hashes: Vec<String> = Vec::new();
    // Read data from socket in chunks
    let mut read_buf = vec![0u8; 256 * 1024]; // 256KB read buffer
    loop {
        let n = stream.read(&mut read_buf).await?;
        if n == 0 {
            break; // EOF
        }
        total_original_size += n as u64;
        let data = &read_buf[..n];
        // Feed into chunker; it emits chunks whose boundaries are stable
        let chunks = chunker.feed(data);
        for chunk_data in chunks {
            process_chunk(
                repo,
                &mut pack_writer,
                &mut pending_chunks,
                &chunk_data,
                &mut chunk_hashes,
                &mut new_chunks,
                &mut reused_chunks,
                &mut total_stored_size,
            ).await?;
        }
    }
    // Finalize chunker — get any remaining data
    if let Some(final_chunk) = chunker.finalize() {
        process_chunk(
            repo,
            &mut pack_writer,
            &mut pending_chunks,
            &final_chunk,
            &mut chunk_hashes,
            &mut new_chunks,
            &mut reused_chunks,
            &mut total_stored_size,
        ).await?;
    }
    // Finalize any remaining (partially filled) pack
    if !pack_writer.is_empty() {
        finalize_pack(repo, &mut pack_writer, &mut pending_chunks).await?;
    }
    // Persist new index entries before writing the snapshot that references them
    repo.index.save_segment(&repo.path).await?;
    // Build snapshot.
    // NOTE(review): only the FIRST element of `items` is used; all ingested
    // bytes and chunk hashes are attributed to a single snapshot item even if
    // multiple items were passed — confirm this is intended.
    let item_name = items.first()
        .map(|i| i.name.clone())
        .unwrap_or_else(|| "data".to_string());
    let item_type = items.first()
        .map(|i| i.item_type.clone())
        .unwrap_or_else(|| "data".to_string());
    let snapshot_items = vec![SnapshotItem {
        name: item_name,
        item_type,
        size: total_original_size,
        chunks: chunk_hashes,
    }];
    let snapshot = Snapshot::new(
        snapshot_items,
        tags,
        total_original_size,
        total_stored_size,
        new_chunks,
        reused_chunks,
    );
    save_snapshot(&repo.path, &snapshot).await?;
    tracing::info!(
        "Ingest complete: {} bytes original, {} bytes stored, {} new chunks, {} reused",
        total_original_size, total_stored_size, new_chunks, reused_chunks
    );
    Ok(snapshot)
}
/// Process one plaintext chunk: hash, dedup against the global index,
/// compress, optionally encrypt, and append to the current pack.
/// Updates the running stats counters in place.
async fn process_chunk(
    repo: &mut Repository,
    pack_writer: &mut PackWriter,
    pending_chunks: &mut Vec<PendingChunk>,
    chunk_data: &[u8],
    chunk_hashes: &mut Vec<String>,
    new_chunks: &mut u64,
    reused_chunks: &mut u64,
    total_stored_size: &mut u64,
) -> Result<(), ArchiveError> {
    // Hash the plaintext chunk — the hash is the chunk's identity
    let hash = hasher::hash_chunk(chunk_data);
    let hash_hex = hasher::hash_to_hex(&hash);
    // Record in the snapshot's ordered chunk list even when deduplicated
    chunk_hashes.push(hash_hex.clone());
    // Dedup check against loaded + pending index entries.
    // NOTE(review): chunks pending in the CURRENT unfinalized pack are only
    // added to the index in finalize_pack, so a duplicate chunk appearing
    // twice before the pack is finalized is stored twice — confirm whether
    // this intra-pack dedup miss is acceptable.
    if repo.index.has(&hash_hex) {
        *reused_chunks += 1;
        return Ok(());
    }
    // New chunk: compress
    let compressed = compression::compress(chunk_data)?;
    let mut flags = FLAG_GZIP;
    let plaintext_size = chunk_data.len() as u32;
    // Optionally encrypt (compress-then-encrypt; encrypted data won't compress)
    let (stored_data, nonce) = if let Some(ref key) = repo.master_key {
        let encrypted = encryption::encrypt_chunk(&compressed, key)?;
        flags |= FLAG_ENCRYPTED;
        (encrypted.ciphertext, encrypted.nonce)
    } else {
        // All-zero nonce as a placeholder when encryption is disabled
        (compressed, [0u8; 12])
    };
    let compressed_size = stored_data.len() as u32;
    *total_stored_size += stored_data.len() as u64;
    *new_chunks += 1;
    // Track the pending chunk for index building
    // The offset is the current position in the pack buffer.
    // NOTE(review): this assumes stored payloads are laid out back-to-back
    // from offset 0 with no pack header or per-chunk framing between them —
    // verify against PackWriter's on-disk layout and pack_reader's use of
    // IndexEntry::offset.
    let offset = pack_writer.entries().iter()
        .map(|e| e.compressed_size as u64)
        .sum::<u64>();
    pending_chunks.push(PendingChunk {
        hash_hex: hash_hex.clone(),
        offset,
        compressed_size,
        plaintext_size,
    });
    // Add to pack writer
    pack_writer.add_chunk(hash, &stored_data, plaintext_size, nonce, flags);
    // If pack reached its target size, finalize it
    if pack_writer.should_finalize() {
        finalize_pack(repo, pack_writer, pending_chunks).await?;
    }
    Ok(())
}
/// Finalize the current pack and add its entries to the global index.
/// Index entries cannot be created earlier because the pack_id is only
/// assigned by `PackWriter::finalize`.
async fn finalize_pack(
    repo: &mut Repository,
    pack_writer: &mut PackWriter,
    pending_chunks: &mut Vec<PendingChunk>,
) -> Result<(), ArchiveError> {
    // Writes the pack to disk and resets the writer for the next pack
    let pack_info = pack_writer.finalize(&repo.path).await?;
    // Now we know the pack_id — add all pending chunks to the global index.
    // drain(..) empties pending_chunks so the next pack starts clean.
    for pending in pending_chunks.drain(..) {
        repo.index.add_entry(pending.hash_hex, IndexEntry {
            pack_id: pack_info.pack_id.clone(),
            offset: pending.offset,
            compressed_size: pending.compressed_size,
            plaintext_size: pending.plaintext_size,
        });
    }
    tracing::info!(
        "Finalized pack {} ({} chunks, {} bytes)",
        pack_info.pack_id, pack_info.chunk_count, pack_info.data_size
    );
    Ok(())
}

194
rust/src/lock.rs Normal file
View File

@@ -0,0 +1,194 @@
/// Advisory file-based locking for repository write operations.
use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::error::ArchiveError;
/// On-disk record describing one held advisory lock.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LockEntry {
    /// Random identifier; also the lock file's name stem.
    pub lock_id: String,
    /// PID of the process that created the lock.
    pub pid: u32,
    /// Hostname of the locking machine (best effort, from env vars).
    pub hostname: String,
    /// RFC 3339 timestamp of lock creation.
    pub created_at: String,
    /// Operation holding the lock (e.g. "ingest").
    pub operation: String,
    /// Age in seconds after which the lock is considered stale and breakable.
    pub stale_after_seconds: u64,
}
/// Acquire an advisory lock for the given operation.
///
/// The lock file content is written via tmp + rename: the old code wrote the
/// file directly (despite a comment claiming `create_new`), so another
/// process scanning the locks directory could read a partially written file,
/// fail to parse it, and delete it as "corrupted" — silently breaking the
/// lock.
///
/// NOTE(review): the check-then-create sequence is still not race-free — two
/// processes can both pass `get_active_lock` and each create their own
/// uuid-named lock file. True mutual exclusion would need a single
/// well-known lock file created with `create_new`, which changes the on-disk
/// format; confirm whether that is acceptable.
///
/// # Errors
/// Returns `ArchiveError::Locked` when another active (non-stale) lock exists.
pub async fn acquire(repo_path: &str, operation: &str) -> Result<LockEntry, ArchiveError> {
    let locks_dir = Path::new(repo_path).join("locks");
    tokio::fs::create_dir_all(&locks_dir).await?;
    // Check for existing locks
    if let Some(existing) = get_active_lock(repo_path).await? {
        return Err(ArchiveError::Locked(format!(
            "Repository locked by PID {} on {} for operation '{}' since {}",
            existing.pid, existing.hostname, existing.operation, existing.created_at
        )));
    }
    let lock_id = uuid::Uuid::new_v4().to_string();
    let hostname = std::env::var("HOSTNAME")
        .or_else(|_| std::env::var("HOST"))
        .unwrap_or_else(|_| "unknown".to_string());
    let entry = LockEntry {
        lock_id: lock_id.clone(),
        pid: std::process::id(),
        hostname,
        created_at: chrono::Utc::now().to_rfc3339(),
        operation: operation.to_string(),
        stale_after_seconds: 21600, // 6 hours
    };
    let lock_path = locks_dir.join(format!("{}.json", lock_id));
    let json = serde_json::to_string_pretty(&entry)?;
    // Atomic content write: other processes only ever see the complete file.
    let tmp_path = locks_dir.join(format!("{}.json.tmp", lock_id));
    tokio::fs::write(&tmp_path, json).await?;
    tokio::fs::rename(&tmp_path, &lock_path).await?;
    tracing::info!("Acquired lock {} for operation '{}'", lock_id, operation);
    Ok(entry)
}
/// Remove the lock file for `lock_id`, if present. A missing file is not an
/// error — the lock may already have been broken or cleaned up as stale.
pub async fn release(repo_path: &str, lock_id: &str) -> Result<(), ArchiveError> {
    let lock_path = Path::new(repo_path)
        .join("locks")
        .join(format!("{}.json", lock_id));
    if !lock_path.exists() {
        return Ok(());
    }
    tokio::fs::remove_file(&lock_path).await?;
    tracing::info!("Released lock {}", lock_id);
    Ok(())
}
/// Report whether any active (non-stale) lock currently exists.
pub async fn is_locked(repo_path: &str) -> Result<bool, ArchiveError> {
    let active = get_active_lock(repo_path).await?;
    Ok(active.is_some())
}
/// Scan the locks directory and return the first live lock found.
///
/// Side effects: corrupted (unparseable) lock files and stale locks
/// encountered during the scan are deleted opportunistically.
async fn get_active_lock(repo_path: &str) -> Result<Option<LockEntry>, ArchiveError> {
    let locks_dir = Path::new(repo_path).join("locks");
    if !locks_dir.exists() {
        return Ok(None);
    }
    let mut dir = tokio::fs::read_dir(&locks_dir).await?;
    while let Some(dirent) = dir.next_entry().await? {
        let path = dirent.path();
        if path.extension().and_then(|e| e.to_str()) != Some("json") {
            continue;
        }
        let data = tokio::fs::read_to_string(&path).await?;
        match serde_json::from_str::<LockEntry>(&data) {
            Err(_) => {
                // Unparseable lock file — treat as garbage and clean it up.
                let _ = tokio::fs::remove_file(&path).await;
            }
            Ok(lock) if is_stale(&lock) => {
                tracing::warn!("Removing stale lock {} (from {})", lock.lock_id, lock.created_at);
                let _ = tokio::fs::remove_file(&path).await;
            }
            Ok(lock) => return Ok(Some(lock)),
        }
    }
    Ok(None)
}
/// Remove every stale or unreadable lock file. Returns how many were deleted.
pub async fn check_and_break_stale(repo_path: &str) -> Result<u32, ArchiveError> {
    let locks_dir = Path::new(repo_path).join("locks");
    if !locks_dir.exists() {
        return Ok(0);
    }
    let mut removed = 0u32;
    let mut dir = tokio::fs::read_dir(&locks_dir).await?;
    while let Some(dirent) = dir.next_entry().await? {
        let path = dirent.path();
        if path.extension().and_then(|e| e.to_str()) != Some("json") {
            continue;
        }
        // Unreadable files are skipped; unparseable or stale ones are removed.
        let Ok(data) = tokio::fs::read_to_string(&path).await else {
            continue;
        };
        match serde_json::from_str::<LockEntry>(&data) {
            Ok(lock) => {
                if is_stale(&lock) {
                    tracing::warn!("Breaking stale lock {} (from {})", lock.lock_id, lock.created_at);
                    let _ = tokio::fs::remove_file(&path).await;
                    removed += 1;
                }
            }
            Err(_) => {
                let _ = tokio::fs::remove_file(&path).await;
                removed += 1;
            }
        }
    }
    Ok(removed)
}
/// Break all locks (forced unlock).
pub async fn break_all_locks(repo_path: &str, force: bool) -> Result<u32, ArchiveError> {
let locks_dir = Path::new(repo_path).join("locks");
if !locks_dir.exists() {
return Ok(0);
}
let mut removed = 0u32;
let mut dir = tokio::fs::read_dir(&locks_dir).await?;
while let Some(entry) = dir.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
if force {
let _ = tokio::fs::remove_file(&path).await;
removed += 1;
} else {
// Only break stale locks
let data = match tokio::fs::read_to_string(&path).await {
Ok(d) => d,
Err(_) => continue,
};
let lock: LockEntry = match serde_json::from_str(&data) {
Ok(l) => l,
Err(_) => {
let _ = tokio::fs::remove_file(&path).await;
removed += 1;
continue;
}
};
if is_stale(&lock) {
let _ = tokio::fs::remove_file(&path).await;
removed += 1;
}
}
}
Ok(removed)
}
/// A lock is stale once its age exceeds `stale_after_seconds`, or when its
/// creation timestamp cannot be parsed at all.
fn is_stale(lock: &LockEntry) -> bool {
    match chrono::DateTime::parse_from_rfc3339(&lock.created_at) {
        Ok(created) => {
            let age = chrono::Utc::now().signed_duration_since(created);
            age.num_seconds() > lock.stale_after_seconds as i64
        }
        // Unreadable timestamp — safest to treat the lock as breakable.
        Err(_) => true,
    }
}

51
rust/src/main.rs Normal file
View File

@@ -0,0 +1,51 @@
use clap::Parser;
mod config;
mod error;
mod management;
mod chunker;
mod hasher;
mod compression;
mod encryption;
mod pack_writer;
mod pack_reader;
mod global_index;
mod repository;
mod snapshot;
mod lock;
mod ingest;
mod restore;
mod verify;
mod prune;
mod repair;
/// Command-line interface for the `containerarchive` binary.
#[derive(Parser, Debug)]
#[command(name = "containerarchive", about = "Content-addressed incremental backup engine")]
struct Cli {
    /// Run in management mode (JSON IPC over stdin/stdout)
    #[arg(long)]
    management: bool,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Logging goes to stderr; stdout carries the JSON IPC protocol.
    let filter = tracing_subscriber::EnvFilter::from_default_env()
        .add_directive(tracing::Level::INFO.into());
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .with_env_filter(filter)
        .init();
    let cli = Cli::parse();
    if !cli.management {
        // The binary has no standalone CLI mode; refuse anything but IPC.
        eprintln!("containerarchive: use --management for IPC mode");
        std::process::exit(1);
    }
    management::management_loop().await
}

470
rust/src/management.rs Normal file
View File

@@ -0,0 +1,470 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::io::{self, BufRead, Write};
use crate::repository::Repository;
/// One JSON request read from stdin: `{id, method, params}`.
#[derive(Debug, Deserialize)]
struct Request {
    /// Correlation id echoed back in the matching Response.
    id: String,
    /// Method name, e.g. "init", "ingest", "verify".
    method: String,
    /// Method-specific parameters; Value::Null when absent.
    #[serde(default)]
    params: Value,
}
/// Reply written to stdout for the Request with the same `id`.
/// At most one of `result` / `error` is populated, mirroring `success`;
/// the unset side is omitted from the serialized JSON.
#[derive(Debug, Serialize)]
struct Response {
    id: String,
    success: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    result: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}
/// Unsolicited notification pushed to stdout (e.g. the startup "ready" event).
#[derive(Debug, Serialize)]
struct Event {
    event: String,
    data: Value,
}
/// Serialize and write an event as one newline-delimited JSON message on
/// stdout, flushing immediately so the peer never waits on a buffer.
/// Write errors are ignored — if stdout is gone the loop ends on its own.
fn send_event(event: &str, data: Value) {
    let payload = serde_json::to_string(&Event {
        event: event.to_string(),
        data,
    })
    .unwrap();
    let mut out = io::stdout().lock();
    let _ = writeln!(out, "{}", payload);
    let _ = out.flush();
}
/// Serialize and write a response as one newline-delimited JSON message on
/// stdout, flushing so the requester sees it without buffering delay.
fn send_response(resp: &Response) {
    let payload = serde_json::to_string(resp).unwrap();
    let mut out = io::stdout().lock();
    let _ = writeln!(out, "{}", payload);
    let _ = out.flush();
}
/// Main IPC loop: announce readiness, then process one JSON request per
/// stdin line until EOF, writing one JSON response per request to stdout.
pub async fn management_loop() -> anyhow::Result<()> {
    send_event("ready", serde_json::json!({}));
    // The single repository slot shared by all handlers in this session.
    let mut repo: Option<Repository> = None;
    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        let Ok(line) = line else { break };
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<Request>(&line) {
            Ok(request) => {
                let response = handle_request(&request, &mut repo).await;
                send_response(&response);
            }
            Err(e) => {
                // Malformed request: log and skip — without an id there is
                // nothing to correlate an error response with.
                tracing::error!("Failed to parse request: {}", e);
            }
        }
    }
    // EOF: close any open repository before exiting.
    if let Some(r) = repo.take() {
        let _ = r.close().await;
    }
    Ok(())
}
/// Dispatch a parsed request to the handler matching its method name.
///
/// `repo` is the session's single open-repository slot; each handler borrows
/// it mutably or immutably as required. Unknown methods produce an error
/// Response rather than terminating the loop.
async fn handle_request(req: &Request, repo: &mut Option<Repository>) -> Response {
    match req.method.as_str() {
        "init" => handle_init(req, repo).await,
        "open" => handle_open(req, repo).await,
        "close" => handle_close(req, repo).await,
        "ingest" => handle_ingest(req, repo).await,
        "restore" => handle_restore(req, repo).await,
        "listSnapshots" => handle_list_snapshots(req, repo).await,
        "getSnapshot" => handle_get_snapshot(req, repo).await,
        "verify" => handle_verify(req, repo).await,
        "repair" => handle_repair(req, repo).await,
        "prune" => handle_prune(req, repo).await,
        "reindex" => handle_reindex(req, repo).await,
        "unlock" => handle_unlock(req, repo).await,
        _ => Response {
            id: req.id.clone(),
            success: false,
            result: None,
            error: Some(format!("Unknown method: {}", req.method)),
        },
    }
}
async fn handle_init(req: &Request, repo: &mut Option<Repository>) -> Response {
let path = req.params.get("path").and_then(|v| v.as_str()).unwrap_or("");
let passphrase = req.params.get("passphrase").and_then(|v| v.as_str());
match Repository::init(path, passphrase).await {
Ok(r) => {
let config = serde_json::to_value(&r.config).unwrap_or(Value::Null);
*repo = Some(r);
Response {
id: req.id.clone(),
success: true,
result: Some(config),
error: None,
}
}
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_open(req: &Request, repo: &mut Option<Repository>) -> Response {
let path = req.params.get("path").and_then(|v| v.as_str()).unwrap_or("");
let passphrase = req.params.get("passphrase").and_then(|v| v.as_str());
match Repository::open(path, passphrase).await {
Ok(r) => {
let config = serde_json::to_value(&r.config).unwrap_or(Value::Null);
*repo = Some(r);
Response {
id: req.id.clone(),
success: true,
result: Some(config),
error: None,
}
}
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_close(req: &Request, repo: &mut Option<Repository>) -> Response {
if let Some(r) = repo.take() {
match r.close().await {
Ok(_) => Response {
id: req.id.clone(),
success: true,
result: Some(serde_json::json!({})),
error: None,
},
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
} else {
Response {
id: req.id.clone(),
success: true,
result: Some(serde_json::json!({})),
error: None,
}
}
}
async fn handle_ingest(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_mut() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
let socket_path = req.params.get("socketPath").and_then(|v| v.as_str()).unwrap_or("");
let tags: std::collections::HashMap<String, String> = req.params.get("tags")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
let items: Vec<crate::ingest::IngestItemOptions> = req.params.get("items")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
match crate::ingest::ingest(repo, socket_path, tags, items).await {
Ok(snapshot) => {
let result = serde_json::to_value(&snapshot).unwrap_or(Value::Null);
Response {
id: req.id.clone(),
success: true,
result: Some(serde_json::json!({ "snapshot": result })),
error: None,
}
}
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_restore(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_ref() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
let snapshot_id = req.params.get("snapshotId").and_then(|v| v.as_str()).unwrap_or("");
let socket_path = req.params.get("socketPath").and_then(|v| v.as_str()).unwrap_or("");
let item = req.params.get("item").and_then(|v| v.as_str());
match crate::restore::restore(repo, snapshot_id, socket_path, item).await {
Ok(_) => Response {
id: req.id.clone(),
success: true,
result: Some(serde_json::json!({})),
error: None,
},
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_list_snapshots(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_ref() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
let filter = req.params.get("filter")
.and_then(|v| serde_json::from_value::<crate::snapshot::SnapshotFilter>(v.clone()).ok());
match crate::snapshot::list_snapshots(&repo.path, filter.as_ref()).await {
Ok(snapshots) => {
let result = serde_json::to_value(&snapshots).unwrap_or(Value::Null);
Response {
id: req.id.clone(),
success: true,
result: Some(serde_json::json!({ "snapshots": result })),
error: None,
}
}
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_get_snapshot(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_ref() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
let snapshot_id = req.params.get("snapshotId").and_then(|v| v.as_str()).unwrap_or("");
match crate::snapshot::load_snapshot(&repo.path, snapshot_id).await {
Ok(snapshot) => {
let result = serde_json::to_value(&snapshot).unwrap_or(Value::Null);
Response {
id: req.id.clone(),
success: true,
result: Some(serde_json::json!({ "snapshot": result })),
error: None,
}
}
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_verify(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_ref() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
let level = req.params.get("level").and_then(|v| v.as_str()).unwrap_or("standard");
match crate::verify::verify(repo, level).await {
Ok(result) => {
let result_val = serde_json::to_value(&result).unwrap_or(Value::Null);
Response {
id: req.id.clone(),
success: true,
result: Some(result_val),
error: None,
}
}
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_repair(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_mut() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
match crate::repair::repair(repo).await {
Ok(result) => {
let result_val = serde_json::to_value(&result).unwrap_or(Value::Null);
Response {
id: req.id.clone(),
success: true,
result: Some(result_val),
error: None,
}
}
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_prune(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_mut() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
let retention: crate::prune::RetentionPolicy = req.params
.get("retention")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
let dry_run = req.params.get("dryRun").and_then(|v| v.as_bool()).unwrap_or(false);
match crate::prune::prune(repo, &retention, dry_run).await {
Ok(result) => {
let result_val = serde_json::to_value(&result).unwrap_or(Value::Null);
Response {
id: req.id.clone(),
success: true,
result: Some(result_val),
error: None,
}
}
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_reindex(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_mut() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
match repo.reindex().await {
Ok(count) => Response {
id: req.id.clone(),
success: true,
result: Some(serde_json::json!({ "indexedChunks": count })),
error: None,
},
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}
async fn handle_unlock(req: &Request, repo: &mut Option<Repository>) -> Response {
let repo = match repo.as_ref() {
Some(r) => r,
None => return Response {
id: req.id.clone(),
success: false,
result: None,
error: Some("No repository open".to_string()),
},
};
let force = req.params.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
match crate::lock::break_all_locks(&repo.path, force).await {
Ok(count) => Response {
id: req.id.clone(),
success: true,
result: Some(serde_json::json!({ "removedLocks": count })),
error: None,
},
Err(e) => Response {
id: req.id.clone(),
success: false,
result: None,
error: Some(e.to_string()),
},
}
}

88
rust/src/pack_reader.rs Normal file
View File

@@ -0,0 +1,88 @@
/// Pack file reader.
///
/// Reads chunks from pack files using the companion .idx file for lookup.
use std::path::Path;
use sha2::{Sha256, Digest};
use crate::error::ArchiveError;
use crate::pack_writer::{IdxEntry, IDX_ENTRY_SIZE, PACK_MAGIC, PACK_VERSION, PACK_HEADER_SIZE};
/// Load and parse every fixed-size entry from a pack's companion .idx file.
///
/// Fails with `Corruption` if the file length is not an exact multiple of
/// `IDX_ENTRY_SIZE`, which indicates truncation or garbage.
pub async fn load_idx(idx_path: &Path) -> Result<Vec<IdxEntry>, ArchiveError> {
    let data = tokio::fs::read(idx_path).await?;
    if data.len() % IDX_ENTRY_SIZE != 0 {
        return Err(ArchiveError::Corruption(format!(
            "IDX file size {} is not a multiple of entry size {}",
            data.len(),
            IDX_ENTRY_SIZE
        )));
    }
    // chunks_exact yields each 80-byte record; the remainder is empty thanks
    // to the divisibility check above. collect() short-circuits on a bad entry.
    data.chunks_exact(IDX_ENTRY_SIZE)
        .map(IdxEntry::from_bytes)
        .collect()
}
/// Read exactly `size` bytes starting at `offset` from a pack file.
/// Errors if the file ends before `size` bytes are available.
pub async fn read_chunk(pack_path: &Path, offset: u64, size: u32) -> Result<Vec<u8>, ArchiveError> {
    use tokio::io::{AsyncReadExt, AsyncSeekExt};
    let mut file = tokio::fs::File::open(pack_path).await?;
    file.seek(std::io::SeekFrom::Start(offset)).await?;
    let mut chunk = vec![0u8; size as usize];
    file.read_exact(&mut chunk).await?;
    Ok(chunk)
}
/// Validate a pack file's trailing 32-byte header.
///
/// Returns Ok(false) for a bad magic, unsupported version, or checksum
/// mismatch; Err only for I/O problems or a file too small to hold a header.
pub async fn verify_pack_header(pack_path: &Path) -> Result<bool, ArchiveError> {
    let data = tokio::fs::read(pack_path).await?;
    if data.len() < PACK_HEADER_SIZE {
        return Err(ArchiveError::Corruption("Pack file too small for header".to_string()));
    }
    // Layout: [chunk data][header]; the header occupies the final 32 bytes.
    let (chunk_data, header) = data.split_at(data.len() - PACK_HEADER_SIZE);
    if &header[0..4] != PACK_MAGIC {
        return Ok(false);
    }
    // Version: little-endian u16 at header offset 4.
    let version = u16::from_le_bytes([header[4], header[5]]);
    if version != PACK_VERSION {
        return Ok(false);
    }
    // Stored checksum (header offsets 14..30) is SHA-256 over the chunk data,
    // truncated to 16 bytes.
    let stored_checksum = &header[14..30];
    let computed = Sha256::digest(chunk_data);
    Ok(&computed[..16] == stored_checksum)
}
/// Binary-search a hash-sorted IDX slice for the entry matching `hash`.
pub fn find_in_idx<'a>(entries: &'a [IdxEntry], hash: &[u8; 32]) -> Option<&'a IdxEntry> {
    match entries.binary_search_by(|e| e.content_hash.cmp(hash)) {
        Ok(i) => entries.get(i),
        Err(_) => None,
    }
}

240
rust/src/pack_writer.rs Normal file
View File

@@ -0,0 +1,240 @@
/// Pack file writer.
///
/// Accumulates compressed (and optionally encrypted) chunks into a pack file
/// until it reaches the target size (~8MB), then finalizes by writing the
/// .pack and .idx files atomically.
///
/// Pack file format:
/// [chunk-0-data][chunk-1-data]...[chunk-N-data][32-byte header]
///
/// Header (last 32 bytes):
/// Magic "CAPA" (4) + version u16 LE (2) + chunk_count u32 LE (4) +
/// idx_size u32 LE (4) + SHA-256 checksum truncated to 16 bytes (16) +
/// reserved (2)
///
/// IDX file format:
/// Array of 80-byte fixed-size entries, sorted by content hash.
/// content_hash (32) + offset u64 LE (8) + compressed_size u32 LE (4) +
/// plaintext_size u32 LE (4) + nonce (12) + flags u32 LE (4) + reserved (16)
use std::path::{Path, PathBuf};
use byteorder::{LittleEndian, WriteBytesExt};
use sha2::{Sha256, Digest};
use crate::error::ArchiveError;
/// Magic bytes identifying a pack file.
pub const PACK_MAGIC: &[u8; 4] = b"CAPA";
/// Current on-disk pack format version.
pub const PACK_VERSION: u16 = 1;
/// Fixed size in bytes of one serialized IDX entry.
pub const IDX_ENTRY_SIZE: usize = 80;
/// Size in bytes of the trailing pack header.
pub const PACK_HEADER_SIZE: usize = 32;
/// Flags stored in IDX entries.
/// Chunk payload is encrypted.
pub const FLAG_ENCRYPTED: u32 = 0x01;
/// Chunk payload is gzip-compressed.
pub const FLAG_GZIP: u32 = 0x02;
/// An entry in the pack index.
/// One chunk's location and metadata inside its pack, serialized as a fixed
/// 80-byte record (see `to_bytes` for the exact layout).
#[derive(Debug, Clone)]
pub struct IdxEntry {
    /// 32-byte content hash identifying the chunk.
    pub content_hash: [u8; 32],
    /// Byte offset of the chunk within the pack's data region.
    pub offset: u64,
    /// Stored size in bytes (after compression/encryption).
    pub compressed_size: u32,
    /// Original chunk size in bytes before compression.
    pub plaintext_size: u32,
    /// Per-chunk nonce; meaningful when FLAG_ENCRYPTED is set.
    pub nonce: [u8; 12],
    /// Bitwise OR of FLAG_ENCRYPTED / FLAG_GZIP.
    pub flags: u32,
}
impl IdxEntry {
/// Serialize this entry to 80 bytes.
pub fn to_bytes(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(IDX_ENTRY_SIZE);
buf.extend_from_slice(&self.content_hash); // 32
buf.write_u64::<LittleEndian>(self.offset).unwrap(); // 8
buf.write_u32::<LittleEndian>(self.compressed_size).unwrap(); // 4
buf.write_u32::<LittleEndian>(self.plaintext_size).unwrap(); // 4
buf.extend_from_slice(&self.nonce); // 12
buf.write_u32::<LittleEndian>(self.flags).unwrap(); // 4
buf.extend_from_slice(&[0u8; 16]); // 16 reserved
debug_assert_eq!(buf.len(), IDX_ENTRY_SIZE);
buf
}
/// Parse an entry from 80 bytes.
pub fn from_bytes(data: &[u8]) -> Result<Self, ArchiveError> {
use byteorder::ReadBytesExt;
use std::io::Cursor;
if data.len() < IDX_ENTRY_SIZE {
return Err(ArchiveError::Corruption("IDX entry too short".to_string()));
}
let mut content_hash = [0u8; 32];
content_hash.copy_from_slice(&data[0..32]);
let mut cursor = Cursor::new(&data[32..]);
let offset = cursor.read_u64::<LittleEndian>()
.map_err(|e| ArchiveError::Corruption(format!("IDX read error: {}", e)))?;
let compressed_size = cursor.read_u32::<LittleEndian>()
.map_err(|e| ArchiveError::Corruption(format!("IDX read error: {}", e)))?;
let plaintext_size = cursor.read_u32::<LittleEndian>()
.map_err(|e| ArchiveError::Corruption(format!("IDX read error: {}", e)))?;
let mut nonce = [0u8; 12];
nonce.copy_from_slice(&data[48..60]);
let mut cursor = Cursor::new(&data[60..]);
let flags = cursor.read_u32::<LittleEndian>()
.map_err(|e| ArchiveError::Corruption(format!("IDX read error: {}", e)))?;
Ok(Self {
content_hash,
offset,
compressed_size,
plaintext_size,
nonce,
flags,
})
}
}
/// Info about a finalized pack.
#[derive(Debug, Clone)]
pub struct PackInfo {
    /// Dashless UUID naming this pack; also its file basename.
    pub pack_id: String,
    /// Final path of the .pack file.
    pub pack_path: PathBuf,
    /// Final path of the companion .idx file.
    pub idx_path: PathBuf,
    /// Number of chunks written into the pack.
    pub chunk_count: u32,
    /// Size in bytes of the chunk-data region (excludes the 32-byte header).
    pub data_size: u64,
}
/// Accumulates chunks and writes them to pack files.
pub struct PackWriter {
    /// Finalization threshold in bytes for the buffered chunk data.
    target_size: u64,
    /// Concatenated chunk payloads, in insertion order.
    data_buffer: Vec<u8>,
    /// Index entries for the buffered chunks (sorted by hash at finalize time).
    entries: Vec<IdxEntry>,
    /// Offset at which the next chunk will be placed.
    current_offset: u64,
}
impl PackWriter {
    /// Create a writer that finalizes once buffered data reaches `target_size` bytes.
    pub fn new(target_size: u64) -> Self {
        Self {
            target_size,
            data_buffer: Vec::new(),
            entries: Vec::new(),
            current_offset: 0,
        }
    }
    /// Append an already-compressed/encrypted chunk to the pack buffer and
    /// record its index entry at the current offset.
    pub fn add_chunk(
        &mut self,
        content_hash: [u8; 32],
        compressed_data: &[u8],
        plaintext_size: u32,
        nonce: [u8; 12],
        flags: u32,
    ) {
        let entry = IdxEntry {
            content_hash,
            offset: self.current_offset,
            compressed_size: compressed_data.len() as u32,
            plaintext_size,
            nonce,
            flags,
        };
        self.data_buffer.extend_from_slice(compressed_data);
        self.current_offset += compressed_data.len() as u64;
        self.entries.push(entry);
    }
    /// True once the buffered chunk data has reached the target pack size.
    pub fn should_finalize(&self) -> bool {
        self.data_buffer.len() as u64 >= self.target_size
    }
    /// True if no chunks have been added since the last finalize.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
    /// Number of chunks currently buffered.
    pub fn chunk_count(&self) -> usize {
        self.entries.len()
    }
    /// Finalize: write the .pack and .idx files and reset the writer for the
    /// next pack. Returns info about the written pack.
    ///
    /// Write order matters for crash safety: both files go to `.tmp` names
    /// first, then the .pack is renamed into place BEFORE the .idx. A
    /// surviving .idx must never reference a pack that does not exist (index
    /// rebuilds consume .idx files); an orphaned .pack without its .idx is
    /// merely unreferenced garbage. The previous order (idx first) could
    /// leave a dangling .idx after a crash between the two renames.
    pub async fn finalize(&mut self, repo_path: &str) -> Result<PackInfo, ArchiveError> {
        if self.entries.is_empty() {
            return Err(ArchiveError::Other("Cannot finalize empty pack".to_string()));
        }
        // Pack id: random dashless UUID; its first two hex chars shard the directory.
        let pack_id = uuid::Uuid::new_v4().to_string().replace("-", "");
        let shard = &pack_id[..2];
        let pack_dir = Path::new(repo_path).join("packs").join("data").join(shard);
        tokio::fs::create_dir_all(&pack_dir).await?;
        let pack_path = pack_dir.join(format!("{}.pack", pack_id));
        let idx_path = pack_dir.join(format!("{}.idx", pack_id));
        let tmp_pack_path = pack_dir.join(format!("{}.pack.tmp", pack_id));
        let tmp_idx_path = pack_dir.join(format!("{}.idx.tmp", pack_id));
        // Entries are sorted by content hash so readers can binary-search the idx.
        self.entries.sort_by(|a, b| a.content_hash.cmp(&b.content_hash));
        // Build IDX data.
        let mut idx_data = Vec::with_capacity(self.entries.len() * IDX_ENTRY_SIZE);
        for entry in &self.entries {
            idx_data.extend_from_slice(&entry.to_bytes());
        }
        // Header checksum: SHA-256 of the chunk data, truncated to 16 bytes.
        let mut hasher = Sha256::new();
        hasher.update(&self.data_buffer);
        let checksum_full = hasher.finalize();
        let mut checksum = [0u8; 16];
        checksum.copy_from_slice(&checksum_full[..16]);
        // 32-byte trailer: magic(4) + version(2) + chunk_count(4) + idx_size(4)
        // + checksum(16) + reserved(2).
        let mut header = Vec::with_capacity(PACK_HEADER_SIZE);
        header.extend_from_slice(PACK_MAGIC);
        header.write_u16::<LittleEndian>(PACK_VERSION).unwrap();
        header.write_u32::<LittleEndian>(self.entries.len() as u32).unwrap();
        header.write_u32::<LittleEndian>(idx_data.len() as u32).unwrap();
        header.extend_from_slice(&checksum);
        header.extend_from_slice(&[0u8; 2]);
        debug_assert_eq!(header.len(), PACK_HEADER_SIZE);
        // Pack layout: [chunk data][header].
        let mut pack_data = Vec::with_capacity(self.data_buffer.len() + PACK_HEADER_SIZE);
        pack_data.extend_from_slice(&self.data_buffer);
        pack_data.extend_from_slice(&header);
        // Atomic publication: write both tmp files, then rename the pack
        // first (see the doc comment above for the ordering rationale).
        tokio::fs::write(&tmp_pack_path, &pack_data).await?;
        tokio::fs::write(&tmp_idx_path, &idx_data).await?;
        tokio::fs::rename(&tmp_pack_path, &pack_path).await?;
        tokio::fs::rename(&tmp_idx_path, &idx_path).await?;
        let info = PackInfo {
            pack_id: pack_id.clone(),
            pack_path: pack_path.clone(),
            idx_path: idx_path.clone(),
            chunk_count: self.entries.len() as u32,
            data_size: self.data_buffer.len() as u64,
        };
        // Reset internal state for the next pack.
        self.data_buffer.clear();
        self.entries.clear();
        self.current_offset = 0;
        Ok(info)
    }
    /// Borrow the currently buffered entries (for index building).
    pub fn entries(&self) -> &[IdxEntry] {
        &self.entries
    }
}

277
rust/src/prune.rs Normal file
View File

@@ -0,0 +1,277 @@
/// Retention-based pruning and garbage collection.
///
/// Prune determines which snapshots to keep based on retention policies,
/// deletes expired snapshots, and removes pack files where ALL chunks
/// are unreferenced (whole-pack GC only).
use std::collections::HashSet;
use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::error::ArchiveError;
use crate::repository::Repository;
use crate::snapshot;
/// Snapshot retention rules. All fields are optional; rules are additive
/// (a snapshot matching any rule is kept) and a fully empty policy keeps
/// every snapshot. camelCase keys match the TypeScript facade.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct RetentionPolicy {
    /// Keep the N most recent snapshots.
    #[serde(default)]
    pub keep_last: Option<u32>,
    /// Keep one snapshot per day for the last N days.
    #[serde(default)]
    pub keep_days: Option<u32>,
    /// Keep one snapshot per week for the last N weeks.
    #[serde(default)]
    pub keep_weeks: Option<u32>,
    /// Keep one snapshot per month for the last N months (~31-day window).
    #[serde(default)]
    pub keep_months: Option<u32>,
}
/// Summary of a prune run, serialized back to the caller with camelCase keys.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PruneResult {
    /// Snapshots deleted (or counted as deletable in a dry run).
    pub removed_snapshots: u32,
    /// Fully-unreferenced packs deleted (or counted in a dry run).
    pub removed_packs: u32,
    /// Bytes of .pack/.idx data freed.
    pub freed_bytes: u64,
    /// True when no changes were actually made on disk.
    pub dry_run: bool,
}
/// Prune snapshots per `retention`, then garbage-collect unreferenced packs.
///
/// Non-dry runs hold the repository "prune" lock for the duration. The lock
/// is always released after the work, and a prune error takes precedence
/// over a lock-release error (previously `release_lock().await?` could mask
/// the real prune failure with the release failure).
pub async fn prune(
    repo: &mut Repository,
    retention: &RetentionPolicy,
    dry_run: bool,
) -> Result<PruneResult, ArchiveError> {
    if dry_run {
        // Dry runs mutate nothing, so no lock is needed.
        return do_prune(repo, retention, dry_run).await;
    }
    repo.acquire_lock("prune").await?;
    let outcome = do_prune(repo, retention, dry_run).await;
    let released = repo.release_lock().await;
    match (outcome, released) {
        // The prune error is the interesting one even if release also failed.
        (Err(e), _) => Err(e),
        (Ok(_), Err(e)) => Err(e),
        (Ok(result), Ok(())) => Ok(result),
    }
}
/// Core prune logic (the caller holds the lock for non-dry runs).
///
/// Phase 1 deletes snapshots not retained by the policy. Phase 2 performs
/// whole-pack garbage collection: a pack is removed only when NONE of its
/// chunks are referenced by any surviving snapshot; partially-referenced
/// packs are left untouched (no pack rewriting here).
async fn do_prune(
    repo: &mut Repository,
    retention: &RetentionPolicy,
    dry_run: bool,
) -> Result<PruneResult, ArchiveError> {
    let mut result = PruneResult {
        removed_snapshots: 0,
        removed_packs: 0,
        freed_bytes: 0,
        dry_run,
    };
    // Load all snapshots
    let mut snapshots = snapshot::list_snapshots(&repo.path, None).await?;
    // Sort by creation time (newest first) — determine_kept_snapshots relies
    // on this ordering for its keepLast and per-bucket rules.
    snapshots.sort_by(|a, b| b.created_at.cmp(&a.created_at));
    // Determine which snapshots to keep
    let keep_ids = determine_kept_snapshots(&snapshots, retention);
    // Phase 1: Remove expired snapshots
    let to_remove: Vec<_> = snapshots.iter()
        .filter(|s| !keep_ids.contains(&s.id))
        .collect();
    result.removed_snapshots = to_remove.len() as u32;
    if !dry_run {
        for snap in &to_remove {
            snapshot::delete_snapshot(&repo.path, &snap.id).await?;
            tracing::info!("Removed snapshot {}", snap.id);
        }
    }
    // Phase 2: Find and remove unreferenced packs
    // Reload remaining snapshots. In dry-run mode nothing was deleted, so
    // simulate the post-delete state by filtering the in-memory list instead.
    let remaining_snapshots = if dry_run {
        snapshots.iter()
            .filter(|s| keep_ids.contains(&s.id))
            .cloned()
            .collect::<Vec<_>>()
    } else {
        snapshot::list_snapshots(&repo.path, None).await?
    };
    let referenced_chunks = snapshot::referenced_chunks(&remaining_snapshots);
    let referenced_packs = find_referenced_packs(repo, &referenced_chunks);
    // Find all pack IDs on disk
    let all_packs = find_all_pack_ids(&repo.path).await?;
    for pack_id in &all_packs {
        if !referenced_packs.contains(pack_id) {
            // This pack is fully unreferenced — delete it
            // (min() guards against a pathologically short pack id).
            let shard = &pack_id[..std::cmp::min(2, pack_id.len())];
            let pack_path = Path::new(&repo.path)
                .join("packs").join("data").join(shard)
                .join(format!("{}.pack", pack_id));
            let idx_path = Path::new(&repo.path)
                .join("packs").join("data").join(shard)
                .join(format!("{}.idx", pack_id));
            // Account for freed bytes from both files (also in dry runs).
            if pack_path.exists() {
                if let Ok(meta) = tokio::fs::metadata(&pack_path).await {
                    result.freed_bytes += meta.len();
                }
            }
            if idx_path.exists() {
                if let Ok(meta) = tokio::fs::metadata(&idx_path).await {
                    result.freed_bytes += meta.len();
                }
            }
            if !dry_run {
                let _ = tokio::fs::remove_file(&pack_path).await;
                let _ = tokio::fs::remove_file(&idx_path).await;
                // Remove entries from global index
                repo.index.remove_pack_entries(pack_id);
                tracing::info!("Removed pack {}", pack_id);
            }
            result.removed_packs += 1;
        }
    }
    // Compact index after pruning
    if !dry_run && result.removed_packs > 0 {
        repo.index.compact(&repo.path).await?;
    }
    tracing::info!(
        "Prune {}: removed {} snapshots, {} packs, freed {} bytes",
        if dry_run { "(dry run)" } else { "complete" },
        result.removed_snapshots,
        result.removed_packs,
        result.freed_bytes
    );
    Ok(result)
}
/// Compute the set of snapshot IDs retained by the policy.
///
/// `snapshots` must be sorted newest-first; each calendar-bucket rule then
/// naturally keeps the newest snapshot in every bucket. Rules are additive
/// (union of all matching sets). An entirely empty policy keeps everything.
fn determine_kept_snapshots(
    snapshots: &[snapshot::Snapshot],
    retention: &RetentionPolicy,
) -> HashSet<String> {
    // No rules at all → retain every snapshot.
    if retention.keep_last.is_none()
        && retention.keep_days.is_none()
        && retention.keep_weeks.is_none()
        && retention.keep_months.is_none()
    {
        return snapshots.iter().map(|s| s.id.clone()).collect();
    }
    let mut keep = HashSet::new();
    // keepLast: the N most recent (relies on newest-first ordering).
    if let Some(n) = retention.keep_last {
        keep.extend(snapshots.iter().take(n as usize).map(|s| s.id.clone()));
    }
    let now = chrono::Utc::now();
    // Shared bucket rule: keep the first (newest) snapshot per calendar
    // bucket, among snapshots whose age satisfies `in_window`.
    fn bucket_keep(
        snapshots: &[snapshot::Snapshot],
        keep: &mut HashSet<String>,
        now: chrono::DateTime<chrono::Utc>,
        in_window: impl Fn(chrono::Duration) -> bool,
        bucket_fmt: &str,
    ) {
        let mut seen = HashSet::new();
        for snap in snapshots {
            if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&snap.created_at) {
                let age = now.signed_duration_since(dt);
                if in_window(age) && seen.insert(dt.format(bucket_fmt).to_string()) {
                    keep.insert(snap.id.clone());
                }
            }
        }
    }
    // keepDays: one per day for the last N days.
    if let Some(days) = retention.keep_days {
        bucket_keep(snapshots, &mut keep, now, |age| age.num_days() <= days as i64, "%Y-%m-%d");
    }
    // keepWeeks: one per ISO-ish week for the last N weeks.
    if let Some(weeks) = retention.keep_weeks {
        bucket_keep(snapshots, &mut keep, now, |age| age.num_weeks() <= weeks as i64, "%Y-W%W");
    }
    // keepMonths: one per month; ~31 days approximates a month's window.
    if let Some(months) = retention.keep_months {
        bucket_keep(
            snapshots,
            &mut keep,
            now,
            |age| age.num_days() <= (months as i64) * 31,
            "%Y-%m",
        );
    }
    keep
}
/// Pack IDs that still hold at least one referenced chunk, looked up through
/// the global index. Chunks missing from the index contribute nothing.
fn find_referenced_packs(
    repo: &Repository,
    referenced_chunks: &HashSet<String>,
) -> HashSet<String> {
    referenced_chunks
        .iter()
        .filter_map(|hash_hex| repo.index.get(hash_hex))
        .map(|entry| entry.pack_id.clone())
        .collect()
}
/// Walk packs/data recursively and collect the file stem of every *.pack file.
async fn find_all_pack_ids(repo_path: &str) -> Result<Vec<String>, ArchiveError> {
    let packs_dir = Path::new(repo_path).join("packs").join("data");
    if !packs_dir.exists() {
        return Ok(Vec::new());
    }
    let mut pack_ids = Vec::new();
    // Iterative DFS over the shard directories (avoids async recursion).
    let mut pending = vec![packs_dir];
    while let Some(dir) = pending.pop() {
        let mut entries = tokio::fs::read_dir(&dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if path.is_dir() {
                pending.push(path);
                continue;
            }
            if path.extension().and_then(|e| e.to_str()) == Some("pack") {
                if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                    pack_ids.push(stem.to_string());
                }
            }
        }
    }
    Ok(pack_ids)
}

52
rust/src/repair.rs Normal file
View File

@@ -0,0 +1,52 @@
/// Repository repair operations.
use serde::Serialize;
use crate::error::ArchiveError;
use crate::repository::Repository;
/// Outcome of a `repair` run; serialized as camelCase JSON for the TS facade.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RepairResult {
    // True when the global index was successfully rebuilt from pack .idx files.
    pub index_rebuilt: bool,
    // Number of chunks in the rebuilt index (0 when the rebuild failed).
    pub indexed_chunks: u64,
    // Count of stale lock files removed.
    pub stale_locks_removed: u32,
    // Human-readable descriptions of non-fatal failures; empty on full success.
    pub errors: Vec<String>,
}
/// Repair a repository: rebuild index, remove stale locks.
///
/// Sub-step failures never abort the run; they are accumulated into
/// `RepairResult::errors` so the caller sees everything that went wrong.
pub async fn repair(repo: &mut Repository) -> Result<RepairResult, ArchiveError> {
    let mut outcome = RepairResult {
        index_rebuilt: false,
        indexed_chunks: 0,
        stale_locks_removed: 0,
        errors: Vec::new(),
    };
    // Step 1: rebuild the global index from the per-pack .idx files.
    match repo.reindex().await {
        Ok(chunk_total) => {
            tracing::info!("Rebuilt index with {} chunks", chunk_total);
            outcome.index_rebuilt = true;
            outcome.indexed_chunks = chunk_total as u64;
        }
        Err(e) => {
            tracing::error!("Index rebuild failed: {}", e);
            outcome.errors.push(format!("Index rebuild failed: {}", e));
        }
    }
    // Step 2: drop lock files left behind by dead processes.
    match crate::lock::check_and_break_stale(&repo.path).await {
        Ok(removed) => {
            outcome.stale_locks_removed = removed;
            if removed > 0 {
                tracing::info!("Removed {} stale locks", removed);
            }
        }
        Err(e) => outcome.errors.push(format!("Lock cleanup failed: {}", e)),
    }
    Ok(outcome)
}

219
rust/src/repository.rs Normal file
View File

@@ -0,0 +1,219 @@
/// Repository management: init, open, close.
///
/// A repository is a directory with a specific structure containing config,
/// packs, snapshots, index, locks, and keys.
use std::path::Path;
use crate::config::{RepositoryConfig, EncryptionConfig, KeyFile, KdfParams};
use crate::encryption;
use crate::error::ArchiveError;
use crate::global_index::GlobalIndex;
use crate::lock;
/// An open backup repository: parsed configuration, the in-memory global
/// chunk index, and — for encrypted repositories — the unwrapped master key.
pub struct Repository {
    // Filesystem path of the repository root.
    pub path: String,
    // Parsed contents of config.json.
    pub config: RepositoryConfig,
    // Global chunk index (chunk hash -> pack location).
    pub index: GlobalIndex,
    // Unwrapped 256-bit master key; None for unencrypted repositories.
    pub master_key: Option<[u8; 32]>,
    // ID of the currently held write lock, if any (see acquire_lock/release_lock).
    lock_id: Option<String>,
}
impl Repository {
    /// Initialize a new repository at the given path.
    ///
    /// Creates the on-disk layout (`packs/data`, `packs/parity`, `snapshots`,
    /// `index`, `locks`, `keys`), writes `config.json`, and — when a
    /// passphrase is supplied — generates a random master key, wraps it with
    /// a passphrase-derived KEK (Argon2id), and stores the wrapped key under
    /// `keys/`. Returns `ArchiveError::Config` if `config.json` already
    /// exists at `repo_path`.
    pub async fn init(repo_path: &str, passphrase: Option<&str>) -> Result<Self, ArchiveError> {
        let path = Path::new(repo_path);
        // Check if repo already exists
        let config_path = path.join("config.json");
        if config_path.exists() {
            return Err(ArchiveError::Config("Repository already exists at this path".to_string()));
        }
        // Create directory structure
        tokio::fs::create_dir_all(path).await?;
        for dir in &["packs/data", "packs/parity", "snapshots", "index", "locks", "keys"] {
            tokio::fs::create_dir_all(path.join(dir)).await?;
        }
        // Create config; encryption settings are recorded only when a passphrase is given.
        let encryption_config = if passphrase.is_some() {
            Some(EncryptionConfig::default())
        } else {
            None
        };
        let config = RepositoryConfig::new(encryption_config);
        let config_json = serde_json::to_string_pretty(&config)?;
        tokio::fs::write(&config_path, config_json).await?;
        // Set up encryption if passphrase provided.
        // The master key is random and independent of the passphrase: the
        // passphrase only wraps it, so it can be changed later without
        // re-encrypting the data.
        let master_key = if let Some(pass) = passphrase {
            let master = encryption::generate_master_key();
            let salt = encryption::generate_salt();
            let kdf_params = KdfParams::default();
            let kek = encryption::derive_key(pass, &salt, &kdf_params)?;
            let wrapped = encryption::wrap_key(&master, &kek)?;
            let key_id = uuid::Uuid::new_v4().to_string();
            let key_file = KeyFile {
                id: key_id.clone(),
                created_at: chrono::Utc::now().to_rfc3339(),
                kdf: "argon2id".to_string(),
                kdf_salt: hex::encode(salt),
                kdf_params,
                encrypted_key: hex::encode(&wrapped.encrypted_key),
                nonce: hex::encode(wrapped.nonce),
                auth_tag: String::new(), // tag is appended to encrypted_key in AES-GCM
            };
            let key_path = path.join("keys").join(format!("{}.key", key_id));
            let json = serde_json::to_string_pretty(&key_file)?;
            tokio::fs::write(&key_path, json).await?;
            Some(master)
        } else {
            None
        };
        tracing::info!("Initialized repository at {}", repo_path);
        Ok(Self {
            path: repo_path.to_string(),
            config,
            index: GlobalIndex::new(),
            master_key,
            lock_id: None,
        })
    }

    /// Open an existing repository.
    ///
    /// Loads `config.json`, breaks any stale locks, unlocks the master key
    /// when the config says the repository is encrypted (a passphrase is then
    /// required), and loads the global chunk index. Does NOT acquire a write
    /// lock — callers do that explicitly via `acquire_lock`.
    pub async fn open(repo_path: &str, passphrase: Option<&str>) -> Result<Self, ArchiveError> {
        let path = Path::new(repo_path);
        let config_path = path.join("config.json");
        if !config_path.exists() {
            return Err(ArchiveError::InvalidRepo(format!(
                "No config.json found at {}",
                repo_path
            )));
        }
        // Load config
        let config_data = tokio::fs::read_to_string(&config_path).await?;
        let config: RepositoryConfig = serde_json::from_str(&config_data)
            .map_err(|e| ArchiveError::Config(format!("Failed to parse config: {}", e)))?;
        // Break stale locks left by crashed processes before doing anything else.
        let broken = lock::check_and_break_stale(repo_path).await?;
        if broken > 0 {
            tracing::warn!("Broke {} stale lock(s)", broken);
        }
        // Load encryption key if encrypted
        let master_key = if config.encryption.is_some() {
            let pass = passphrase.ok_or_else(|| {
                ArchiveError::Encryption("Repository is encrypted but no passphrase provided".to_string())
            })?;
            let key = load_master_key(repo_path, pass).await?;
            Some(key)
        } else {
            None
        };
        // Load global index
        let index = GlobalIndex::load(repo_path).await?;
        tracing::info!("Opened repository at {} ({} indexed chunks)", repo_path, index.len());
        Ok(Self {
            path: repo_path.to_string(),
            config,
            index,
            master_key,
            lock_id: None,
        })
    }

    /// Acquire a write lock, remembering its ID for the matching `release_lock`.
    pub async fn acquire_lock(&mut self, operation: &str) -> Result<(), ArchiveError> {
        let entry = lock::acquire(&self.path, operation).await?;
        self.lock_id = Some(entry.lock_id);
        Ok(())
    }

    /// Release the write lock. No-op when no lock is currently held.
    pub async fn release_lock(&mut self) -> Result<(), ArchiveError> {
        if let Some(ref lock_id) = self.lock_id {
            lock::release(&self.path, lock_id).await?;
            self.lock_id = None;
        }
        Ok(())
    }

    /// Rebuild the global index from pack .idx files.
    ///
    /// Replaces the in-memory index wholesale and persists it as a single
    /// compacted segment. Returns the number of indexed chunks.
    pub async fn reindex(&mut self) -> Result<usize, ArchiveError> {
        self.index = GlobalIndex::rebuild_from_packs(&self.path).await?;
        let count = self.index.len();
        // Save as a single compacted segment
        self.index.compact(&self.path).await?;
        Ok(count)
    }

    /// Close the repository: save index, release lock.
    ///
    /// Consumes `self`; index persistence happens before the lock is released
    /// so other writers never observe an unsaved index.
    pub async fn close(mut self) -> Result<(), ArchiveError> {
        self.index.save_segment(&self.path).await?;
        self.release_lock().await?;
        tracing::info!("Closed repository at {}", self.path);
        Ok(())
    }
}
/// Load the master key from the first key file found in keys/.
///
/// Iterates over `keys/*.key`, derives a KEK from `passphrase` using each
/// file's recorded KDF salt/params, and tries to unwrap the stored master
/// key. The first key file that authenticates wins; files that fail to
/// unwrap (wrong passphrase for that particular key) are skipped.
///
/// # Errors
/// `ArchiveError::Encryption` when a key file is malformed (bad hex, wrong
/// nonce length) or when no key file can be unlocked with the passphrase.
async fn load_master_key(repo_path: &str, passphrase: &str) -> Result<[u8; 32], ArchiveError> {
    let keys_dir = Path::new(repo_path).join("keys");
    let mut dir = tokio::fs::read_dir(&keys_dir).await?;
    while let Some(entry) = dir.next_entry().await? {
        let path = entry.path();
        if path.extension().and_then(|e| e.to_str()) != Some("key") {
            continue;
        }
        let data = tokio::fs::read_to_string(&path).await?;
        let key_file: KeyFile = serde_json::from_str(&data)
            .map_err(|e| ArchiveError::Encryption(format!("Failed to parse key file: {}", e)))?;
        let salt = hex::decode(&key_file.kdf_salt)
            .map_err(|e| ArchiveError::Encryption(format!("Invalid salt: {}", e)))?;
        // Deliberately expensive (Argon2id) — one derivation per key file.
        let kek = encryption::derive_key(passphrase, &salt, &key_file.kdf_params)?;
        let encrypted_key = hex::decode(&key_file.encrypted_key)
            .map_err(|e| ArchiveError::Encryption(format!("Invalid encrypted key: {}", e)))?;
        let nonce_bytes = hex::decode(&key_file.nonce)
            .map_err(|e| ArchiveError::Encryption(format!("Invalid nonce: {}", e)))?;
        // AES-GCM uses a 96-bit (12-byte) nonce. The previous code silently
        // zero-filled a short nonce and truncated a long one, which turned
        // key-file corruption into a misleading "wrong passphrase" failure —
        // reject a wrong-length nonce explicitly instead.
        if nonce_bytes.len() != 12 {
            return Err(ArchiveError::Encryption(format!(
                "Invalid nonce length in key file {}: expected 12 bytes, got {}",
                path.display(),
                nonce_bytes.len()
            )));
        }
        let mut nonce = [0u8; 12];
        nonce.copy_from_slice(&nonce_bytes);
        let wrapped = encryption::WrappedKey {
            encrypted_key,
            nonce,
        };
        match encryption::unwrap_key(&wrapped, &kek) {
            Ok(master) => return Ok(master),
            Err(_) => {
                // Wrong passphrase for this key file — try next
                continue;
            }
        }
    }
    Err(ArchiveError::Encryption("No key file could be unlocked with the provided passphrase".to_string()))
}

131
rust/src/restore.rs Normal file
View File

@@ -0,0 +1,131 @@
/// Restore pipeline: reads a snapshot manifest, looks up chunks in the global
/// index, reads from pack files, decrypts, decompresses, and writes to a Unix socket.
use tokio::io::AsyncWriteExt;
use tokio::net::UnixStream;
use crate::compression;
use crate::encryption;
use crate::error::ArchiveError;
use crate::hasher;
use crate::pack_reader;
use crate::repository::Repository;
use crate::snapshot;
/// Restore a snapshot (or a specific item) to a Unix socket.
///
/// Looks up each chunk in the global index, reads its stored bytes from the
/// pack file, decrypts (when the repository holds a master key), decompresses,
/// verifies the content hash, and streams the plaintext to `socket_path` in
/// item/chunk order. `item_name` limits the restore to items with that name.
///
/// # Errors
/// `ArchiveError::NotFound` for a missing snapshot/item/chunk,
/// `ArchiveError::Corruption` on hash mismatch, plus I/O and crypto errors.
pub async fn restore(
    repo: &Repository,
    snapshot_id: &str,
    socket_path: &str,
    item_name: Option<&str>,
) -> Result<(), ArchiveError> {
    // Load snapshot manifest
    let snap = snapshot::load_snapshot(&repo.path, snapshot_id).await?;
    // Determine which items to restore
    let items_to_restore: Vec<&snapshot::SnapshotItem> = if let Some(name) = item_name {
        snap.items.iter()
            .filter(|i| i.name == name)
            .collect()
    } else {
        snap.items.iter().collect()
    };
    if items_to_restore.is_empty() {
        return Err(ArchiveError::NotFound(format!(
            "No items found in snapshot {}{}",
            snapshot_id,
            item_name.map(|n| format!(" with name '{}'", n)).unwrap_or_default()
        )));
    }
    // Connect to the Unix socket where TypeScript will read the restored data
    let mut stream = UnixStream::connect(socket_path).await
        .map_err(ArchiveError::Io)?;
    tracing::info!("Connected to restore socket: {}", socket_path);
    let mut restored_bytes: u64 = 0;
    let mut chunks_read: u64 = 0;
    // Per-pack .idx entries, loaded lazily and cached so each pack index is
    // parsed once per restore instead of once per chunk (the previous
    // per-chunk load_idx was O(chunks x idx size) in file I/O). The idx
    // lookup is only needed to recover the per-chunk encryption nonce.
    let mut idx_cache = std::collections::HashMap::new();
    for item in items_to_restore {
        for hash_hex in &item.chunks {
            // Look up chunk in global index
            let index_entry = repo.index.get(hash_hex)
                .ok_or_else(|| ArchiveError::NotFound(format!(
                    "Chunk {} not found in index", hash_hex
                )))?;
            // Packs are sharded by the first two hex chars of the pack ID.
            let shard = &index_entry.pack_id[..2];
            let pack_path = std::path::Path::new(&repo.path)
                .join("packs")
                .join("data")
                .join(shard)
                .join(format!("{}.pack", index_entry.pack_id));
            // Read the stored (compressed, possibly encrypted) chunk bytes.
            let stored_data = pack_reader::read_chunk(
                &pack_path,
                index_entry.offset,
                index_entry.compressed_size,
            ).await?;
            // Decrypt if encrypted. The nonce lives in the pack's .idx file.
            let compressed = if let Some(ref key) = repo.master_key {
                if !idx_cache.contains_key(&index_entry.pack_id) {
                    let idx_path = std::path::Path::new(&repo.path)
                        .join("packs")
                        .join("data")
                        .join(shard)
                        .join(format!("{}.idx", index_entry.pack_id));
                    let loaded = pack_reader::load_idx(&idx_path).await?;
                    idx_cache.insert(index_entry.pack_id.clone(), loaded);
                }
                let entries = idx_cache
                    .get(&index_entry.pack_id)
                    .expect("idx cache entry inserted just above");
                let hash_bytes = hasher::hex_to_hash(hash_hex)
                    .map_err(|_| ArchiveError::Corruption(format!("Invalid hash: {}", hash_hex)))?;
                let idx_entry = pack_reader::find_in_idx(entries, &hash_bytes)
                    .ok_or_else(|| ArchiveError::NotFound(format!(
                        "Chunk {} not found in pack index {}", hash_hex, index_entry.pack_id
                    )))?;
                encryption::decrypt_chunk(&stored_data, key, &idx_entry.nonce)?
            } else {
                stored_data
            };
            // Decompress
            let plaintext = compression::decompress(&compressed)?;
            // Verify hash end-to-end before emitting any bytes.
            let actual_hash = hasher::hash_chunk(&plaintext);
            let expected_hash = hasher::hex_to_hash(hash_hex)
                .map_err(|_| ArchiveError::Corruption(format!("Invalid hash: {}", hash_hex)))?;
            if actual_hash != expected_hash {
                return Err(ArchiveError::Corruption(format!(
                    "Hash mismatch for chunk {}: expected {}, got {}",
                    hash_hex,
                    hash_hex,
                    hasher::hash_to_hex(&actual_hash)
                )));
            }
            // Write to output socket
            stream.write_all(&plaintext).await?;
            restored_bytes += plaintext.len() as u64;
            chunks_read += 1;
        }
    }
    // Close the write side so the reader observes EOF.
    stream.shutdown().await?;
    tracing::info!(
        "Restore complete: {} bytes, {} chunks from snapshot {}",
        restored_bytes, chunks_read, snapshot_id
    );
    Ok(())
}

195
rust/src/snapshot.rs Normal file
View File

@@ -0,0 +1,195 @@
/// Snapshot manifest management.
///
/// A snapshot represents one complete backup operation, recording all metadata
/// needed to reconstruct the original data.
use std::collections::HashMap;
use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::error::ArchiveError;
/// A snapshot manifest: one complete backup operation with all metadata
/// needed to reconstruct the original data. Serialized as camelCase JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Snapshot {
    // Unique ID of the form "<UTC timestamp>-<8-char uuid prefix>" (see Snapshot::new).
    pub id: String,
    // Manifest format version (currently 1).
    pub version: u32,
    // RFC 3339 creation timestamp (UTC); snapshots sort chronologically by this string.
    pub created_at: String,
    // Arbitrary user tags; matched by SnapshotFilter.
    pub tags: HashMap<String, String>,
    // Size accounting as supplied by the backup run (see Snapshot::new parameters).
    pub original_size: u64,
    pub stored_size: u64,
    // Total chunk references across all items (derived in Snapshot::new).
    pub chunk_count: u64,
    pub new_chunks: u64,
    pub reused_chunks: u64,
    // The backed-up items and their ordered chunk lists.
    pub items: Vec<SnapshotItem>,
}
/// One named item inside a snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SnapshotItem {
    // Item name; restore's optional item filter matches on this.
    pub name: String,
    // Item kind; serialized as the JSON field "type" ("type" is a Rust keyword).
    #[serde(rename = "type")]
    pub item_type: String,
    // Item size in bytes.
    pub size: u64,
    // Ordered chunk list; restore concatenates these in order.
    pub chunks: Vec<String>, // hex-encoded SHA-256 hashes
}
/// Optional criteria for `list_snapshots`; every present field must match
/// (see `matches_filter`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SnapshotFilter {
    // All listed tags must be present on the snapshot with equal values.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tags: Option<HashMap<String, String>>,
    // Inclusive lower bound on created_at (RFC 3339 string comparison).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: Option<String>,
    // Inclusive upper bound on created_at (RFC 3339 string comparison).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub before: Option<String>,
}
impl Snapshot {
    /// Create a new snapshot with an auto-generated ID of the form
    /// `<UTC timestamp>-<8-char uuid prefix>`. `chunk_count` is derived by
    /// summing the chunk lists of `items`.
    pub fn new(
        items: Vec<SnapshotItem>,
        tags: HashMap<String, String>,
        original_size: u64,
        stored_size: u64,
        new_chunks: u64,
        reused_chunks: u64,
    ) -> Self {
        let now = chrono::Utc::now();
        let uuid_str = uuid::Uuid::new_v4().to_string();
        let id = format!("{}-{}", now.format("%Y%m%dT%H%M%SZ"), &uuid_str[..8]);
        let chunk_count: u64 = items.iter().map(|item| item.chunks.len() as u64).sum();
        Self {
            id,
            version: 1,
            created_at: now.to_rfc3339(),
            tags,
            original_size,
            stored_size,
            chunk_count,
            new_chunks,
            reused_chunks,
            items,
        }
    }
}
/// Save a snapshot manifest to disk.
///
/// Writes to a `.tmp` sibling and renames into place, so an interrupted
/// write can never leave a truncated manifest behind.
pub async fn save_snapshot(repo_path: &str, snapshot: &Snapshot) -> Result<(), ArchiveError> {
    let snapshots_dir = Path::new(repo_path).join("snapshots");
    tokio::fs::create_dir_all(&snapshots_dir).await?;
    let final_path = snapshots_dir.join(format!("{}.json", snapshot.id));
    let staging_path = snapshots_dir.join(format!("{}.json.tmp", snapshot.id));
    let serialized = serde_json::to_string_pretty(snapshot)?;
    tokio::fs::write(&staging_path, serialized).await?;
    tokio::fs::rename(&staging_path, &final_path).await?;
    tracing::info!("Saved snapshot {} ({} chunks, {} new)", snapshot.id, snapshot.chunk_count, snapshot.new_chunks);
    Ok(())
}
/// Load a snapshot manifest from disk.
///
/// Returns `ArchiveError::NotFound` when no manifest exists for the ID and
/// `ArchiveError::Corruption` when the JSON fails to parse.
pub async fn load_snapshot(repo_path: &str, snapshot_id: &str) -> Result<Snapshot, ArchiveError> {
    let manifest_path = Path::new(repo_path)
        .join("snapshots")
        .join(format!("{}.json", snapshot_id));
    if !manifest_path.exists() {
        return Err(ArchiveError::NotFound(format!("Snapshot not found: {}", snapshot_id)));
    }
    let raw = tokio::fs::read_to_string(&manifest_path).await?;
    serde_json::from_str(&raw)
        .map_err(|e| ArchiveError::Corruption(format!("Failed to parse snapshot {}: {}", snapshot_id, e)))
}
/// List all snapshots, optionally filtered.
pub async fn list_snapshots(repo_path: &str, filter: Option<&SnapshotFilter>) -> Result<Vec<Snapshot>, ArchiveError> {
let snapshots_dir = Path::new(repo_path).join("snapshots");
if !snapshots_dir.exists() {
return Ok(Vec::new());
}
let mut snapshots = Vec::new();
let mut dir = tokio::fs::read_dir(&snapshots_dir).await?;
while let Some(entry) = dir.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("json") {
continue;
}
let data = match tokio::fs::read_to_string(&path).await {
Ok(d) => d,
Err(_) => continue,
};
let snapshot: Snapshot = match serde_json::from_str(&data) {
Ok(s) => s,
Err(_) => continue,
};
if let Some(f) = filter {
if !matches_filter(&snapshot, f) {
continue;
}
}
snapshots.push(snapshot);
}
// Sort by creation time (newest first)
snapshots.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(snapshots)
}
/// Delete a snapshot manifest. Deleting a nonexistent snapshot is a no-op.
pub async fn delete_snapshot(repo_path: &str, snapshot_id: &str) -> Result<(), ArchiveError> {
    let manifest_path = Path::new(repo_path)
        .join("snapshots")
        .join(format!("{}.json", snapshot_id));
    if manifest_path.exists() {
        tokio::fs::remove_file(&manifest_path).await?;
        tracing::info!("Deleted snapshot {}", snapshot_id);
    }
    Ok(())
}
/// Get all chunk hashes referenced by a list of snapshots.
///
/// Returns the deduplicated union of every item's chunk list; used by prune
/// to decide which chunks (and therefore packs) are still live.
pub fn referenced_chunks(snapshots: &[Snapshot]) -> std::collections::HashSet<String> {
    let mut hashes = std::collections::HashSet::new();
    for snapshot in snapshots {
        for item in &snapshot.items {
            for hash in &item.chunks {
                hashes.insert(hash.clone());
            }
        }
    }
    hashes
}
fn matches_filter(snapshot: &Snapshot, filter: &SnapshotFilter) -> bool {
// Check tag filter
if let Some(ref tags) = filter.tags {
for (key, value) in tags {
match snapshot.tags.get(key) {
Some(v) if v == value => {},
_ => return false,
}
}
}
// Check date range
if let Some(ref after) = filter.after {
if snapshot.created_at < *after {
return false;
}
}
if let Some(ref before) = filter.before {
if snapshot.created_at > *before {
return false;
}
}
true
}

290
rust/src/verify.rs Normal file
View File

@@ -0,0 +1,290 @@
/// Repository verification at three levels.
use std::path::Path;
use serde::Serialize;
use crate::compression;
use crate::encryption;
use crate::error::ArchiveError;
use crate::hasher;
use crate::pack_reader;
use crate::pack_writer::FLAG_ENCRYPTED;
use crate::repository::Repository;
use crate::snapshot;
/// Aggregate verification outcome; serialized as camelCase JSON.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VerifyResult {
    // True iff no errors were found at the requested verification level.
    pub ok: bool,
    // Every problem found, with whatever pack/chunk/snapshot context applied.
    pub errors: Vec<VerifyError>,
    // Counts of objects examined during the run.
    pub stats: VerifyStats,
}
/// One verification finding. Context fields are `None` when not applicable.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VerifyError {
    // Pack ID involved, for pack-scoped problems.
    pub pack: Option<String>,
    // Hex-encoded chunk hash involved, for chunk-scoped problems.
    pub chunk: Option<String>,
    // Snapshot ID involved, for snapshot-scoped problems.
    pub snapshot: Option<String>,
    // Human-readable description of the problem.
    pub error: String,
}
/// Counters for how much work a verification run actually performed.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VerifyStats {
    // Pack files whose headers were checked (standard/full levels).
    pub packs_checked: u64,
    // Chunks read back and re-hashed (full level only).
    pub chunks_checked: u64,
    // Snapshot manifests checked against the index (all levels).
    pub snapshots_checked: u64,
}
/// Verify repository integrity at the given level.
///
/// The level string matches the IPC contract: every run performs the quick
/// snapshot/index consistency check; `"standard"` adds pack-header checks;
/// `"full"` additionally reads back and re-hashes every chunk. Any other
/// string behaves like the quick level.
pub async fn verify(repo: &Repository, level: &str) -> Result<VerifyResult, ArchiveError> {
    let mut errors = Vec::new();
    let mut stats = VerifyStats {
        packs_checked: 0,
        chunks_checked: 0,
        snapshots_checked: 0,
    };
    // Level 1 always runs: snapshot <-> index consistency.
    verify_snapshots(repo, &mut errors, &mut stats).await?;
    // Level 2: pack header structure.
    if matches!(level, "standard" | "full") {
        verify_pack_headers(repo, &mut errors, &mut stats).await?;
    }
    // Level 3: end-to-end read/decrypt/decompress/re-hash of every chunk.
    if matches!(level, "full") {
        verify_all_chunks(repo, &mut errors, &mut stats).await?;
    }
    let ok = errors.is_empty();
    Ok(VerifyResult { ok, errors, stats })
}
/// Quick verification: every chunk hash referenced by any snapshot must be
/// present in the global index.
async fn verify_snapshots(
    repo: &Repository,
    errors: &mut Vec<VerifyError>,
    stats: &mut VerifyStats,
) -> Result<(), ArchiveError> {
    let snapshots = snapshot::list_snapshots(&repo.path, None).await?;
    for snap in &snapshots {
        stats.snapshots_checked += 1;
        for item in &snap.items {
            for hash_hex in &item.chunks {
                if repo.index.has(hash_hex) {
                    continue;
                }
                errors.push(VerifyError {
                    pack: None,
                    chunk: Some(hash_hex.clone()),
                    snapshot: Some(snap.id.clone()),
                    error: format!("Chunk {} referenced by snapshot {} not found in index", hash_hex, snap.id),
                });
            }
        }
    }
    Ok(())
}
/// Standard verification: check pack file headers.
async fn verify_pack_headers(
    repo: &Repository,
    errors: &mut Vec<VerifyError>,
    stats: &mut VerifyStats,
) -> Result<(), ArchiveError> {
    // Display name of a pack file ("unknown" when the stem can't be read).
    fn pack_name_of(path: &Path) -> String {
        path.file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("unknown")
            .to_string()
    }
    let packs_dir = Path::new(&repo.path).join("packs").join("data");
    if !packs_dir.exists() {
        return Ok(());
    }
    let pack_files = find_pack_files(&packs_dir).await?;
    for pack_path in &pack_files {
        stats.packs_checked += 1;
        match pack_reader::verify_pack_header(pack_path).await {
            Ok(true) => {}
            Ok(false) => errors.push(VerifyError {
                pack: Some(pack_name_of(pack_path)),
                chunk: None,
                snapshot: None,
                error: "Pack header verification failed (bad magic, version, or checksum)".to_string(),
            }),
            Err(e) => errors.push(VerifyError {
                pack: Some(pack_name_of(pack_path)),
                chunk: None,
                snapshot: None,
                error: format!("Failed to read pack: {}", e),
            }),
        }
    }
    Ok(())
}
/// Full verification: read every chunk, decompress, and verify hash.
///
/// Driven by the per-pack `.idx` files, which carry each chunk's offset,
/// compressed size, flags, nonce, and expected content hash. All failures
/// are accumulated into `errors` and never abort the scan; only I/O errors
/// while walking the directory tree propagate as `Err`.
async fn verify_all_chunks(
    repo: &Repository,
    errors: &mut Vec<VerifyError>,
    stats: &mut VerifyStats,
) -> Result<(), ArchiveError> {
    let packs_dir = Path::new(&repo.path).join("packs").join("data");
    if !packs_dir.exists() {
        return Ok(());
    }
    let idx_files = find_idx_files(&packs_dir).await?;
    for idx_path in &idx_files {
        let entries = match pack_reader::load_idx(idx_path).await {
            Ok(e) => e,
            Err(e) => {
                errors.push(VerifyError {
                    pack: None,
                    chunk: None,
                    snapshot: None,
                    error: format!("Failed to load idx {}: {}", idx_path.display(), e),
                });
                continue;
            }
        };
        // Every .idx file must have a companion .pack beside it.
        let pack_path = idx_path.with_extension("pack");
        if !pack_path.exists() {
            let pack_name = idx_path.file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("unknown")
                .to_string();
            errors.push(VerifyError {
                pack: Some(pack_name),
                chunk: None,
                snapshot: None,
                error: "Pack file missing for existing idx file".to_string(),
            });
            continue;
        }
        for entry in &entries {
            stats.chunks_checked += 1;
            let hash_hex = hasher::hash_to_hex(&entry.content_hash);
            // Read chunk from pack
            let stored_data = match pack_reader::read_chunk(&pack_path, entry.offset, entry.compressed_size).await {
                Ok(d) => d,
                Err(e) => {
                    errors.push(VerifyError {
                        pack: None,
                        chunk: Some(hash_hex),
                        snapshot: None,
                        error: format!("Failed to read chunk: {}", e),
                    });
                    continue;
                }
            };
            // Decrypt if the per-chunk flag says the data is encrypted.
            let compressed = if entry.flags & FLAG_ENCRYPTED != 0 {
                if let Some(ref key) = repo.master_key {
                    match encryption::decrypt_chunk(&stored_data, key, &entry.nonce) {
                        Ok(d) => d,
                        Err(e) => {
                            errors.push(VerifyError {
                                pack: None,
                                chunk: Some(hash_hex),
                                snapshot: None,
                                error: format!("Decryption failed: {}", e),
                            });
                            continue;
                        }
                    }
                } else {
                    // Encrypted data but repo opened without a key — report and move on.
                    errors.push(VerifyError {
                        pack: None,
                        chunk: Some(hash_hex),
                        snapshot: None,
                        error: "Chunk is encrypted but no key available".to_string(),
                    });
                    continue;
                }
            } else {
                stored_data
            };
            // Decompress
            let plaintext = match compression::decompress(&compressed) {
                Ok(d) => d,
                Err(e) => {
                    errors.push(VerifyError {
                        pack: None,
                        chunk: Some(hash_hex),
                        snapshot: None,
                        error: format!("Decompression failed: {}", e),
                    });
                    continue;
                }
            };
            // Verify hash: recomputed plaintext digest must match the idx entry.
            if !hasher::verify_hash(&plaintext, &entry.content_hash) {
                errors.push(VerifyError {
                    pack: None,
                    chunk: Some(hash_hex),
                    snapshot: None,
                    error: "Hash mismatch after decompress/decrypt".to_string(),
                });
            }
        }
    }
    Ok(())
}
/// Recursively collect every `.pack` file under `dir`.
async fn find_pack_files(dir: &Path) -> Result<Vec<std::path::PathBuf>, ArchiveError> {
    find_files_by_extension(dir, "pack").await
}
/// Recursively collect every `.idx` file under `dir`.
async fn find_idx_files(dir: &Path) -> Result<Vec<std::path::PathBuf>, ArchiveError> {
    find_files_by_extension(dir, "idx").await
}
/// Recursively collect every file under `dir` whose extension equals `ext`.
///
/// Iterative depth-first walk (tokio::fs has no recursive read_dir);
/// directories that vanish mid-walk are tolerated via the `exists` guard.
async fn find_files_by_extension(dir: &Path, ext: &str) -> Result<Vec<std::path::PathBuf>, ArchiveError> {
    let mut matches = Vec::new();
    let mut pending = vec![dir.to_path_buf()];
    while let Some(current_dir) = pending.pop() {
        if !current_dir.exists() {
            continue;
        }
        let mut reader = tokio::fs::read_dir(&current_dir).await?;
        while let Some(dir_entry) = reader.next_entry().await? {
            let entry_path = dir_entry.path();
            if entry_path.is_dir() {
                pending.push(entry_path);
                continue;
            }
            if entry_path.extension().and_then(|e| e.to_str()) == Some(ext) {
                matches.push(entry_path);
            }
        }
    }
    Ok(matches)
}