- Multi-item ingest: each item gets its own Unix socket; Rust processes them sequentially into a single snapshot with separate chunk lists
- Reed-Solomon parity: rs(20,1) erasure coding for pack file groups, enabling single-pack-loss recovery via parity reconstruction
- Repair now attempts parity-based recovery for missing pack files
- 16 integration tests + 12 Rust unit tests all pass
420 lines · 11 KiB · TypeScript
import * as plugins from './plugins.js';
|
|
import { commitinfo } from './00_commitinfo_data.js';
|
|
import type {
|
|
TContainerArchiveCommands,
|
|
IInitOptions,
|
|
IOpenOptions,
|
|
IIngestOptions,
|
|
IIngestItem,
|
|
IIngestItemOptions,
|
|
IRestoreOptions,
|
|
ISnapshot,
|
|
ISnapshotFilter,
|
|
IVerifyOptions,
|
|
IVerifyResult,
|
|
IRetentionPolicy,
|
|
IPruneResult,
|
|
IRepairResult,
|
|
IUnlockOptions,
|
|
IIngestProgress,
|
|
IIngestComplete,
|
|
IVerifyError,
|
|
IRepositoryConfig,
|
|
} from './interfaces.js';
|
|
|
|
/**
|
|
* Content-addressed incremental backup engine.
|
|
*
|
|
* Provides deduplicated, optionally encrypted, gzip-compressed storage
|
|
* for arbitrary data streams with full snapshot history.
|
|
*/
|
|
export class ContainerArchive {
|
|
private bridge: plugins.smartrust.RustBridge<TContainerArchiveCommands>;
|
|
private repoPath: string;
|
|
private spawned = false;
|
|
|
|
// Event subjects
|
|
public ingestProgress = new plugins.smartrx.rxjs.Subject<IIngestProgress>();
|
|
public ingestComplete = new plugins.smartrx.rxjs.Subject<IIngestComplete>();
|
|
public verifyError = new plugins.smartrx.rxjs.Subject<IVerifyError>();
|
|
|
|
private constructor(repoPath: string) {
|
|
this.repoPath = plugins.path.resolve(repoPath);
|
|
|
|
const packageDir = plugins.path.resolve(
|
|
plugins.path.dirname(new URL(import.meta.url).pathname),
|
|
'..',
|
|
);
|
|
|
|
this.bridge = new plugins.smartrust.RustBridge<TContainerArchiveCommands>({
|
|
binaryName: 'containerarchive',
|
|
localPaths: [
|
|
plugins.path.join(packageDir, 'dist_rust', 'containerarchive'),
|
|
],
|
|
readyTimeoutMs: 30000,
|
|
requestTimeoutMs: 300000,
|
|
});
|
|
|
|
// Listen for events from the Rust binary
|
|
this.bridge.on('event', (event: { event: string; data: any }) => {
|
|
if (event.event === 'progress') {
|
|
this.ingestProgress.next(event.data);
|
|
}
|
|
});
|
|
}
|
|
|
|
private async ensureSpawned(): Promise<void> {
|
|
if (!this.spawned) {
|
|
await this.bridge.spawn();
|
|
this.spawned = true;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Initialize a new repository at the given path.
|
|
*/
|
|
static async init(repoPath: string, options?: IInitOptions): Promise<ContainerArchive> {
|
|
const instance = new ContainerArchive(repoPath);
|
|
await instance.ensureSpawned();
|
|
|
|
await instance.bridge.sendCommand('init', {
|
|
path: instance.repoPath,
|
|
passphrase: options?.passphrase,
|
|
});
|
|
|
|
return instance;
|
|
}
|
|
|
|
/**
|
|
* Open an existing repository at the given path.
|
|
*/
|
|
static async open(repoPath: string, options?: IOpenOptions): Promise<ContainerArchive> {
|
|
const instance = new ContainerArchive(repoPath);
|
|
await instance.ensureSpawned();
|
|
|
|
await instance.bridge.sendCommand('open', {
|
|
path: instance.repoPath,
|
|
passphrase: options?.passphrase,
|
|
});
|
|
|
|
return instance;
|
|
}
|
|
|
|
/**
|
|
* Ingest a single data stream into the repository.
|
|
*/
|
|
async ingest(
|
|
inputStream: NodeJS.ReadableStream,
|
|
options?: IIngestOptions,
|
|
): Promise<ISnapshot> {
|
|
const socketPath = plugins.path.join(
|
|
plugins.os.tmpdir(),
|
|
`containerarchive-ingest-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`,
|
|
);
|
|
|
|
// Create Unix socket server that Rust will connect to
|
|
const { promise: dataTransferred, server } = await this.createSocketServer(
|
|
socketPath,
|
|
inputStream,
|
|
);
|
|
|
|
try {
|
|
// Send ingest command to Rust (Rust connects to our socket)
|
|
const result = await this.bridge.sendCommand('ingest', {
|
|
socketPath,
|
|
tags: options?.tags,
|
|
items: options?.items || [{ name: 'data', type: 'data' }],
|
|
});
|
|
|
|
// Wait for data transfer to complete
|
|
await dataTransferred;
|
|
|
|
const snapshot = result.snapshot;
|
|
this.ingestComplete.next({
|
|
snapshotId: snapshot.id,
|
|
originalSize: snapshot.originalSize,
|
|
storedSize: snapshot.storedSize,
|
|
newChunks: snapshot.newChunks,
|
|
reusedChunks: snapshot.reusedChunks,
|
|
});
|
|
|
|
return snapshot;
|
|
} finally {
|
|
server.close();
|
|
// Clean up socket file
|
|
try {
|
|
plugins.fs.unlinkSync(socketPath);
|
|
} catch {}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Ingest multiple data streams as a single multi-item snapshot.
|
|
* Each item gets its own Unix socket for parallel data transfer.
|
|
*/
|
|
async ingestMulti(
|
|
items: IIngestItem[],
|
|
options?: IIngestOptions,
|
|
): Promise<ISnapshot> {
|
|
if (items.length === 0) {
|
|
throw new Error('At least one item is required');
|
|
}
|
|
|
|
// Create one socket per item
|
|
const sockets: Array<{
|
|
socketPath: string;
|
|
promise: Promise<void>;
|
|
server: plugins.net.Server;
|
|
}> = [];
|
|
|
|
const itemOptions: Array<{
|
|
name: string;
|
|
type: string;
|
|
socketPath: string;
|
|
}> = [];
|
|
|
|
try {
|
|
for (const item of items) {
|
|
const socketPath = plugins.path.join(
|
|
plugins.os.tmpdir(),
|
|
`containerarchive-ingest-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`,
|
|
);
|
|
|
|
const { promise, server } = await this.createSocketServer(socketPath, item.stream);
|
|
sockets.push({ socketPath, promise, server });
|
|
itemOptions.push({
|
|
name: item.name,
|
|
type: item.type || 'data',
|
|
socketPath,
|
|
});
|
|
}
|
|
|
|
// Send ingestMulti command to Rust with per-item socket paths
|
|
const result = await this.bridge.sendCommand('ingestMulti', {
|
|
tags: options?.tags,
|
|
items: itemOptions,
|
|
});
|
|
|
|
// Wait for all data transfers
|
|
await Promise.all(sockets.map((s) => s.promise));
|
|
|
|
const snapshot = result.snapshot;
|
|
this.ingestComplete.next({
|
|
snapshotId: snapshot.id,
|
|
originalSize: snapshot.originalSize,
|
|
storedSize: snapshot.storedSize,
|
|
newChunks: snapshot.newChunks,
|
|
reusedChunks: snapshot.reusedChunks,
|
|
});
|
|
|
|
return snapshot;
|
|
} finally {
|
|
for (const s of sockets) {
|
|
s.server.close();
|
|
try { plugins.fs.unlinkSync(s.socketPath); } catch {}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* List snapshots with optional filtering.
|
|
*/
|
|
async listSnapshots(filter?: ISnapshotFilter): Promise<ISnapshot[]> {
|
|
const result = await this.bridge.sendCommand('listSnapshots', {
|
|
filter,
|
|
});
|
|
return result.snapshots;
|
|
}
|
|
|
|
/**
|
|
* Get details of a specific snapshot.
|
|
*/
|
|
async getSnapshot(snapshotId: string): Promise<ISnapshot> {
|
|
const result = await this.bridge.sendCommand('getSnapshot', {
|
|
snapshotId,
|
|
});
|
|
return result.snapshot;
|
|
}
|
|
|
|
/**
|
|
* Restore a snapshot to a ReadableStream.
|
|
*/
|
|
async restore(
|
|
snapshotId: string,
|
|
options?: IRestoreOptions,
|
|
): Promise<NodeJS.ReadableStream> {
|
|
const socketPath = plugins.path.join(
|
|
plugins.os.tmpdir(),
|
|
`containerarchive-restore-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`,
|
|
);
|
|
|
|
// Create Unix socket server that Rust will connect to and write data
|
|
const { readable, server } = await this.createRestoreSocketServer(socketPath);
|
|
|
|
// Send restore command to Rust (Rust connects and writes data)
|
|
// Don't await — let it run in parallel with reading
|
|
this.bridge.sendCommand('restore', {
|
|
snapshotId,
|
|
socketPath,
|
|
item: options?.item,
|
|
}).catch((err) => {
|
|
readable.destroy(err);
|
|
}).finally(() => {
|
|
server.close();
|
|
try {
|
|
plugins.fs.unlinkSync(socketPath);
|
|
} catch {}
|
|
});
|
|
|
|
return readable;
|
|
}
|
|
|
|
/**
|
|
* Verify repository integrity.
|
|
*/
|
|
async verify(options?: IVerifyOptions): Promise<IVerifyResult> {
|
|
const result = await this.bridge.sendCommand('verify', {
|
|
level: options?.level || 'standard',
|
|
});
|
|
|
|
for (const error of result.errors) {
|
|
this.verifyError.next(error);
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
/**
|
|
* Repair repository (rebuild index, remove stale locks).
|
|
*/
|
|
async repair(): Promise<IRepairResult> {
|
|
return this.bridge.sendCommand('repair', {});
|
|
}
|
|
|
|
/**
|
|
* Prune old snapshots and garbage-collect unreferenced packs.
|
|
*/
|
|
async prune(retention: IRetentionPolicy, dryRun = false): Promise<IPruneResult> {
|
|
return this.bridge.sendCommand('prune', {
|
|
retention,
|
|
dryRun,
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Rebuild the global index from pack .idx files.
|
|
*/
|
|
async reindex(): Promise<void> {
|
|
await this.bridge.sendCommand('reindex', {});
|
|
}
|
|
|
|
/**
|
|
* Remove locks from the repository.
|
|
*/
|
|
async unlock(options?: IUnlockOptions): Promise<void> {
|
|
await this.bridge.sendCommand('unlock', {
|
|
force: options?.force,
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Subscribe to events.
|
|
*/
|
|
on(event: 'ingest:progress', handler: (data: IIngestProgress) => void): plugins.smartrx.rxjs.Subscription;
|
|
on(event: 'ingest:complete', handler: (data: IIngestComplete) => void): plugins.smartrx.rxjs.Subscription;
|
|
on(event: 'verify:error', handler: (data: IVerifyError) => void): plugins.smartrx.rxjs.Subscription;
|
|
on(event: string, handler: (data: any) => void): plugins.smartrx.rxjs.Subscription {
|
|
switch (event) {
|
|
case 'ingest:progress':
|
|
return this.ingestProgress.subscribe(handler);
|
|
case 'ingest:complete':
|
|
return this.ingestComplete.subscribe(handler);
|
|
case 'verify:error':
|
|
return this.verifyError.subscribe(handler);
|
|
default:
|
|
throw new Error(`Unknown event: ${event}`);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Close the repository and terminate the Rust process.
|
|
*/
|
|
async close(): Promise<void> {
|
|
try {
|
|
await this.bridge.sendCommand('close', {});
|
|
} catch {
|
|
// Ignore errors during close
|
|
}
|
|
this.bridge.kill();
|
|
this.spawned = false;
|
|
|
|
this.ingestProgress.complete();
|
|
this.ingestComplete.complete();
|
|
this.verifyError.complete();
|
|
}
|
|
|
|
// ==================== Private Helpers ====================
|
|
|
|
/**
|
|
* Create a Unix socket server that accepts a connection from Rust
|
|
* and pipes the inputStream to it (for ingest).
|
|
*/
|
|
private createSocketServer(
|
|
socketPath: string,
|
|
inputStream: NodeJS.ReadableStream,
|
|
): Promise<{
|
|
promise: Promise<void>;
|
|
server: plugins.net.Server;
|
|
}> {
|
|
return new Promise((resolve, reject) => {
|
|
const server = plugins.net.createServer((socket) => {
|
|
// Pipe input data to the Rust process via socket
|
|
const readableStream = inputStream as NodeJS.ReadableStream;
|
|
(readableStream as any).pipe(socket);
|
|
});
|
|
|
|
server.on('error', reject);
|
|
|
|
server.listen(socketPath, () => {
|
|
const promise = new Promise<void>((res) => {
|
|
server.on('close', () => res());
|
|
// Also resolve after a connection is handled
|
|
server.once('connection', (socket) => {
|
|
socket.on('end', () => {
|
|
res();
|
|
});
|
|
socket.on('error', () => {
|
|
res();
|
|
});
|
|
});
|
|
});
|
|
resolve({ promise, server });
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Create a Unix socket server that accepts a connection from Rust
|
|
* and provides a ReadableStream of the received data (for restore).
|
|
*/
|
|
private createRestoreSocketServer(
|
|
socketPath: string,
|
|
): Promise<{
|
|
readable: plugins.stream.PassThrough;
|
|
server: plugins.net.Server;
|
|
}> {
|
|
return new Promise((resolve, reject) => {
|
|
const passthrough = new plugins.stream.PassThrough();
|
|
const server = plugins.net.createServer((socket) => {
|
|
socket.pipe(passthrough);
|
|
});
|
|
|
|
server.on('error', reject);
|
|
|
|
server.listen(socketPath, () => {
|
|
resolve({ readable: passthrough, server });
|
|
});
|
|
});
|
|
}
|
|
}
|