Files
containerarchive/ts/classes.containerarchive.ts
Juergen Kunz ca510f4578 feat: add multi-item ingest and Reed-Solomon parity
- Multi-item ingest: each item gets its own Unix socket, Rust processes
  them sequentially into a single snapshot with separate chunk lists
- Reed-Solomon parity: rs(20,1) erasure coding for pack file groups,
  enabling single-pack-loss recovery via parity reconstruction
- Repair now attempts parity-based recovery for missing pack files
- 16 integration tests + 12 Rust unit tests all pass
2026-03-21 23:46:29 +00:00

420 lines
11 KiB
TypeScript

import { fileURLToPath } from 'node:url';

import * as plugins from './plugins.js';
import { commitinfo } from './00_commitinfo_data.js';
import type {
  TContainerArchiveCommands,
  IInitOptions,
  IOpenOptions,
  IIngestOptions,
  IIngestItem,
  IIngestItemOptions,
  IRestoreOptions,
  ISnapshot,
  ISnapshotFilter,
  IVerifyOptions,
  IVerifyResult,
  IRetentionPolicy,
  IPruneResult,
  IRepairResult,
  IUnlockOptions,
  IIngestProgress,
  IIngestComplete,
  IVerifyError,
  IRepositoryConfig,
} from './interfaces.js';
/**
 * Content-addressed incremental backup engine.
 *
 * Provides deduplicated, optionally encrypted, gzip-compressed storage
 * for arbitrary data streams with full snapshot history.
 *
 * The class is a thin client around the `containerarchive` Rust binary:
 * commands go through a `RustBridge`, while bulk data (ingest and restore)
 * is exchanged over temporary Unix domain sockets created in the OS temp
 * directory. Instances are obtained through {@link ContainerArchive.init}
 * or {@link ContainerArchive.open}; the constructor is private.
 */
export class ContainerArchive {
  private readonly bridge: plugins.smartrust.RustBridge<TContainerArchiveCommands>;
  private readonly repoPath: string;
  private spawned = false;

  // Event subjects
  public ingestProgress = new plugins.smartrx.rxjs.Subject<IIngestProgress>();
  public ingestComplete = new plugins.smartrx.rxjs.Subject<IIngestComplete>();
  public verifyError = new plugins.smartrx.rxjs.Subject<IVerifyError>();

  /**
   * Resolves the repository path and configures (but does not spawn) the bridge.
   *
   * @param repoPath Repository location; resolved to an absolute path.
   */
  private constructor(repoPath: string) {
    this.repoPath = plugins.path.resolve(repoPath);

    // fileURLToPath (unlike URL#pathname) decodes percent-escapes, so the
    // bundled binary is still found when the package is installed under a
    // path containing spaces or non-ASCII characters.
    const packageDir = plugins.path.resolve(
      plugins.path.dirname(fileURLToPath(import.meta.url)),
      '..',
    );

    this.bridge = new plugins.smartrust.RustBridge<TContainerArchiveCommands>({
      binaryName: 'containerarchive',
      localPaths: [
        plugins.path.join(packageDir, 'dist_rust', 'containerarchive'),
      ],
      readyTimeoutMs: 30000,
      requestTimeoutMs: 300000,
    });

    // Listen for events from the Rust binary; only 'progress' is forwarded.
    this.bridge.on('event', (event: { event: string; data: any }) => {
      if (event.event === 'progress') {
        this.ingestProgress.next(event.data);
      }
    });
  }

  /**
   * Spawn the Rust process once; later calls are no-ops while it is marked spawned.
   */
  private async ensureSpawned(): Promise<void> {
    if (!this.spawned) {
      await this.bridge.spawn();
      this.spawned = true;
    }
  }

  /**
   * Initialize a new repository at the given path.
   *
   * @param repoPath Repository location.
   * @param options Optional settings; `passphrase` is forwarded to the `init` command.
   * @returns An instance whose bridge has been spawned and has processed `init`.
   */
  static async init(repoPath: string, options?: IInitOptions): Promise<ContainerArchive> {
    const instance = new ContainerArchive(repoPath);
    await instance.ensureSpawned();
    await instance.bridge.sendCommand('init', {
      path: instance.repoPath,
      passphrase: options?.passphrase,
    });
    return instance;
  }

  /**
   * Open an existing repository at the given path.
   *
   * @param repoPath Repository location.
   * @param options Optional settings; `passphrase` is forwarded to the `open` command.
   * @returns An instance whose bridge has been spawned and has processed `open`.
   */
  static async open(repoPath: string, options?: IOpenOptions): Promise<ContainerArchive> {
    const instance = new ContainerArchive(repoPath);
    await instance.ensureSpawned();
    await instance.bridge.sendCommand('open', {
      path: instance.repoPath,
      passphrase: options?.passphrase,
    });
    return instance;
  }

  /**
   * Ingest a single data stream into the repository.
   *
   * Emits on `ingestComplete` once the command has returned and the data
   * transfer has settled.
   *
   * @param inputStream Data to store; piped to the Rust process over a Unix socket.
   * @param options Optional `tags` and `items`; `items` defaults to a single
   *   `{ name: 'data', type: 'data' }` entry.
   * @returns The snapshot reported by the `ingest` command.
   * @throws If the command fails, or if `inputStream` emits an error.
   */
  async ingest(
    inputStream: NodeJS.ReadableStream,
    options?: IIngestOptions,
  ): Promise<ISnapshot> {
    const socketPath = ContainerArchive.createSocketPath('ingest');

    // Create Unix socket server that Rust will connect to
    const { promise: dataTransferred, server } = await this.createSocketServer(
      socketPath,
      inputStream,
    );

    try {
      // Send ingest command to Rust (Rust connects to our socket).
      // Awaited first so that a command failure takes precedence over a
      // transfer failure when both occur.
      const result = await this.bridge.sendCommand('ingest', {
        socketPath,
        tags: options?.tags,
        items: options?.items || [{ name: 'data', type: 'data' }],
      });

      // Wait for data transfer to complete (rejects if the input stream failed)
      await dataTransferred;

      const snapshot = result.snapshot;
      this.ingestComplete.next({
        snapshotId: snapshot.id,
        originalSize: snapshot.originalSize,
        storedSize: snapshot.storedSize,
        newChunks: snapshot.newChunks,
        reusedChunks: snapshot.reusedChunks,
      });
      return snapshot;
    } finally {
      ContainerArchive.releaseSocket(server, socketPath);
    }
  }

  /**
   * Ingest multiple data streams as a single multi-item snapshot.
   *
   * Each item gets its own Unix socket, all of which are created before the
   * `ingestMulti` command is sent. Per the change description accompanying
   * this file, the Rust side processes the items sequentially.
   *
   * @param items Streams to store; `type` defaults to `'data'` per item.
   * @param options Optional settings; only `tags` is forwarded.
   * @returns The snapshot reported by the `ingestMulti` command.
   * @throws If `items` is empty, the command fails, or any item stream emits an error.
   */
  async ingestMulti(
    items: IIngestItem[],
    options?: IIngestOptions,
  ): Promise<ISnapshot> {
    if (items.length === 0) {
      throw new Error('At least one item is required');
    }

    // One socket per item
    const sockets: Array<{
      socketPath: string;
      promise: Promise<void>;
      server: plugins.net.Server;
    }> = [];
    const itemOptions: Array<{
      name: string;
      type: string;
      socketPath: string;
    }> = [];

    try {
      for (const item of items) {
        const socketPath = ContainerArchive.createSocketPath('ingest');
        const { promise, server } = await this.createSocketServer(socketPath, item.stream);
        sockets.push({ socketPath, promise, server });
        itemOptions.push({
          name: item.name,
          type: item.type || 'data',
          socketPath,
        });
      }

      // Send ingestMulti command to Rust with per-item socket paths
      const result = await this.bridge.sendCommand('ingestMulti', {
        tags: options?.tags,
        items: itemOptions,
      });

      // Wait for all data transfers (rejects if any item stream failed)
      await Promise.all(sockets.map((s) => s.promise));

      const snapshot = result.snapshot;
      this.ingestComplete.next({
        snapshotId: snapshot.id,
        originalSize: snapshot.originalSize,
        storedSize: snapshot.storedSize,
        newChunks: snapshot.newChunks,
        reusedChunks: snapshot.reusedChunks,
      });
      return snapshot;
    } finally {
      // Also runs when socket creation failed part-way: only sockets that
      // were successfully created are in the list.
      for (const s of sockets) {
        ContainerArchive.releaseSocket(s.server, s.socketPath);
      }
    }
  }

  /**
   * List snapshots with optional filtering.
   *
   * @param filter Forwarded unchanged to the `listSnapshots` command.
   */
  async listSnapshots(filter?: ISnapshotFilter): Promise<ISnapshot[]> {
    const result = await this.bridge.sendCommand('listSnapshots', {
      filter,
    });
    return result.snapshots;
  }

  /**
   * Get details of a specific snapshot.
   *
   * @param snapshotId Identifier forwarded to the `getSnapshot` command.
   */
  async getSnapshot(snapshotId: string): Promise<ISnapshot> {
    const result = await this.bridge.sendCommand('getSnapshot', {
      snapshotId,
    });
    return result.snapshot;
  }

  /**
   * Restore a snapshot to a ReadableStream.
   *
   * The returned stream is handed back as soon as the socket server is
   * listening; the `restore` command runs concurrently with the consumer
   * reading from it. If the command rejects, or the socket fails, the stream
   * is destroyed with that error.
   *
   * @param snapshotId Snapshot to restore.
   * @param options Optional settings; `item` is forwarded to the command.
   * @returns A stream carrying the bytes written by the Rust process.
   */
  async restore(
    snapshotId: string,
    options?: IRestoreOptions,
  ): Promise<NodeJS.ReadableStream> {
    const socketPath = ContainerArchive.createSocketPath('restore');

    // Create Unix socket server that Rust will connect to and write data
    const { readable, server } = await this.createRestoreSocketServer(socketPath);

    // Send restore command to Rust (Rust connects and writes data).
    // Deliberately not awaited: it runs in parallel with reading.
    void this.bridge
      .sendCommand('restore', {
        snapshotId,
        socketPath,
        item: options?.item,
      })
      .catch((err) => {
        readable.destroy(err);
      })
      .finally(() => {
        ContainerArchive.releaseSocket(server, socketPath);
      });

    return readable;
  }

  /**
   * Verify repository integrity.
   *
   * Every entry of the result's `errors` is also emitted on `verifyError`.
   *
   * @param options Optional settings; `level` defaults to `'standard'`.
   */
  async verify(options?: IVerifyOptions): Promise<IVerifyResult> {
    const result = await this.bridge.sendCommand('verify', {
      level: options?.level || 'standard',
    });
    for (const error of result.errors) {
      this.verifyError.next(error);
    }
    return result;
  }

  /**
   * Repair repository (rebuild index, remove stale locks).
   */
  async repair(): Promise<IRepairResult> {
    return this.bridge.sendCommand('repair', {});
  }

  /**
   * Prune old snapshots and garbage-collect unreferenced packs.
   *
   * @param retention Retention policy forwarded to the `prune` command.
   * @param dryRun Forwarded to the command; defaults to `false`.
   */
  async prune(retention: IRetentionPolicy, dryRun = false): Promise<IPruneResult> {
    return this.bridge.sendCommand('prune', {
      retention,
      dryRun,
    });
  }

  /**
   * Rebuild the global index from pack .idx files.
   */
  async reindex(): Promise<void> {
    await this.bridge.sendCommand('reindex', {});
  }

  /**
   * Remove locks from the repository.
   *
   * @param options Optional settings; `force` is forwarded to the command.
   */
  async unlock(options?: IUnlockOptions): Promise<void> {
    await this.bridge.sendCommand('unlock', {
      force: options?.force,
    });
  }

  /**
   * Subscribe to events.
   *
   * @param event One of `'ingest:progress'`, `'ingest:complete'`, `'verify:error'`.
   * @param handler Called with each value emitted on the matching subject.
   * @returns The subscription, so the caller can unsubscribe.
   * @throws If `event` is not one of the known names.
   */
  on(event: 'ingest:progress', handler: (data: IIngestProgress) => void): plugins.smartrx.rxjs.Subscription;
  on(event: 'ingest:complete', handler: (data: IIngestComplete) => void): plugins.smartrx.rxjs.Subscription;
  on(event: 'verify:error', handler: (data: IVerifyError) => void): plugins.smartrx.rxjs.Subscription;
  on(event: string, handler: (data: any) => void): plugins.smartrx.rxjs.Subscription {
    switch (event) {
      case 'ingest:progress':
        return this.ingestProgress.subscribe(handler);
      case 'ingest:complete':
        return this.ingestComplete.subscribe(handler);
      case 'verify:error':
        return this.verifyError.subscribe(handler);
      default:
        throw new Error(`Unknown event: ${event}`);
    }
  }

  /**
   * Close the repository and terminate the Rust process.
   *
   * A failing `close` command is ignored; the process is killed and all
   * event subjects are completed regardless.
   */
  async close(): Promise<void> {
    try {
      await this.bridge.sendCommand('close', {});
    } catch {
      // Ignore errors during close
    }
    this.bridge.kill();
    this.spawned = false;
    this.ingestProgress.complete();
    this.ingestComplete.complete();
    this.verifyError.complete();
  }

  // ==================== Private Helpers ====================

  /**
   * Build a socket path in the OS temp directory. The name combines the
   * current timestamp with a random suffix.
   */
  private static createSocketPath(kind: 'ingest' | 'restore'): string {
    return plugins.path.join(
      plugins.os.tmpdir(),
      `containerarchive-${kind}-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`,
    );
  }

  /**
   * Stop accepting connections and remove the socket file (best effort).
   */
  private static releaseSocket(server: plugins.net.Server, socketPath: string): void {
    server.close();
    try {
      plugins.fs.unlinkSync(socketPath);
    } catch {
      // Best effort: the socket file may already be gone.
    }
  }

  /**
   * Create a Unix socket server that accepts a connection from Rust
   * and pipes the inputStream to it (for ingest).
   *
   * The outer promise resolves once the server is listening and rejects if
   * listening fails. The inner `promise` settles as follows:
   * - resolves when the first accepted socket emits `'end'` or `'error'`,
   *   or when the server emits `'close'`;
   * - rejects when `inputStream` emits `'error'`. `pipe()` does not forward
   *   source errors, so the socket is destroyed explicitly to stop the peer
   *   from waiting for more data.
   *
   * NOTE(review): after an input failure the peer only sees the connection
   * close; whether it discards the partially received data is not visible
   * from this file.
   */
  private createSocketServer(
    socketPath: string,
    inputStream: NodeJS.ReadableStream,
  ): Promise<{
    promise: Promise<void>;
    server: plugins.net.Server;
  }> {
    return new Promise((resolve, reject) => {
      let inputFailed = false;
      let activeSocket: plugins.net.Socket | undefined;

      const server = plugins.net.createServer((socket) => {
        activeSocket = socket;
        if (inputFailed) {
          // The source already failed before Rust connected: nothing will
          // ever flow, so close the connection instead of leaving it open.
          socket.destroy();
          return;
        }
        // Pipe input data to the Rust process via socket
        (inputStream as any).pipe(socket);
      });

      const promise = new Promise<void>((res, rej) => {
        inputStream.once('error', (err: Error) => {
          inputFailed = true;
          activeSocket?.destroy();
          rej(err);
        });
        server.on('close', () => res());
        // Also resolve after a connection is handled
        server.once('connection', (socket) => {
          socket.on('end', () => res());
          // A socket error resolves on purpose: when Rust drops the
          // connection, the command result carries the meaningful error.
          socket.on('error', () => res());
        });
      });
      // The transfer can fail while the caller is still awaiting the command;
      // observe the rejection here so it is not reported as unhandled. Callers
      // that await `promise` still receive the rejection.
      promise.catch(() => {});

      server.on('error', reject);
      server.listen(socketPath, () => {
        resolve({ promise, server });
      });
    });
  }

  /**
   * Create a Unix socket server that accepts a connection from Rust
   * and provides a ReadableStream of the received data (for restore).
   *
   * Resolves once the server is listening; rejects if listening fails.
   * A socket error destroys the returned stream with that error.
   */
  private createRestoreSocketServer(
    socketPath: string,
  ): Promise<{
    readable: plugins.stream.PassThrough;
    server: plugins.net.Server;
  }> {
    return new Promise((resolve, reject) => {
      const passthrough = new plugins.stream.PassThrough();
      const server = plugins.net.createServer((socket) => {
        // pipe() does not forward source errors. Without this listener a
        // socket failure is an unhandled 'error' event and the consumer's
        // stream never ends.
        socket.on('error', (err) => {
          passthrough.destroy(err);
        });
        socket.pipe(passthrough);
      });
      server.on('error', reject);
      server.listen(socketPath, () => {
        resolve({ readable: passthrough, server });
      });
    });
  }
}