import * as plugins from './plugins.js';
import { commitinfo } from './00_commitinfo_data.js';
import type {
  TContainerArchiveCommands,
  IInitOptions,
  IOpenOptions,
  IIngestOptions,
  IIngestItem,
  IIngestItemOptions,
  IRestoreOptions,
  ISnapshot,
  ISnapshotFilter,
  IVerifyOptions,
  IVerifyResult,
  IRetentionPolicy,
  IPruneResult,
  IRepairResult,
  IUnlockOptions,
  IIngestProgress,
  IIngestComplete,
  IVerifyError,
  IRepositoryConfig,
} from './interfaces.js';

/**
 * Content-addressed incremental backup engine.
 *
 * Provides deduplicated, optionally encrypted, gzip-compressed storage
 * for arbitrary data streams with full snapshot history.
 *
 * Instances are created through {@link ContainerArchive.init} or
 * {@link ContainerArchive.open}; the constructor is private. All repository
 * work is delegated to a `containerarchive` binary driven through a
 * `RustBridge`; bulk data travels over temporary Unix sockets created in the
 * OS temp directory.
 */
export class ContainerArchive {
  private bridge: plugins.smartrust.RustBridge<TContainerArchiveCommands>;
  private repoPath: string;
  private spawned = false;

  // Event subjects
  public ingestProgress = new plugins.smartrx.rxjs.Subject<IIngestProgress>();
  public ingestComplete = new plugins.smartrx.rxjs.Subject<IIngestComplete>();
  public verifyError = new plugins.smartrx.rxjs.Subject<IVerifyError>();

  private constructor(repoPath: string) {
    this.repoPath = plugins.path.resolve(repoPath);

    // URL#pathname is percent-encoded; decode it so that install locations
    // containing spaces or non-ASCII characters resolve to a real path.
    const modulePath = decodeURIComponent(new URL(import.meta.url).pathname);
    const packageDir = plugins.path.resolve(plugins.path.dirname(modulePath), '..');

    this.bridge = new plugins.smartrust.RustBridge<TContainerArchiveCommands>({
      binaryName: 'containerarchive',
      localPaths: [plugins.path.join(packageDir, 'dist_rust', 'containerarchive')],
      readyTimeoutMs: 30000,
      requestTimeoutMs: 300000,
    });

    // Listen for events from the Rust binary; only 'progress' is forwarded.
    this.bridge.on('event', (event: { event: string; data: unknown }) => {
      if (event.event === 'progress') {
        this.ingestProgress.next(event.data as IIngestProgress);
      }
    });
  }

  /** Spawn the bridge process once; subsequent calls are no-ops. */
  private async ensureSpawned(): Promise<void> {
    if (!this.spawned) {
      await this.bridge.spawn();
      this.spawned = true;
    }
  }

  /**
   * Initialize a new repository at the given path.
   *
   * @param repoPath - Repository location; resolved to an absolute path.
   * @param options - Optional settings; `passphrase` is forwarded to the `init` command.
   * @returns An instance bound to the newly initialized repository.
   */
  static async init(repoPath: string, options?: IInitOptions): Promise<ContainerArchive> {
    const instance = new ContainerArchive(repoPath);
    await instance.ensureSpawned();
    await instance.bridge.sendCommand('init', {
      path: instance.repoPath,
      passphrase: options?.passphrase,
    });
    return instance;
  }

  /**
   * Open an existing repository at the given path.
   *
   * @param repoPath - Repository location; resolved to an absolute path.
   * @param options - Optional settings; `passphrase` is forwarded to the `open` command.
   * @returns An instance bound to the opened repository.
   */
  static async open(repoPath: string, options?: IOpenOptions): Promise<ContainerArchive> {
    const instance = new ContainerArchive(repoPath);
    await instance.ensureSpawned();
    await instance.bridge.sendCommand('open', {
      path: instance.repoPath,
      passphrase: options?.passphrase,
    });
    return instance;
  }

  /**
   * Ingest a single data stream into the repository.
   *
   * Emits on `ingestComplete` once the command has returned and the socket
   * transfer has finished.
   *
   * @param inputStream - Data to store; piped into a temporary Unix socket.
   * @param options - Optional `tags` and `items`; `items` defaults to a single
   *   `{ name: 'data', type: 'data' }` entry.
   * @returns The snapshot reported by the `ingest` command.
   */
  async ingest(
    inputStream: NodeJS.ReadableStream,
    options?: IIngestOptions,
  ): Promise<ISnapshot> {
    const socketPath = ContainerArchive.createSocketPath('ingest');

    // Create Unix socket server that Rust will connect to
    const { promise: dataTransferred, server } = await this.createSocketServer(
      socketPath,
      inputStream,
    );

    try {
      // Send ingest command to Rust (Rust connects to our socket)
      const result = await this.bridge.sendCommand('ingest', {
        socketPath,
        tags: options?.tags,
        items: options?.items || [{ name: 'data', type: 'data' }],
      });

      // Wait for data transfer to complete
      await dataTransferred;

      const snapshot = result.snapshot;
      this.emitIngestComplete(snapshot);
      return snapshot;
    } finally {
      ContainerArchive.releaseSocket(server, socketPath);
    }
  }

  /**
   * Ingest multiple data streams as a single multi-item snapshot.
   * Each item gets its own Unix socket for parallel data transfer.
   *
   * @param items - Streams to ingest; `type` defaults to `'data'` per item.
   * @param options - Optional settings; only `tags` is forwarded.
   * @returns The snapshot reported by the `ingestMulti` command.
   * @throws Error if `items` is empty.
   */
  async ingestMulti(
    items: IIngestItem[],
    options?: IIngestOptions,
  ): Promise<ISnapshot> {
    if (items.length === 0) {
      throw new Error('At least one item is required');
    }

    // One socket per item; tracked so the finally block can release whatever
    // was created even if setup fails part-way through.
    const sockets: Array<{
      socketPath: string;
      promise: Promise<void>;
      server: plugins.net.Server;
    }> = [];
    const itemOptions: Array<{
      name: string;
      type: string;
      socketPath: string;
    }> = [];

    try {
      for (const item of items) {
        const socketPath = ContainerArchive.createSocketPath('ingest');
        const { promise, server } = await this.createSocketServer(socketPath, item.stream);
        sockets.push({ socketPath, promise, server });
        itemOptions.push({
          name: item.name,
          type: item.type || 'data',
          socketPath,
        });
      }

      // Send ingestMulti command to Rust with per-item socket paths
      const result = await this.bridge.sendCommand('ingestMulti', {
        tags: options?.tags,
        items: itemOptions,
      });

      // Wait for all data transfers
      await Promise.all(sockets.map((s) => s.promise));

      const snapshot = result.snapshot;
      this.emitIngestComplete(snapshot);
      return snapshot;
    } finally {
      for (const s of sockets) {
        ContainerArchive.releaseSocket(s.server, s.socketPath);
      }
    }
  }

  /**
   * List snapshots with optional filtering.
   *
   * @param filter - Forwarded unchanged to the `listSnapshots` command.
   */
  async listSnapshots(filter?: ISnapshotFilter): Promise<ISnapshot[]> {
    const result = await this.bridge.sendCommand('listSnapshots', {
      filter,
    });
    return result.snapshots;
  }

  /**
   * Get details of a specific snapshot.
   *
   * @param snapshotId - Identifier forwarded to the `getSnapshot` command.
   */
  async getSnapshot(snapshotId: string): Promise<ISnapshot> {
    const result = await this.bridge.sendCommand('getSnapshot', {
      snapshotId,
    });
    return result.snapshot;
  }

  /**
   * Restore a snapshot to a readable stream.
   *
   * The `restore` command is started but not awaited, so the returned stream
   * can be consumed while data is still arriving. If the command fails, the
   * stream is destroyed with that error. The socket server and socket file
   * are released once the command settles.
   *
   * @param snapshotId - Snapshot to restore.
   * @param options - Optional settings; `item` is forwarded to the command.
   * @returns A stream carrying the data written to the restore socket.
   */
  async restore(
    snapshotId: string,
    options?: IRestoreOptions,
  ): Promise<plugins.stream.Readable> {
    const socketPath = ContainerArchive.createSocketPath('restore');

    // Create Unix socket server that Rust will connect to and write data
    const { readable, server } = await this.createRestoreSocketServer(socketPath);

    // Send restore command to Rust (Rust connects and writes data).
    // Deliberately not awaited — it runs in parallel with reading.
    void this.bridge
      .sendCommand('restore', {
        snapshotId,
        socketPath,
        item: options?.item,
      })
      .catch((err: unknown) => {
        readable.destroy(err instanceof Error ? err : new Error(String(err)));
      })
      .finally(() => {
        ContainerArchive.releaseSocket(server, socketPath);
      });

    return readable;
  }

  /**
   * Verify repository integrity.
   *
   * Every entry of `result.errors` is also emitted on `verifyError`.
   *
   * @param options - Optional settings; `level` defaults to `'standard'`.
   */
  async verify(options?: IVerifyOptions): Promise<IVerifyResult> {
    const result = await this.bridge.sendCommand('verify', {
      level: options?.level || 'standard',
    });

    for (const error of result.errors) {
      this.verifyError.next(error);
    }

    return result;
  }

  /**
   * Repair repository (rebuild index, remove stale locks).
   */
  async repair(): Promise<IRepairResult> {
    return this.bridge.sendCommand('repair', {});
  }

  /**
   * Prune old snapshots and garbage-collect unreferenced packs.
   *
   * @param retention - Retention policy forwarded to the `prune` command.
   * @param dryRun - Forwarded to the command as `dryRun`; defaults to `false`.
   */
  async prune(retention: IRetentionPolicy, dryRun = false): Promise<IPruneResult> {
    return this.bridge.sendCommand('prune', {
      retention,
      dryRun,
    });
  }

  /**
   * Rebuild the global index from pack .idx files.
   */
  async reindex(): Promise<void> {
    await this.bridge.sendCommand('reindex', {});
  }

  /**
   * Remove locks from the repository.
   *
   * @param options - Optional settings; `force` is forwarded to the command.
   */
  async unlock(options?: IUnlockOptions): Promise<void> {
    await this.bridge.sendCommand('unlock', {
      force: options?.force,
    });
  }

  /**
   * Subscribe to events.
   *
   * @param event - One of `'ingest:progress'`, `'ingest:complete'`, `'verify:error'`.
   * @param handler - Called with each emitted payload.
   * @returns The subscription; unsubscribe it to stop receiving events.
   * @throws Error for any other event name.
   */
  on(event: 'ingest:progress', handler: (data: IIngestProgress) => void): plugins.smartrx.rxjs.Subscription;
  on(event: 'ingest:complete', handler: (data: IIngestComplete) => void): plugins.smartrx.rxjs.Subscription;
  on(event: 'verify:error', handler: (data: IVerifyError) => void): plugins.smartrx.rxjs.Subscription;
  on(event: string, handler: (data: any) => void): plugins.smartrx.rxjs.Subscription {
    switch (event) {
      case 'ingest:progress':
        return this.ingestProgress.subscribe(handler);
      case 'ingest:complete':
        return this.ingestComplete.subscribe(handler);
      case 'verify:error':
        return this.verifyError.subscribe(handler);
      default:
        throw new Error(`Unknown event: ${event}`);
    }
  }

  /**
   * Close the repository and terminate the Rust process.
   *
   * A failing `close` command is ignored; the process is killed and all
   * event subjects are completed regardless.
   */
  async close(): Promise<void> {
    try {
      await this.bridge.sendCommand('close', {});
    } catch {
      // Ignore errors during close
    }
    this.bridge.kill();
    this.spawned = false;

    this.ingestProgress.complete();
    this.ingestComplete.complete();
    this.verifyError.complete();
  }

  // ==================== Private Helpers ====================

  /** Build a socket path in the OS temp dir from `kind`, a timestamp and a random suffix. */
  private static createSocketPath(kind: 'ingest' | 'restore'): string {
    return plugins.path.join(
      plugins.os.tmpdir(),
      `containerarchive-${kind}-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`,
    );
  }

  /** Stop accepting connections and remove the socket file (best effort). */
  private static releaseSocket(server: plugins.net.Server, socketPath: string): void {
    server.close();
    try {
      plugins.fs.unlinkSync(socketPath);
    } catch {
      // Best-effort cleanup: a failed unlink must not mask the caller's result.
    }
  }

  /** Publish the size and chunk statistics of a finished ingest on `ingestComplete`. */
  private emitIngestComplete(snapshot: ISnapshot): void {
    this.ingestComplete.next({
      snapshotId: snapshot.id,
      originalSize: snapshot.originalSize,
      storedSize: snapshot.storedSize,
      newChunks: snapshot.newChunks,
      reusedChunks: snapshot.reusedChunks,
    });
  }

  /**
   * Create a Unix socket server that accepts a connection from Rust
   * and pipes the inputStream to it (for ingest).
   *
   * The outer promise resolves once the server is listening and rejects on a
   * server error. The inner `promise` resolves when the first connection's
   * socket emits `end` or `error`, or when the server closes; it never rejects.
   *
   * NOTE(review): errors emitted by `inputStream` are not propagated to the
   * socket or to `promise` — confirm callers handle them.
   */
  private createSocketServer(
    socketPath: string,
    inputStream: NodeJS.ReadableStream,
  ): Promise<{
    promise: Promise<void>;
    server: plugins.net.Server;
  }> {
    return new Promise((resolve, reject) => {
      const server = plugins.net.createServer((socket) => {
        // Pipe input data to the Rust process via socket
        inputStream.pipe(socket);
      });

      server.on('error', reject);

      server.listen(socketPath, () => {
        const promise = new Promise<void>((done) => {
          server.on('close', () => done());
          // Also resolve after a connection is handled
          server.once('connection', (socket) => {
            socket.on('end', () => done());
            socket.on('error', () => done());
          });
        });

        resolve({ promise, server });
      });
    });
  }

  /**
   * Create a Unix socket server that accepts a connection from Rust
   * and provides a ReadableStream of the received data (for restore).
   *
   * The promise resolves once the server is listening and rejects on a
   * server error.
   */
  private createRestoreSocketServer(
    socketPath: string,
  ): Promise<{
    readable: plugins.stream.PassThrough;
    server: plugins.net.Server;
  }> {
    return new Promise((resolve, reject) => {
      const passthrough = new plugins.stream.PassThrough();
      const server = plugins.net.createServer((socket) => {
        // pipe() does not forward errors; without this listener a socket
        // failure would surface as an unhandled 'error' event instead of
        // reaching whoever is consuming the restored stream.
        socket.on('error', (err) => {
          passthrough.destroy(err);
        });
        socket.pipe(passthrough);
      });

      server.on('error', reject);

      server.listen(socketPath, () => {
        resolve({ readable: passthrough, server });
      });
    });
  }
}