feat(rust-core): add zstd chunk compression support and rewrite partially referenced packs during prune

This commit is contained in:
2026-03-22 08:47:16 +00:00
parent 0bbe462153
commit 5672ea29ff
14 changed files with 734 additions and 23 deletions

67
rust/Cargo.lock generated
View File

@@ -207,6 +207,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
dependencies = [
"find-msvc-tools",
"jobserver",
"libc",
"shlex",
]
@@ -308,6 +310,7 @@ dependencies = [
"tracing",
"tracing-subscriber",
"uuid",
"zstd",
]
[[package]]
@@ -424,6 +427,18 @@ dependencies = [
"wasi",
]
[[package]]
name = "getrandom"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if",
"libc",
"r-efi 5.3.0",
"wasip2",
]
[[package]]
name = "getrandom"
version = "0.4.2"
@@ -432,7 +447,7 @@ checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"r-efi 6.0.0",
"wasip2",
"wasip3",
]
@@ -555,6 +570,16 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682"
[[package]]
name = "jobserver"
version = "0.1.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33"
dependencies = [
"getrandom 0.3.4",
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.91"
@@ -750,6 +775,12 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pkg-config"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "polyval"
version = "0.6.2"
@@ -799,6 +830,12 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r-efi"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "r-efi"
version = "6.0.0"
@@ -1512,3 +1549,31 @@ name = "zmij"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa"
[[package]]
name = "zstd"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.16+zstd.1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748"
dependencies = [
"cc",
"pkg-config",
]

View File

@@ -25,6 +25,7 @@ argon2 = "0.5"
# Compression
flate2 = "1"
zstd = "0.13"
# Utilities
uuid = { version = "1", features = ["v4"] }

View File

@@ -3,8 +3,63 @@ use flate2::read::{GzDecoder, GzEncoder};
use std::io::Read;
use crate::error::ArchiveError;
/// Gzip compress data.
pub fn compress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
/// Supported compression algorithms.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionAlgorithm {
    /// DEFLATE-based gzip (via flate2); the historical default.
    Gzip,
    /// Zstandard; better speed/ratio trade-off for chunk data.
    Zstd,
}

impl CompressionAlgorithm {
    /// Parse a configuration string.
    ///
    /// Any value other than `"zstd"` (including the empty string or a
    /// typo) deliberately falls back to gzip so existing repository
    /// configurations keep working unchanged.
    pub fn from_str(s: &str) -> Self {
        match s {
            "zstd" => Self::Zstd,
            _ => Self::Gzip,
        }
    }

    /// Map to the flag bits stored in IDX entries (bits 1-2).
    pub fn to_flags(self) -> u32 {
        match self {
            Self::Gzip => 0x02, // FLAG_GZIP
            Self::Zstd => 0x04, // FLAG_ZSTD
        }
    }

    /// Determine the algorithm from IDX flags.
    ///
    /// The zstd bit wins when set; everything else — including legacy
    /// entries whose `flags` deserialize to 0 — is treated as gzip.
    pub fn from_flags(flags: u32) -> Self {
        if flags & 0x04 != 0 {
            Self::Zstd
        } else {
            Self::Gzip
        }
    }
}
/// Compress `data` with the requested algorithm, returning the compressed bytes.
pub fn compress(data: &[u8], algo: CompressionAlgorithm) -> Result<Vec<u8>, ArchiveError> {
    if algo == CompressionAlgorithm::Zstd {
        compress_zstd(data)
    } else {
        compress_gzip(data)
    }
}
/// Decompress data with the specified algorithm.
pub fn decompress(data: &[u8], algo: CompressionAlgorithm) -> Result<Vec<u8>, ArchiveError> {
match algo {
CompressionAlgorithm::Gzip => decompress_gzip(data),
CompressionAlgorithm::Zstd => decompress_zstd(data),
}
}
/// Decompress `data`, selecting the algorithm from the stored IDX `flags`.
pub fn decompress_by_flags(data: &[u8], flags: u32) -> Result<Vec<u8>, ArchiveError> {
    let algo = CompressionAlgorithm::from_flags(flags);
    decompress(data, algo)
}
// ==================== Gzip ====================
fn compress_gzip(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
let mut encoder = GzEncoder::new(data, Compression::default());
let mut compressed = Vec::new();
encoder.read_to_end(&mut compressed)
@@ -12,8 +67,7 @@ pub fn compress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
Ok(compressed)
}
/// Gzip decompress data.
pub fn decompress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
fn decompress_gzip(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
let mut decoder = GzDecoder::new(data);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)
@@ -21,23 +75,57 @@ pub fn decompress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
Ok(decompressed)
}
// ==================== Zstd ====================
/// Zstandard-compress `data` at the repository's fixed level.
fn compress_zstd(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
    // Level 3: good balance of speed and ratio for chunk-sized inputs.
    const LEVEL: i32 = 3;
    zstd::encode_all(data, LEVEL).map_err(ArchiveError::Io)
}
/// Zstandard-decompress `data`.
fn decompress_zstd(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
    zstd::decode_all(data).map_err(ArchiveError::Io)
}
#[cfg(test)]
mod tests {
    use super::*;

    // Round-trip: gzip output must decompress back to the input.
    #[test]
    fn test_gzip_roundtrip() {
        let data = b"Hello, this is test data for compression!";
        let compressed = compress(data, CompressionAlgorithm::Gzip).unwrap();
        let decompressed = decompress(&compressed, CompressionAlgorithm::Gzip).unwrap();
        assert_eq!(data.as_slice(), decompressed.as_slice());
    }

    // Round-trip: zstd output must decompress back to the input.
    #[test]
    fn test_zstd_roundtrip() {
        let data = b"Hello, this is test data for zstd compression!";
        let compressed = compress(data, CompressionAlgorithm::Zstd).unwrap();
        let decompressed = decompress(&compressed, CompressionAlgorithm::Zstd).unwrap();
        assert_eq!(data.as_slice(), decompressed.as_slice());
    }

    // Highly compressible data must shrink under both algorithms.
    #[test]
    fn test_compression_reduces_size() {
        let data = vec![b'A'; 10000];
        let gzip = compress(&data, CompressionAlgorithm::Gzip).unwrap();
        let zstd = compress(&data, CompressionAlgorithm::Zstd).unwrap();
        assert!(gzip.len() < data.len());
        assert!(zstd.len() < data.len());
    }

    // Flag-driven decompression must pick the right algorithm from IDX bits.
    #[test]
    fn test_decompress_by_flags() {
        let data = b"flag-based decompression test";

        let gzip_compressed = compress(data, CompressionAlgorithm::Gzip).unwrap();
        let result = decompress_by_flags(&gzip_compressed, 0x02).unwrap();
        assert_eq!(data.as_slice(), result.as_slice());

        let zstd_compressed = compress(data, CompressionAlgorithm::Zstd).unwrap();
        let result = decompress_by_flags(&zstd_compressed, 0x04).unwrap();
        assert_eq!(data.as_slice(), result.as_slice());
    }
}

View File

@@ -20,6 +20,8 @@ pub struct IndexEntry {
pub plaintext_size: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nonce: Option<String>,
#[serde(default)]
pub flags: u32,
}
/// An index segment stored on disk.
@@ -198,6 +200,7 @@ impl GlobalIndex {
compressed_size: entry.compressed_size,
plaintext_size: entry.plaintext_size,
nonce,
flags: entry.flags,
});
}
}

View File

@@ -10,7 +10,7 @@ use tokio::net::UnixStream;
use serde::{Deserialize, Serialize};
use crate::chunker::{FastCdc, StreamingChunker};
use crate::compression;
use crate::compression::{self, CompressionAlgorithm};
use crate::encryption;
use crate::error::ArchiveError;
use crate::parity::{ParityManager, ParityConfig};
@@ -41,6 +41,7 @@ struct PendingChunk {
compressed_size: u32,
plaintext_size: u32,
nonce: Option<String>,
flags: u32,
}
/// Run the single-item ingest pipeline.
@@ -295,8 +296,9 @@ async fn process_chunk(
return Ok(());
}
let compressed = compression::compress(chunk_data)?;
let mut flags = FLAG_GZIP;
let comp_algo = CompressionAlgorithm::from_str(&repo.config.compression);
let compressed = compression::compress(chunk_data, comp_algo)?;
let mut flags = comp_algo.to_flags();
let plaintext_size = chunk_data.len() as u32;
let (stored_data, nonce) = if let Some(ref key) = repo.master_key {
@@ -327,6 +329,7 @@ async fn process_chunk(
compressed_size,
plaintext_size,
nonce: nonce_hex,
flags,
});
pack_writer.add_chunk(hash, &stored_data, plaintext_size, nonce, flags);
@@ -353,6 +356,7 @@ async fn finalize_pack(
compressed_size: pending.compressed_size,
plaintext_size: pending.plaintext_size,
nonce: pending.nonce,
flags: pending.flags,
});
}

View File

@@ -30,6 +30,7 @@ pub const PACK_HEADER_SIZE: usize = 32;
/// Flags stored in IDX entries.
pub const FLAG_ENCRYPTED: u32 = 0x01;
pub const FLAG_GZIP: u32 = 0x02;
pub const FLAG_ZSTD: u32 = 0x04;
/// An entry in the pack index.
#[derive(Debug, Clone)]

View File

@@ -9,6 +9,10 @@ use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::error::ArchiveError;
use crate::global_index::IndexEntry;
use crate::pack_reader;
use crate::pack_writer::PackWriter;
use crate::hasher;
use crate::repository::Repository;
use crate::snapshot;
@@ -30,6 +34,7 @@ pub struct RetentionPolicy {
pub struct PruneResult {
pub removed_snapshots: u32,
pub removed_packs: u32,
pub rewritten_packs: u32,
pub freed_bytes: u64,
pub dry_run: bool,
}
@@ -61,6 +66,7 @@ async fn do_prune(
let mut result = PruneResult {
removed_snapshots: 0,
removed_packs: 0,
rewritten_packs: 0,
freed_bytes: 0,
dry_run,
};
@@ -141,22 +147,162 @@ async fn do_prune(
}
}
// Phase 3: Rewrite partially-referenced packs to reclaim wasted space
if !dry_run {
rewrite_partial_packs(repo, &referenced_chunks, &mut result).await?;
}
// Compact index after pruning
if !dry_run && result.removed_packs > 0 {
if !dry_run && (result.removed_packs > 0 || result.rewritten_packs > 0) {
repo.index.compact(&repo.path).await?;
}
tracing::info!(
"Prune {}: removed {} snapshots, {} packs, freed {} bytes",
"Prune {}: removed {} snapshots, {} packs, rewrote {} packs, freed {} bytes",
if dry_run { "(dry run)" } else { "complete" },
result.removed_snapshots,
result.removed_packs,
result.rewritten_packs,
result.freed_bytes
);
Ok(result)
}
/// Rewrite packs that contain a mix of referenced and unreferenced chunks.
///
/// For each such pack the still-referenced chunks are copied verbatim
/// (still compressed/encrypted) into a freshly written pack, the global
/// index is repointed at the new pack, and the old pack + IDX are deleted.
/// A pack is skipped when it is fully referenced (nothing to reclaim),
/// fully unreferenced (Phase 2 deletes it whole), or when less than 25%
/// of its bytes are reclaimable (not worth the I/O churn).
async fn rewrite_partial_packs(
    repo: &mut Repository,
    referenced_chunks: &HashSet<String>,
    result: &mut PruneResult,
) -> Result<(), ArchiveError> {
    let all_packs = find_all_pack_ids(&repo.path).await?;

    for pack_id in &all_packs {
        let shard = &pack_id[..std::cmp::min(2, pack_id.len())];
        let idx_path = Path::new(&repo.path)
            .join("packs").join("data").join(shard)
            .join(format!("{}.idx", pack_id));
        let pack_path = Path::new(&repo.path)
            .join("packs").join("data").join(shard)
            .join(format!("{}.pack", pack_id));
        if !idx_path.exists() || !pack_path.exists() {
            continue;
        }

        // A pack whose IDX cannot be read is left alone; verify reports it.
        let entries = match pack_reader::load_idx(&idx_path).await {
            Ok(e) => e,
            Err(_) => continue,
        };

        // Tally referenced vs unreferenced bytes in this pack.
        let mut referenced_count = 0usize;
        let mut unreferenced_bytes = 0u64;
        let mut total_bytes = 0u64;
        for entry in &entries {
            let hash_hex = hasher::hash_to_hex(&entry.content_hash);
            total_bytes += entry.compressed_size as u64;
            if referenced_chunks.contains(&hash_hex) {
                referenced_count += 1;
            } else {
                unreferenced_bytes += entry.compressed_size as u64;
            }
        }

        // Skip if all chunks are referenced (nothing to reclaim).
        if referenced_count == entries.len() {
            continue;
        }
        // Skip if all chunks are unreferenced (already handled by Phase 2).
        if referenced_count == 0 {
            continue;
        }
        // Skip if waste is less than 25% (not worth the I/O).
        if total_bytes > 0 && (unreferenced_bytes * 100 / total_bytes) < 25 {
            continue;
        }

        tracing::info!(
            "Rewriting pack {} ({}/{} chunks referenced, {} bytes reclaimable)",
            pack_id, referenced_count, entries.len(), unreferenced_bytes
        );

        // Copy referenced chunks into a new pack. Stored bytes are moved
        // as-is, so each chunk's nonce and flags carry over unchanged.
        let mut new_pack_writer = PackWriter::new(repo.config.pack_target_size);
        for entry in &entries {
            let hash_hex = hasher::hash_to_hex(&entry.content_hash);
            if !referenced_chunks.contains(&hash_hex) {
                continue; // Skip unreferenced chunks
            }
            let chunk_data = pack_reader::read_chunk(
                &pack_path, entry.offset, entry.compressed_size,
            ).await?;
            new_pack_writer.add_chunk(
                entry.content_hash,
                &chunk_data,
                entry.plaintext_size,
                entry.nonce,
                entry.flags,
            );
        }

        // Finalize the new pack and repoint the global index.
        if !new_pack_writer.is_empty() {
            let new_pack_info = new_pack_writer.finalize(&repo.path).await?;
            let new_id = &new_pack_info.pack_id;

            // BUG FIX: the old index entries hold offsets into the OLD
            // pack; reusing them against the new pack id would make every
            // rewritten chunk unreadable on restore. Instead, read back
            // the IDX that finalize() just wrote and index from its real
            // offsets. NOTE(review): assumes finalize() writes "<id>.idx"
            // next to "<id>.pack" under packs/data/<shard>/, mirroring the
            // read path above — confirm against PackWriter::finalize.
            let new_shard = &new_id[..std::cmp::min(2, new_id.len())];
            let new_idx_path = Path::new(&repo.path)
                .join("packs").join("data").join(new_shard)
                .join(format!("{}.idx", new_id));
            let new_entries = pack_reader::load_idx(&new_idx_path).await?;
            for new_entry in &new_entries {
                let hash_hex = hasher::hash_to_hex(&new_entry.content_hash);
                // An all-zero nonce marks an unencrypted chunk.
                let nonce = if new_entry.nonce != [0u8; 12] {
                    Some(hex::encode(new_entry.nonce))
                } else {
                    None
                };
                repo.index.add_entry(hash_hex, IndexEntry {
                    pack_id: new_id.clone(),
                    offset: new_entry.offset,
                    compressed_size: new_entry.compressed_size,
                    plaintext_size: new_entry.plaintext_size,
                    nonce,
                    flags: new_entry.flags,
                });
            }
        }

        // Delete old pack + idx; failures are non-fatal (a later prune or
        // verify can pick up stragglers).
        let _ = tokio::fs::remove_file(&pack_path).await;
        let _ = tokio::fs::remove_file(&idx_path).await;

        // Drop index entries still pointing at the old pack (referenced
        // chunks were re-added above under the new pack id).
        repo.index.remove_pack_entries(pack_id);

        result.freed_bytes += unreferenced_bytes;
        result.rewritten_packs += 1;

        tracing::info!(
            "Rewrote pack {} -> saved {} bytes",
            pack_id, unreferenced_bytes
        );
    }

    Ok(())
}
/// Determine which snapshot IDs to keep based on retention policy.
fn determine_kept_snapshots(
snapshots: &[snapshot::Snapshot],

View File

@@ -10,7 +10,7 @@ use crate::encryption;
use crate::error::ArchiveError;
use crate::hasher;
use crate::pack_reader;
use crate::pack_writer::IdxEntry;
use crate::pack_writer::{IdxEntry, FLAG_ENCRYPTED};
use crate::repository::Repository;
use crate::snapshot;
@@ -76,8 +76,15 @@ pub async fn restore(
index_entry.compressed_size,
).await?;
// Get flags for this chunk (determines compression algorithm)
let chunk_flags = index_entry.flags;
// Decrypt if encrypted
let compressed = if let Some(ref key) = repo.master_key {
let compressed = if chunk_flags & FLAG_ENCRYPTED != 0 {
let key = repo.master_key.as_ref().ok_or_else(|| {
ArchiveError::Encryption("Chunk is encrypted but no key available".to_string())
})?;
// Try to get nonce from the global index first (fast path)
let nonce = if let Some(ref nonce_hex) = index_entry.nonce {
let nonce_bytes = hex::decode(nonce_hex)
@@ -117,8 +124,8 @@ pub async fn restore(
stored_data
};
// Decompress
let plaintext = compression::decompress(&compressed)?;
// Decompress using flags to determine algorithm
let plaintext = compression::decompress_by_flags(&compressed, chunk_flags)?;
// Verify hash
let actual_hash = hasher::hash_chunk(&plaintext);

View File

@@ -231,7 +231,7 @@ async fn verify_all_chunks(
};
// Decompress
let plaintext = match compression::decompress(&compressed) {
let plaintext = match compression::decompress_by_flags(&compressed, entry.flags) {
Ok(d) => d,
Err(e) => {
errors.push(VerifyError {