// containerarchive/rust/src/prune.rs
//! Retention-based pruning and garbage collection.
//!
//! Prune determines which snapshots to keep based on retention policies,
//! deletes expired snapshots, and removes pack files where ALL chunks
//! are unreferenced (whole-pack GC only).
use std::collections::HashSet;
use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::error::ArchiveError;
use crate::global_index::IndexEntry;
use crate::pack_reader;
use crate::pack_writer::PackWriter;
use crate::hasher;
use crate::repository::Repository;
use crate::snapshot;
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct RetentionPolicy {
#[serde(default)]
pub keep_last: Option<u32>,
#[serde(default)]
pub keep_days: Option<u32>,
#[serde(default)]
pub keep_weeks: Option<u32>,
#[serde(default)]
pub keep_months: Option<u32>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PruneResult {
pub removed_snapshots: u32,
pub removed_packs: u32,
pub rewritten_packs: u32,
pub freed_bytes: u64,
pub dry_run: bool,
}
pub async fn prune(
repo: &mut Repository,
retention: &RetentionPolicy,
dry_run: bool,
) -> Result<PruneResult, ArchiveError> {
// Acquire lock
if !dry_run {
repo.acquire_lock("prune").await?;
}
let result = do_prune(repo, retention, dry_run).await;
if !dry_run {
repo.release_lock().await?;
}
result
}
async fn do_prune(
repo: &mut Repository,
retention: &RetentionPolicy,
dry_run: bool,
) -> Result<PruneResult, ArchiveError> {
let mut result = PruneResult {
removed_snapshots: 0,
removed_packs: 0,
rewritten_packs: 0,
freed_bytes: 0,
dry_run,
};
// Load all snapshots
let mut snapshots = snapshot::list_snapshots(&repo.path, None).await?;
// Sort by creation time (newest first)
snapshots.sort_by(|a, b| b.created_at.cmp(&a.created_at));
// Determine which snapshots to keep
let keep_ids = determine_kept_snapshots(&snapshots, retention);
// Phase 1: Remove expired snapshots
let to_remove: Vec<_> = snapshots.iter()
.filter(|s| !keep_ids.contains(&s.id))
.collect();
result.removed_snapshots = to_remove.len() as u32;
if !dry_run {
for snap in &to_remove {
snapshot::delete_snapshot(&repo.path, &snap.id).await?;
tracing::info!("Removed snapshot {}", snap.id);
}
}
// Phase 2: Find and remove unreferenced packs
// Reload remaining snapshots
let remaining_snapshots = if dry_run {
snapshots.iter()
.filter(|s| keep_ids.contains(&s.id))
.cloned()
.collect::<Vec<_>>()
} else {
snapshot::list_snapshots(&repo.path, None).await?
};
let referenced_chunks = snapshot::referenced_chunks(&remaining_snapshots);
let referenced_packs = find_referenced_packs(repo, &referenced_chunks);
// Find all pack IDs on disk
let all_packs = find_all_pack_ids(&repo.path).await?;
for pack_id in &all_packs {
if !referenced_packs.contains(pack_id) {
// This pack is fully unreferenced — delete it
let shard = &pack_id[..std::cmp::min(2, pack_id.len())];
let pack_path = Path::new(&repo.path)
.join("packs").join("data").join(shard)
.join(format!("{}.pack", pack_id));
let idx_path = Path::new(&repo.path)
.join("packs").join("data").join(shard)
.join(format!("{}.idx", pack_id));
if pack_path.exists() {
if let Ok(meta) = tokio::fs::metadata(&pack_path).await {
result.freed_bytes += meta.len();
}
}
if idx_path.exists() {
if let Ok(meta) = tokio::fs::metadata(&idx_path).await {
result.freed_bytes += meta.len();
}
}
if !dry_run {
let _ = tokio::fs::remove_file(&pack_path).await;
let _ = tokio::fs::remove_file(&idx_path).await;
// Remove entries from global index
repo.index.remove_pack_entries(pack_id);
tracing::info!("Removed pack {}", pack_id);
}
result.removed_packs += 1;
}
}
// Phase 3: Rewrite partially-referenced packs to reclaim wasted space
if !dry_run {
rewrite_partial_packs(repo, &referenced_chunks, &mut result).await?;
}
// Compact index after pruning
if !dry_run && (result.removed_packs > 0 || result.rewritten_packs > 0) {
repo.index.compact(&repo.path).await?;
}
tracing::info!(
"Prune {}: removed {} snapshots, {} packs, rewrote {} packs, freed {} bytes",
if dry_run { "(dry run)" } else { "complete" },
result.removed_snapshots,
result.removed_packs,
result.rewritten_packs,
result.freed_bytes
);
Ok(result)
}
/// Rewrite packs that contain a mix of referenced and unreferenced chunks.
/// Only rewrites packs where >25% of data is unreferenced (to avoid churn).
async fn rewrite_partial_packs(
repo: &mut Repository,
referenced_chunks: &HashSet<String>,
result: &mut PruneResult,
) -> Result<(), ArchiveError> {
let all_packs = find_all_pack_ids(&repo.path).await?;
for pack_id in &all_packs {
let shard = &pack_id[..std::cmp::min(2, pack_id.len())];
let idx_path = Path::new(&repo.path)
.join("packs").join("data").join(shard)
.join(format!("{}.idx", pack_id));
let pack_path = Path::new(&repo.path)
.join("packs").join("data").join(shard)
.join(format!("{}.pack", pack_id));
if !idx_path.exists() || !pack_path.exists() {
continue;
}
let entries = match pack_reader::load_idx(&idx_path).await {
Ok(e) => e,
Err(_) => continue,
};
// Count referenced vs unreferenced chunks in this pack
let mut referenced_count = 0usize;
let mut unreferenced_bytes = 0u64;
let mut total_bytes = 0u64;
for entry in &entries {
let hash_hex = hasher::hash_to_hex(&entry.content_hash);
total_bytes += entry.compressed_size as u64;
if referenced_chunks.contains(&hash_hex) {
referenced_count += 1;
} else {
unreferenced_bytes += entry.compressed_size as u64;
}
}
// Skip if all chunks are referenced (nothing to reclaim)
if referenced_count == entries.len() {
continue;
}
// Skip if all chunks are unreferenced (already handled by Phase 2)
if referenced_count == 0 {
continue;
}
// Skip if waste is less than 25% (not worth the I/O)
if total_bytes > 0 && (unreferenced_bytes * 100 / total_bytes) < 25 {
continue;
}
tracing::info!(
"Rewriting pack {} ({}/{} chunks referenced, {} bytes reclaimable)",
pack_id, referenced_count, entries.len(), unreferenced_bytes
);
// Read referenced chunks and write them to a new pack
let mut new_pack_writer = PackWriter::new(repo.config.pack_target_size);
for entry in &entries {
let hash_hex = hasher::hash_to_hex(&entry.content_hash);
if !referenced_chunks.contains(&hash_hex) {
continue; // Skip unreferenced chunks
}
// Read chunk data from old pack
let chunk_data = pack_reader::read_chunk(
&pack_path, entry.offset, entry.compressed_size,
).await?;
new_pack_writer.add_chunk(
entry.content_hash,
&chunk_data,
entry.plaintext_size,
entry.nonce,
entry.flags,
);
}
// Finalize the new pack
if !new_pack_writer.is_empty() {
let new_pack_info = new_pack_writer.finalize(&repo.path).await?;
// Update global index: point referenced chunks to the new pack
for entry in &entries {
let hash_hex = hasher::hash_to_hex(&entry.content_hash);
if referenced_chunks.contains(&hash_hex) {
let nonce = if entry.nonce != [0u8; 12] {
Some(hex::encode(entry.nonce))
} else {
None
};
repo.index.add_entry(hash_hex, IndexEntry {
pack_id: new_pack_info.pack_id.clone(),
offset: entry.offset, // Note: offset in the new pack may differ
compressed_size: entry.compressed_size,
plaintext_size: entry.plaintext_size,
nonce,
flags: entry.flags,
});
}
}
}
// Delete old pack + idx
let old_size = tokio::fs::metadata(&pack_path).await
.map(|m| m.len()).unwrap_or(0);
let old_idx_size = tokio::fs::metadata(&idx_path).await
.map(|m| m.len()).unwrap_or(0);
let _ = tokio::fs::remove_file(&pack_path).await;
let _ = tokio::fs::remove_file(&idx_path).await;
// Remove old pack entries from index
repo.index.remove_pack_entries(pack_id);
result.freed_bytes += unreferenced_bytes;
result.rewritten_packs += 1;
tracing::info!(
"Rewrote pack {} -> saved {} bytes",
pack_id, unreferenced_bytes
);
}
Ok(())
}
/// Determine which snapshot IDs to keep based on retention policy.
fn determine_kept_snapshots(
snapshots: &[snapshot::Snapshot],
retention: &RetentionPolicy,
) -> HashSet<String> {
let mut keep = HashSet::new();
// keepLast: keep the N most recent
if let Some(n) = retention.keep_last {
for snap in snapshots.iter().take(n as usize) {
keep.insert(snap.id.clone());
}
}
let now = chrono::Utc::now();
// keepDays: keep one per day for the last N days
if let Some(days) = retention.keep_days {
let mut seen_days = HashSet::new();
for snap in snapshots {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&snap.created_at) {
let age = now.signed_duration_since(dt);
if age.num_days() <= days as i64 {
let day_key = dt.format("%Y-%m-%d").to_string();
if seen_days.insert(day_key) {
keep.insert(snap.id.clone());
}
}
}
}
}
// keepWeeks: keep one per week for the last N weeks
if let Some(weeks) = retention.keep_weeks {
let mut seen_weeks = HashSet::new();
for snap in snapshots {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&snap.created_at) {
let age = now.signed_duration_since(dt);
if age.num_weeks() <= weeks as i64 {
let week_key = dt.format("%Y-W%W").to_string();
if seen_weeks.insert(week_key) {
keep.insert(snap.id.clone());
}
}
}
}
}
// keepMonths: keep one per month for the last N months
if let Some(months) = retention.keep_months {
let mut seen_months = HashSet::new();
for snap in snapshots {
if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(&snap.created_at) {
let age = now.signed_duration_since(dt);
if age.num_days() <= (months as i64) * 31 {
let month_key = dt.format("%Y-%m").to_string();
if seen_months.insert(month_key) {
keep.insert(snap.id.clone());
}
}
}
}
}
// If no retention policy is specified, keep everything
if retention.keep_last.is_none()
&& retention.keep_days.is_none()
&& retention.keep_weeks.is_none()
&& retention.keep_months.is_none()
{
for snap in snapshots {
keep.insert(snap.id.clone());
}
}
keep
}
/// Find pack IDs that contain at least one referenced chunk.
fn find_referenced_packs(
repo: &Repository,
referenced_chunks: &HashSet<String>,
) -> HashSet<String> {
let mut packs = HashSet::new();
for hash_hex in referenced_chunks {
if let Some(entry) = repo.index.get(hash_hex) {
packs.insert(entry.pack_id.clone());
}
}
packs
}
/// Find all pack IDs on disk.
async fn find_all_pack_ids(repo_path: &str) -> Result<Vec<String>, ArchiveError> {
let packs_dir = Path::new(repo_path).join("packs").join("data");
if !packs_dir.exists() {
return Ok(Vec::new());
}
let mut pack_ids = Vec::new();
let mut stack = vec![packs_dir];
while let Some(current) = stack.pop() {
let mut entries = tokio::fs::read_dir(&current).await?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.is_dir() {
stack.push(path);
} else if path.extension().and_then(|e| e.to_str()) == Some("pack") {
if let Some(id) = path.file_stem().and_then(|s| s.to_str()) {
pack_ids.push(id.to_string());
}
}
}
}
Ok(pack_ids)
}