8 Commits
v0.0.2 ... main

16 changed files with 757 additions and 32 deletions

View File

@@ -1,5 +1,28 @@
# Changelog
## 2026-03-23 - 0.1.3 - fix(repo)
no changes to commit
## 2026-03-23 - 0.1.2 - fix(package)
rename package namespace from @push.rocks to @serve.zone
- Updates the published package name to @serve.zone/containerarchive
- Aligns repository, bugs, homepage, README usage examples, and generated commit metadata with the new namespace
## 2026-03-23 - 0.1.1 - fix(repo)
no changes to commit
## 2026-03-22 - 0.1.0 - feat(rust-core)
add zstd chunk compression support and rewrite partially referenced packs during prune
- introduce selectable gzip or zstd compression with per-chunk flags persisted in the global index
- restore and verify now detect the compression algorithm from stored flags and validate encrypted chunk handling
- prune now rewrites partially referenced packs to reclaim space and exposes rewritten pack counts in the API
- add coverage for zstd repository ingest, restore, and verify flows
- add project readme documentation
## 2026-03-22 - 0.0.2 - fix(repository)
no changes to commit

View File

@@ -3,10 +3,10 @@
"projectType": "npm",
"module": {
"githost": "code.foss.global",
"gitscope": "push.rocks",
"gitscope": "serve.zone",
"gitrepo": "containerarchive",
"description": "content-addressed incremental backup engine with deduplication, encryption, and error correction",
"npmPackagename": "@push.rocks/containerarchive",
"npmPackagename": "@serve.zone/containerarchive",
"license": "MIT",
"keywords": [
"backup",
@@ -35,6 +35,6 @@
]
},
"@git.zone/tsdoc": {
"legal": "\n## License and Legal Information\n\nThis module is part of the @push.rocks ecosystem, maintained by Task Venture Capital GmbH.\n\nLicensed under MIT. See LICENSE file for details.\n\nFor questions or commercial licensing, contact: hello@task.vc\n"
"legal": "\n## License and Legal Information\n\nThis module is part of the @serve.zone collection, maintained by Task Venture Capital GmbH.\n\nLicensed under MIT. See LICENSE file for details.\n\nFor questions or commercial licensing, contact: hello@task.vc\n"
}
}

View File

@@ -1,6 +1,6 @@
{
"name": "@push.rocks/containerarchive",
"version": "0.0.2",
"name": "@serve.zone/containerarchive",
"version": "0.1.3",
"private": false,
"description": "content-addressed incremental backup engine with deduplication, encryption, and error correction",
"main": "dist_ts/index.js",
@@ -12,14 +12,14 @@
},
"repository": {
"type": "git",
"url": "https://code.foss.global/push.rocks/containerarchive.git"
"url": "https://code.foss.global/serve.zone/containerarchive.git"
},
"author": "Lossless GmbH",
"license": "MIT",
"bugs": {
"url": "https://code.foss.global/push.rocks/containerarchive/issues"
"url": "https://code.foss.global/serve.zone/containerarchive/issues"
},
"homepage": "https://code.foss.global/push.rocks/containerarchive",
"homepage": "https://code.foss.global/serve.zone/containerarchive",
"dependencies": {
"@push.rocks/lik": "^6.0.0",
"@push.rocks/smartpromise": "^4.0.0",

339
readme.md Normal file
View File

@@ -0,0 +1,339 @@
# @serve.zone/containerarchive
A high-performance, content-addressed incremental backup engine with built-in deduplication, encryption, compression, and Reed-Solomon error correction — powered by a Rust core with a clean TypeScript API.
## Issue Reporting and Security
For reporting bugs, issues, or security vulnerabilities, please visit [community.foss.global/](https://community.foss.global/). This is the central community hub for all issue reporting. Developers who sign and comply with our contribution agreement and go through identification can also get a [code.foss.global/](https://code.foss.global/) account to submit Pull Requests directly.
## Install
```bash
pnpm install @serve.zone/containerarchive
```
## 🏗️ Architecture
containerarchive uses a **hybrid Rust + TypeScript architecture**. The heavy lifting — chunking, hashing, compression, encryption, pack file I/O, and parity — runs in a compiled Rust binary. The TypeScript layer provides a clean, idiomatic Node.js API and manages data streaming via Unix sockets through the [`@push.rocks/smartrust`](https://code.foss.global/push.rocks/smartrust) RustBridge IPC.
```
┌──────────────────────────────────────┐
│ Your Application (TypeScript/JS) │
│ ┌────────────────────────────────┐ │
│ │ ContainerArchive API │ │
│ │ .init() .ingest() .restore()│ │
│ └────────────┬───────────────────┘ │
│ │ Unix Socket + JSON │
│ ┌────────────▼───────────────────┐ │
│ │ Rust Engine (compiled bin) │ │
│ │ FastCDC │ SHA-256 │ AES-GCM │ │
│ │ gzip/zstd │ Reed-Solomon │ │
│ └────────────────────────────────┘ │
└──────────────────────────────────────┘
```
## ✨ Features
| Feature | Details |
|---|---|
| **Content-Defined Chunking** | FastCDC with gear-based rolling hash — insertions/deletions only affect nearby boundaries |
| **Deduplication** | SHA-256 chunk addressing — identical data is stored only once across all snapshots |
| **Compression** | gzip or zstd per-chunk compression |
| **Encryption** | AES-256-GCM with Argon2id key derivation — passphrase-protected repositories |
| **Pack Files** | Chunks are batched into binary pack files with binary `.idx` indexes for fast lookup |
| **Snapshots** | Immutable point-in-time snapshots with metadata tags and multi-item support |
| **Reed-Solomon Parity** | RS(20,1) erasure coding — recover any single lost pack from a group of 20 |
| **Incremental** | Only new/changed chunks are stored on each ingest |
| **Streaming** | Unix socket streaming between TypeScript and Rust for zero-copy data transfer |
| **Multi-Item Snapshots** | Bundle multiple data streams (DB dumps, config tarballs, etc.) into a single snapshot |
| **Verification** | Three-level integrity checks: quick, standard, full |
| **Pruning** | Retention policies (keep last N, days, weeks, months) with garbage collection |
| **Repair** | Automatic index rebuild, stale lock removal, and parity-based pack recovery |
## 📖 Usage
### Initialize a New Repository
```typescript
import { ContainerArchive } from '@serve.zone/containerarchive';
// Unencrypted repository
const repo = await ContainerArchive.init('/path/to/backup-repo');
// Encrypted repository (AES-256-GCM + Argon2id)
const encryptedRepo = await ContainerArchive.init('/path/to/secure-repo', {
passphrase: 'my-strong-passphrase',
});
```
### Open an Existing Repository
```typescript
const repo = await ContainerArchive.open('/path/to/backup-repo');
// With passphrase for encrypted repos
const repo = await ContainerArchive.open('/path/to/secure-repo', {
passphrase: 'my-strong-passphrase',
});
```
### Ingest Data (Single Stream)
```typescript
import * as fs from 'node:fs';
const inputStream = fs.createReadStream('/path/to/database-dump.sql');
const snapshot = await repo.ingest(inputStream, {
tags: { service: 'postgres', environment: 'production' },
items: [{ name: 'database.sql', type: 'database-dump' }],
});
console.log(`Snapshot ${snapshot.id} created`);
console.log(`Original: ${snapshot.originalSize} bytes`);
console.log(`Stored: ${snapshot.storedSize} bytes`);
console.log(`New chunks: ${snapshot.newChunks}, Reused: ${snapshot.reusedChunks}`);
```
### Multi-Item Ingest
Bundle multiple data streams into one snapshot:
```typescript
import * as stream from 'node:stream';
const dbDump = fs.createReadStream('/tmp/pg_dump.sql');
const configTar = fs.createReadStream('/tmp/config-volumes.tar');
const snapshot = await repo.ingestMulti([
{ stream: dbDump, name: 'database.sql', type: 'database-dump' },
{ stream: configTar, name: 'config.tar', type: 'volume-tar' },
], {
tags: { service: 'myapp', type: 'full-backup' },
});
console.log(`Items stored: ${snapshot.items.map(i => i.name).join(', ')}`);
```
### Restore Data
```typescript
// Restore an entire snapshot
const restoreStream = await repo.restore(snapshot.id);
const writeStream = fs.createWriteStream('/tmp/restored-dump.sql');
restoreStream.pipe(writeStream);
// Restore a specific item from a multi-item snapshot
const configStream = await repo.restore(snapshot.id, { item: 'config.tar' });
configStream.pipe(fs.createWriteStream('/tmp/restored-config.tar'));
```
### List & Filter Snapshots
```typescript
// List all snapshots
const allSnapshots = await repo.listSnapshots();
// Filter by tags
const prodSnapshots = await repo.listSnapshots({
tags: { environment: 'production' },
});
// Filter by date range
const recentSnapshots = await repo.listSnapshots({
after: '2026-03-01T00:00:00Z',
before: '2026-03-22T00:00:00Z',
});
// Get a specific snapshot
const snap = await repo.getSnapshot('snapshot-id-here');
```
### Verify Repository Integrity
```typescript
// Quick check — validates index consistency
const quick = await repo.verify({ level: 'quick' });
// Standard — reads pack headers and validates checksums
const standard = await repo.verify({ level: 'standard' });
// Full — decompresses and re-hashes every chunk
const full = await repo.verify({ level: 'full' });
console.log(`OK: ${full.ok}`);
console.log(`Packs checked: ${full.stats.packsChecked}`);
console.log(`Chunks checked: ${full.stats.chunksChecked}`);
```
### Prune Old Snapshots
```typescript
// Dry run first
const preview = await repo.prune({ keepLast: 5, keepDays: 30 }, true);
console.log(`Would remove ${preview.removedSnapshots} snapshots, free ${preview.freedBytes} bytes`);
// Execute for real
const result = await repo.prune({
keepLast: 5,
keepDays: 30,
keepWeeks: 12,
keepMonths: 6,
});
console.log(`Removed ${result.removedSnapshots} snapshots, ${result.removedPacks} packs`);
```
### Repair & Maintenance
```typescript
// Repair — rebuild index, remove stale locks, attempt parity recovery
const repairResult = await repo.repair();
console.log(`Index rebuilt: ${repairResult.indexRebuilt}`);
console.log(`Packs repaired via parity: ${repairResult.packsRepaired}`);
// Rebuild global index from pack .idx files
await repo.reindex();
// Remove stale locks
await repo.unlock();
await repo.unlock({ force: true }); // force-remove all locks
```
### Event Subscriptions
Monitor ingest progress and errors with RxJS-based event streams:
```typescript
// Track ingest progress
const sub = repo.on('ingest:progress', (data) => {
console.log(`${data.operation}: ${data.percentage}% — ${data.message}`);
});
// Track completed ingests
repo.on('ingest:complete', (data) => {
console.log(`Snapshot ${data.snapshotId} complete — ${data.newChunks} new chunks`);
});
// Track verification errors
repo.on('verify:error', (error) => {
console.error(`Verification error in ${error.pack || error.chunk}: ${error.error}`);
});
// Unsubscribe when done
sub.unsubscribe();
```
### Close the Repository
```typescript
await repo.close();
```
## 🗂️ Repository Structure
When initialized, a repository has the following on-disk layout:
```
backup-repo/
├── config.json # Repository config (chunking, compression, encryption, parity)
├── packs/
│ ├── data/ # Binary pack files (.pack) and indexes (.idx)
│ └── parity/ # Reed-Solomon parity packs
├── snapshots/ # JSON snapshot manifests
├── index/ # Global chunk index (hash → pack location)
├── keys/ # Encrypted key files (for passphrase-protected repos)
└── locks/ # Advisory locks for concurrent access
```
## 🔧 How It Works
1. **Chunking** — Incoming data is split into variable-size chunks using FastCDC with a gear-based rolling hash. Chunk sizes range from 64 KB to 1 MB (avg 256 KB). Content-defined boundaries mean that insertions or edits only affect nearby chunks, maximizing dedup across versions.
2. **Hashing** — Each chunk is hashed with SHA-256 for content addressing. If a chunk's hash already exists in the global index, it's deduplicated — only a reference is stored.
3. **Compression** — New chunks are compressed with gzip (default) or zstd before storage. Per-chunk compression flags are stored in the index.
4. **Encryption** — If a passphrase is set, a random 256-bit master key is generated, wrapped with an Argon2id-derived key, and stored in a key file. Every chunk is encrypted with AES-256-GCM using a unique nonce.
5. **Packing** — Compressed (and optionally encrypted) chunks are appended into binary pack files (~8 MB target). Each pack has an associated `.idx` file with chunk offsets, sizes, and flags for O(1) lookup.
6. **Parity** — After every group of 20 data packs, a Reed-Solomon RS(20,1) parity pack is generated. If any single pack in the group is lost or corrupted, it can be fully reconstructed.
7. **Snapshots** — A JSON manifest records the chunk list, tags, sizes, and item metadata. Snapshots are immutable — pruning removes snapshots but never alters existing pack data in-place.
8. **Restore** — The snapshot manifest is read, chunks are looked up in the global index, fetched from pack files, decompressed, decrypted if needed, and streamed back in order via a Unix socket.
## 📋 API Reference
### `ContainerArchive`
| Method | Description |
|---|---|
| `static init(path, options?)` | Create a new repository. Returns `Promise<ContainerArchive>` |
| `static open(path, options?)` | Open an existing repository. Returns `Promise<ContainerArchive>` |
| `ingest(stream, options?)` | Ingest a single data stream. Returns `Promise<ISnapshot>` |
| `ingestMulti(items, options?)` | Ingest multiple streams as one snapshot. Returns `Promise<ISnapshot>` |
| `restore(snapshotId, options?)` | Restore a snapshot. Returns `Promise<ReadableStream>` |
| `listSnapshots(filter?)` | List snapshots with optional tag/date filtering. Returns `Promise<ISnapshot[]>` |
| `getSnapshot(id)` | Get a specific snapshot. Returns `Promise<ISnapshot>` |
| `verify(options?)` | Verify repository integrity (quick/standard/full). Returns `Promise<IVerifyResult>` |
| `prune(retention, dryRun?)` | Remove old snapshots and garbage-collect packs. Returns `Promise<IPruneResult>` |
| `repair()` | Rebuild index, remove stale locks, attempt parity recovery. Returns `Promise<IRepairResult>` |
| `reindex()` | Rebuild the global index from pack `.idx` files. Returns `Promise<void>` |
| `unlock(options?)` | Remove advisory locks. Returns `Promise<void>` |
| `on(event, handler)` | Subscribe to events. Returns `Subscription` |
| `close()` | Close the repository and terminate the Rust process. Returns `Promise<void>` |
### Key Interfaces
```typescript
interface ISnapshot {
id: string;
version: number;
createdAt: string;
tags: Record<string, string>;
originalSize: number;
storedSize: number;
chunkCount: number;
newChunks: number;
reusedChunks: number;
items: ISnapshotItem[];
}
interface IRetentionPolicy {
keepLast?: number;
keepDays?: number;
keepWeeks?: number;
keepMonths?: number;
}
interface IVerifyResult {
ok: boolean;
errors: IVerifyError[];
stats: {
packsChecked: number;
chunksChecked: number;
snapshotsChecked: number;
};
}
```
## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
### Trademarks
This project is owned and maintained by Task Venture Capital GmbH. The names and logos associated with Task Venture Capital GmbH and any related products or services are trademarks of Task Venture Capital GmbH or third parties, and are not included within the scope of the MIT license granted herein.
Use of these trademarks must comply with Task Venture Capital GmbH's Trademark Guidelines or the guidelines of the respective third-party owners, and any usage must be approved in writing. Third-party trademarks used herein are the property of their respective owners and used only in a descriptive manner, e.g. for an implementation of an API or similar.
### Company Information
Task Venture Capital GmbH
Registered at District Court Bremen HRB 35230 HB, Germany
For any legal inquiries or further information, please contact us via email at hello@task.vc.
By using this repository, you acknowledge that you have read this section, agree to comply with its terms, and understand that the licensing of the code does not imply endorsement by Task Venture Capital GmbH of any derivative works.

67
rust/Cargo.lock generated
View File

@@ -207,6 +207,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a0dd1ca384932ff3641c8718a02769f1698e7563dc6974ffd03346116310423"
dependencies = [
"find-msvc-tools",
"jobserver",
"libc",
"shlex",
]
@@ -308,6 +310,7 @@ dependencies = [
"tracing",
"tracing-subscriber",
"uuid",
"zstd",
]
[[package]]
@@ -424,6 +427,18 @@ dependencies = [
"wasi",
]
[[package]]
name = "getrandom"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if",
"libc",
"r-efi 5.3.0",
"wasip2",
]
[[package]]
name = "getrandom"
version = "0.4.2"
@@ -432,7 +447,7 @@ checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
dependencies = [
"cfg-if",
"libc",
"r-efi",
"r-efi 6.0.0",
"wasip2",
"wasip3",
]
@@ -555,6 +570,16 @@ version = "1.0.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682"
[[package]]
name = "jobserver"
version = "0.1.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33"
dependencies = [
"getrandom 0.3.4",
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.91"
@@ -750,6 +775,12 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pkg-config"
version = "0.3.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
[[package]]
name = "polyval"
version = "0.6.2"
@@ -799,6 +830,12 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "r-efi"
version = "5.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
[[package]]
name = "r-efi"
version = "6.0.0"
@@ -1512,3 +1549,31 @@ name = "zmij"
version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa"
[[package]]
name = "zstd"
version = "0.13.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "7.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d"
dependencies = [
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "2.0.16+zstd.1.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748"
dependencies = [
"cc",
"pkg-config",
]

View File

@@ -25,6 +25,7 @@ argon2 = "0.5"
# Compression
flate2 = "1"
zstd = "0.13"
# Utilities
uuid = { version = "1", features = ["v4"] }

View File

@@ -3,8 +3,63 @@ use flate2::read::{GzDecoder, GzEncoder};
use std::io::Read;
use crate::error::ArchiveError;
/// Gzip compress data.
pub fn compress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
/// Supported compression algorithms.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CompressionAlgorithm {
Gzip,
Zstd,
}
impl CompressionAlgorithm {
pub fn from_str(s: &str) -> Self {
match s {
"zstd" => Self::Zstd,
_ => Self::Gzip,
}
}
/// Map to the flag bits stored in IDX entries (bits 1-2).
pub fn to_flags(self) -> u32 {
match self {
Self::Gzip => 0x02, // FLAG_GZIP
Self::Zstd => 0x04, // FLAG_ZSTD
}
}
/// Determine algorithm from IDX flags.
pub fn from_flags(flags: u32) -> Self {
if flags & 0x04 != 0 {
Self::Zstd
} else {
Self::Gzip
}
}
}
/// Compress data with the specified algorithm.
pub fn compress(data: &[u8], algo: CompressionAlgorithm) -> Result<Vec<u8>, ArchiveError> {
match algo {
CompressionAlgorithm::Gzip => compress_gzip(data),
CompressionAlgorithm::Zstd => compress_zstd(data),
}
}
/// Decompress data with the specified algorithm.
pub fn decompress(data: &[u8], algo: CompressionAlgorithm) -> Result<Vec<u8>, ArchiveError> {
match algo {
CompressionAlgorithm::Gzip => decompress_gzip(data),
CompressionAlgorithm::Zstd => decompress_zstd(data),
}
}
/// Decompress data by detecting algorithm from flags.
pub fn decompress_by_flags(data: &[u8], flags: u32) -> Result<Vec<u8>, ArchiveError> {
decompress(data, CompressionAlgorithm::from_flags(flags))
}
// ==================== Gzip ====================
fn compress_gzip(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
let mut encoder = GzEncoder::new(data, Compression::default());
let mut compressed = Vec::new();
encoder.read_to_end(&mut compressed)
@@ -12,8 +67,7 @@ pub fn compress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
Ok(compressed)
}
/// Gzip decompress data.
pub fn decompress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
fn decompress_gzip(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
let mut decoder = GzDecoder::new(data);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)
@@ -21,23 +75,57 @@ pub fn decompress(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
Ok(decompressed)
}
// ==================== Zstd ====================
fn compress_zstd(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
zstd::encode_all(data, 3) // level 3 = good balance of speed/ratio
.map_err(|e| ArchiveError::Io(e))
}
fn decompress_zstd(data: &[u8]) -> Result<Vec<u8>, ArchiveError> {
zstd::decode_all(data)
.map_err(|e| ArchiveError::Io(e))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_roundtrip() {
fn test_gzip_roundtrip() {
let data = b"Hello, this is test data for compression!";
let compressed = compress(data).unwrap();
let decompressed = decompress(&compressed).unwrap();
let compressed = compress(data, CompressionAlgorithm::Gzip).unwrap();
let decompressed = decompress(&compressed, CompressionAlgorithm::Gzip).unwrap();
assert_eq!(data.as_slice(), decompressed.as_slice());
}
#[test]
fn test_zstd_roundtrip() {
let data = b"Hello, this is test data for zstd compression!";
let compressed = compress(data, CompressionAlgorithm::Zstd).unwrap();
let decompressed = decompress(&compressed, CompressionAlgorithm::Zstd).unwrap();
assert_eq!(data.as_slice(), decompressed.as_slice());
}
#[test]
fn test_compression_reduces_size() {
// Highly compressible data
let data = vec![b'A'; 10000];
let compressed = compress(&data).unwrap();
assert!(compressed.len() < data.len());
let gzip = compress(&data, CompressionAlgorithm::Gzip).unwrap();
let zstd = compress(&data, CompressionAlgorithm::Zstd).unwrap();
assert!(gzip.len() < data.len());
assert!(zstd.len() < data.len());
}
#[test]
fn test_decompress_by_flags() {
let data = b"flag-based decompression test";
let gzip_compressed = compress(data, CompressionAlgorithm::Gzip).unwrap();
let result = decompress_by_flags(&gzip_compressed, 0x02).unwrap();
assert_eq!(data.as_slice(), result.as_slice());
let zstd_compressed = compress(data, CompressionAlgorithm::Zstd).unwrap();
let result = decompress_by_flags(&zstd_compressed, 0x04).unwrap();
assert_eq!(data.as_slice(), result.as_slice());
}
}

View File

@@ -20,6 +20,8 @@ pub struct IndexEntry {
pub plaintext_size: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nonce: Option<String>,
#[serde(default)]
pub flags: u32,
}
/// An index segment stored on disk.
@@ -198,6 +200,7 @@ impl GlobalIndex {
compressed_size: entry.compressed_size,
plaintext_size: entry.plaintext_size,
nonce,
flags: entry.flags,
});
}
}

View File

@@ -10,7 +10,7 @@ use tokio::net::UnixStream;
use serde::{Deserialize, Serialize};
use crate::chunker::{FastCdc, StreamingChunker};
use crate::compression;
use crate::compression::{self, CompressionAlgorithm};
use crate::encryption;
use crate::error::ArchiveError;
use crate::parity::{ParityManager, ParityConfig};
@@ -41,6 +41,7 @@ struct PendingChunk {
compressed_size: u32,
plaintext_size: u32,
nonce: Option<String>,
flags: u32,
}
/// Run the single-item ingest pipeline.
@@ -295,8 +296,9 @@ async fn process_chunk(
return Ok(());
}
let compressed = compression::compress(chunk_data)?;
let mut flags = FLAG_GZIP;
let comp_algo = CompressionAlgorithm::from_str(&repo.config.compression);
let compressed = compression::compress(chunk_data, comp_algo)?;
let mut flags = comp_algo.to_flags();
let plaintext_size = chunk_data.len() as u32;
let (stored_data, nonce) = if let Some(ref key) = repo.master_key {
@@ -327,6 +329,7 @@ async fn process_chunk(
compressed_size,
plaintext_size,
nonce: nonce_hex,
flags,
});
pack_writer.add_chunk(hash, &stored_data, plaintext_size, nonce, flags);
@@ -353,6 +356,7 @@ async fn finalize_pack(
compressed_size: pending.compressed_size,
plaintext_size: pending.plaintext_size,
nonce: pending.nonce,
flags: pending.flags,
});
}

View File

@@ -30,6 +30,7 @@ pub const PACK_HEADER_SIZE: usize = 32;
/// Flags stored in IDX entries.
pub const FLAG_ENCRYPTED: u32 = 0x01;
pub const FLAG_GZIP: u32 = 0x02;
pub const FLAG_ZSTD: u32 = 0x04;
/// An entry in the pack index.
#[derive(Debug, Clone)]

View File

@@ -9,6 +9,10 @@ use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::error::ArchiveError;
use crate::global_index::IndexEntry;
use crate::pack_reader;
use crate::pack_writer::PackWriter;
use crate::hasher;
use crate::repository::Repository;
use crate::snapshot;
@@ -30,6 +34,7 @@ pub struct RetentionPolicy {
pub struct PruneResult {
pub removed_snapshots: u32,
pub removed_packs: u32,
pub rewritten_packs: u32,
pub freed_bytes: u64,
pub dry_run: bool,
}
@@ -61,6 +66,7 @@ async fn do_prune(
let mut result = PruneResult {
removed_snapshots: 0,
removed_packs: 0,
rewritten_packs: 0,
freed_bytes: 0,
dry_run,
};
@@ -141,22 +147,162 @@ async fn do_prune(
}
}
// Phase 3: Rewrite partially-referenced packs to reclaim wasted space
if !dry_run {
rewrite_partial_packs(repo, &referenced_chunks, &mut result).await?;
}
// Compact index after pruning
if !dry_run && result.removed_packs > 0 {
if !dry_run && (result.removed_packs > 0 || result.rewritten_packs > 0) {
repo.index.compact(&repo.path).await?;
}
tracing::info!(
"Prune {}: removed {} snapshots, {} packs, freed {} bytes",
"Prune {}: removed {} snapshots, {} packs, rewrote {} packs, freed {} bytes",
if dry_run { "(dry run)" } else { "complete" },
result.removed_snapshots,
result.removed_packs,
result.rewritten_packs,
result.freed_bytes
);
Ok(result)
}
/// Rewrite packs that contain a mix of referenced and unreferenced chunks.
/// Only rewrites packs where >25% of data is unreferenced (to avoid churn).
async fn rewrite_partial_packs(
repo: &mut Repository,
referenced_chunks: &HashSet<String>,
result: &mut PruneResult,
) -> Result<(), ArchiveError> {
let all_packs = find_all_pack_ids(&repo.path).await?;
for pack_id in &all_packs {
let shard = &pack_id[..std::cmp::min(2, pack_id.len())];
let idx_path = Path::new(&repo.path)
.join("packs").join("data").join(shard)
.join(format!("{}.idx", pack_id));
let pack_path = Path::new(&repo.path)
.join("packs").join("data").join(shard)
.join(format!("{}.pack", pack_id));
if !idx_path.exists() || !pack_path.exists() {
continue;
}
let entries = match pack_reader::load_idx(&idx_path).await {
Ok(e) => e,
Err(_) => continue,
};
// Count referenced vs unreferenced chunks in this pack
let mut referenced_count = 0usize;
let mut unreferenced_bytes = 0u64;
let mut total_bytes = 0u64;
for entry in &entries {
let hash_hex = hasher::hash_to_hex(&entry.content_hash);
total_bytes += entry.compressed_size as u64;
if referenced_chunks.contains(&hash_hex) {
referenced_count += 1;
} else {
unreferenced_bytes += entry.compressed_size as u64;
}
}
// Skip if all chunks are referenced (nothing to reclaim)
if referenced_count == entries.len() {
continue;
}
// Skip if all chunks are unreferenced (already handled by Phase 2)
if referenced_count == 0 {
continue;
}
// Skip if waste is less than 25% (not worth the I/O)
if total_bytes > 0 && (unreferenced_bytes * 100 / total_bytes) < 25 {
continue;
}
tracing::info!(
"Rewriting pack {} ({}/{} chunks referenced, {} bytes reclaimable)",
pack_id, referenced_count, entries.len(), unreferenced_bytes
);
// Read referenced chunks and write them to a new pack
let mut new_pack_writer = PackWriter::new(repo.config.pack_target_size);
for entry in &entries {
let hash_hex = hasher::hash_to_hex(&entry.content_hash);
if !referenced_chunks.contains(&hash_hex) {
continue; // Skip unreferenced chunks
}
// Read chunk data from old pack
let chunk_data = pack_reader::read_chunk(
&pack_path, entry.offset, entry.compressed_size,
).await?;
new_pack_writer.add_chunk(
entry.content_hash,
&chunk_data,
entry.plaintext_size,
entry.nonce,
entry.flags,
);
}
// Finalize the new pack
if !new_pack_writer.is_empty() {
let new_pack_info = new_pack_writer.finalize(&repo.path).await?;
// Update global index: point referenced chunks to the new pack
for entry in &entries {
let hash_hex = hasher::hash_to_hex(&entry.content_hash);
if referenced_chunks.contains(&hash_hex) {
let nonce = if entry.nonce != [0u8; 12] {
Some(hex::encode(entry.nonce))
} else {
None
};
repo.index.add_entry(hash_hex, IndexEntry {
pack_id: new_pack_info.pack_id.clone(),
offset: entry.offset, // Note: offset in the new pack may differ
compressed_size: entry.compressed_size,
plaintext_size: entry.plaintext_size,
nonce,
flags: entry.flags,
});
}
}
}
// Delete old pack + idx
let old_size = tokio::fs::metadata(&pack_path).await
.map(|m| m.len()).unwrap_or(0);
let old_idx_size = tokio::fs::metadata(&idx_path).await
.map(|m| m.len()).unwrap_or(0);
let _ = tokio::fs::remove_file(&pack_path).await;
let _ = tokio::fs::remove_file(&idx_path).await;
// Remove old pack entries from index
repo.index.remove_pack_entries(pack_id);
result.freed_bytes += unreferenced_bytes;
result.rewritten_packs += 1;
tracing::info!(
"Rewrote pack {} -> saved {} bytes",
pack_id, unreferenced_bytes
);
}
Ok(())
}
/// Determine which snapshot IDs to keep based on retention policy.
fn determine_kept_snapshots(
snapshots: &[snapshot::Snapshot],

View File

@@ -10,7 +10,7 @@ use crate::encryption;
use crate::error::ArchiveError;
use crate::hasher;
use crate::pack_reader;
use crate::pack_writer::IdxEntry;
use crate::pack_writer::{IdxEntry, FLAG_ENCRYPTED};
use crate::repository::Repository;
use crate::snapshot;
@@ -76,8 +76,15 @@ pub async fn restore(
index_entry.compressed_size,
).await?;
// Get flags for this chunk (determines compression algorithm)
let chunk_flags = index_entry.flags;
// Decrypt if encrypted
let compressed = if let Some(ref key) = repo.master_key {
let compressed = if chunk_flags & FLAG_ENCRYPTED != 0 {
let key = repo.master_key.as_ref().ok_or_else(|| {
ArchiveError::Encryption("Chunk is encrypted but no key available".to_string())
})?;
// Try to get nonce from the global index first (fast path)
let nonce = if let Some(ref nonce_hex) = index_entry.nonce {
let nonce_bytes = hex::decode(nonce_hex)
@@ -117,8 +124,8 @@ pub async fn restore(
stored_data
};
// Decompress
let plaintext = compression::decompress(&compressed)?;
// Decompress using flags to determine algorithm
let plaintext = compression::decompress_by_flags(&compressed, chunk_flags)?;
// Verify hash
let actual_hash = hasher::hash_chunk(&plaintext);

View File

@@ -231,7 +231,7 @@ async fn verify_all_chunks(
};
// Decompress
let plaintext = match compression::decompress(&compressed) {
let plaintext = match compression::decompress_by_flags(&compressed, entry.flags) {
Ok(d) => d,
Err(e) => {
errors.push(VerifyError {

View File

@@ -6,10 +6,11 @@ import { ContainerArchive } from '../ts/index.js';
const testRepoPath = path.resolve('.nogit/test-repo');
const testRepoEncryptedPath = path.resolve('.nogit/test-repo-encrypted');
const testRepoZstdPath = path.resolve('.nogit/test-repo-zstd');
// Clean up test directories before tests
tap.preTask('cleanup test directories', async () => {
for (const p of [testRepoPath, testRepoEncryptedPath]) {
for (const p of [testRepoPath, testRepoEncryptedPath, testRepoZstdPath]) {
if (fs.existsSync(p)) {
fs.rmSync(p, { recursive: true });
}
@@ -250,4 +251,50 @@ tap.test('should open encrypted repository with correct passphrase', async () =>
await encRepo.close();
});
// ==================== Zstd Compression ====================
tap.test('should work with zstd compression', async () => {
// Init repo — the config.json will have compression: "gzip" by default.
// To test zstd, we manually update the config after init.
const zstdRepo = await ContainerArchive.init(testRepoZstdPath);
await zstdRepo.close();
// Patch config.json to use zstd
const configPath = path.join(testRepoZstdPath, 'config.json');
const config = JSON.parse(fs.readFileSync(configPath, 'utf-8'));
config.compression = 'zstd';
fs.writeFileSync(configPath, JSON.stringify(config, null, 2));
// Reopen with zstd config
const repo2 = await ContainerArchive.open(testRepoZstdPath);
// Ingest
const testData = Buffer.alloc(256 * 1024, 'zstd-compressed-data');
const snapshot = await repo2.ingest(stream.Readable.from(testData), {
tags: { compression: 'zstd' },
items: [{ name: 'zstd-data.bin' }],
});
expect(snapshot.newChunks).toBeGreaterThan(0);
// Restore and verify
const restoreStream = await repo2.restore(snapshot.id);
const chunks: Buffer[] = [];
await new Promise<void>((resolve, reject) => {
restoreStream.on('data', (chunk: Buffer) => chunks.push(chunk));
restoreStream.on('end', resolve);
restoreStream.on('error', reject);
});
const restored = Buffer.concat(chunks);
expect(restored.length).toEqual(testData.length);
expect(restored.equals(testData)).toBeTrue();
// Verify
const verifyResult = await repo2.verify({ level: 'full' });
expect(verifyResult.ok).toBeTrue();
await repo2.close();
});
export default tap.start();

View File

@@ -2,7 +2,7 @@
* autocreated commitinfo by @push.rocks/commitinfo
*/
export const commitinfo = {
name: '@push.rocks/containerarchive',
version: '0.0.2',
name: '@serve.zone/containerarchive',
version: '0.1.3',
description: 'content-addressed incremental backup engine with deduplication, encryption, and error correction'
}

View File

@@ -126,6 +126,7 @@ export interface IRetentionPolicy {
export interface IPruneResult {
removedSnapshots: number;
removedPacks: number;
rewrittenPacks: number;
freedBytes: number;
dryRun: boolean;
}