import * as plugins from '../congodb.plugins.js';
import type { IStorageAdapter } from '../storage/IStorageAdapter.js';
import type { Document, IStoredDocument, ITransactionOptions } from '../types/interfaces.js';
import { CongoTransactionError, CongoWriteConflictError } from '../errors/CongoErrors.js';

/**
 * A single buffered write inside a transaction.
 *
 * `doc` is the new document state (set for inserts and updates); `originalDoc`
 * is the document as it was when the update/delete was recorded.
 */
interface IPendingWrite {
  op: 'insert' | 'update' | 'delete';
  doc?: IStoredDocument;
  originalDoc?: IStoredDocument;
}

/**
 * Transaction state
 */
export interface ITransactionState {
  id: string;
  sessionId: string;
  startTime: plugins.bson.Timestamp;
  status: 'active' | 'committed' | 'aborted';
  /** ns -> document _ids read */
  readSet: Map<string, Set<string>>;
  /** ns -> _id (hex string) -> operation */
  writeSet: Map<string, Map<string, IPendingWrite>>;
  /** ns -> snapshot of documents */
  snapshots: Map<string, IStoredDocument[]>;
}

/**
 * Transaction engine for ACID transaction support.
 *
 * Writes are buffered per transaction in memory and only handed to the
 * storage adapter when the transaction is committed.
 */
export class TransactionEngine {
  private readonly storage: IStorageAdapter;
  private readonly transactions: Map<string, ITransactionState> = new Map();
  private txnCounter = 0;

  constructor(storage: IStorageAdapter) {
    this.storage = storage;
  }

  /**
   * Start a new transaction.
   *
   * @param sessionId - Session the transaction belongs to; becomes part of the id.
   * @param options - Accepted for API compatibility; not consulted by this method.
   * @returns The id of the newly registered transaction.
   */
  startTransaction(sessionId: string, options?: ITransactionOptions): string {
    this.txnCounter++;
    const txnId = `txn_${sessionId}_${this.txnCounter}`;

    const transaction: ITransactionState = {
      id: txnId,
      sessionId,
      // Seconds since epoch plus the engine-local counter as the increment.
      startTime: new plugins.bson.Timestamp({
        t: Math.floor(Date.now() / 1000),
        i: this.txnCounter,
      }),
      status: 'active',
      readSet: new Map(),
      writeSet: new Map(),
      snapshots: new Map(),
    };

    this.transactions.set(txnId, transaction);
    return txnId;
  }

  /**
   * Get a transaction by ID.
   *
   * @returns The transaction state, or `undefined` if the id is unknown.
   */
  getTransaction(txnId: string): ITransactionState | undefined {
    return this.transactions.get(txnId);
  }

  /**
   * Check if a transaction is active.
   *
   * @returns `true` only if the transaction exists and its status is `'active'`.
   */
  isActive(txnId: string): boolean {
    return this.transactions.get(txnId)?.status === 'active';
  }

  /**
   * Get the transaction's view of a namespace: the snapshot taken on first
   * access, overlaid with the transaction's own buffered writes.
   *
   * @throws CongoTransactionError if the transaction is unknown or not active.
   */
  async getSnapshot(txnId: string, dbName: string, collName: string): Promise<IStoredDocument[]> {
    const txn = this.requireActive(txnId);
    const ns = `${dbName}.${collName}`;

    // The snapshot is created lazily once per namespace and then reused.
    let snapshot = txn.snapshots.get(ns);
    if (!snapshot) {
      snapshot = await this.storage.createSnapshot(dbName, collName);
      txn.snapshots.set(ns, snapshot);
    }

    const writes = txn.writeSet.get(ns);
    if (!writes) {
      return snapshot;
    }

    // Split the buffered writes into deletions and new/replacement documents.
    const deletedIds = new Set<string>();
    const pendingDocs = new Map<string, IStoredDocument>();
    for (const [idStr, write] of writes) {
      if (write.op === 'delete') {
        deletedIds.add(idStr);
      } else if (write.doc) {
        pendingDocs.set(idStr, write.doc);
      }
    }

    // Existing documents: drop deleted ones, substitute modified ones.
    const result: IStoredDocument[] = [];
    for (const doc of snapshot) {
      const idStr = doc._id.toHexString();
      if (deletedIds.has(idStr)) {
        continue;
      }
      const replacement = pendingDocs.get(idStr);
      if (replacement) {
        result.push(replacement);
        // Consumed here so it is not appended again as a new document below.
        pendingDocs.delete(idStr);
      } else {
        result.push(doc);
      }
    }

    // Whatever is left was not in the snapshot, i.e. documents new to this view.
    result.push(...pendingDocs.values());
    return result;
  }

  /**
   * Record a read operation.
   *
   * Unlike the write recorders, this is a silent no-op when the transaction
   * is unknown or not active.
   */
  recordRead(txnId: string, dbName: string, collName: string, docIds: string[]): void {
    const txn = this.transactions.get(txnId);
    if (!txn || txn.status !== 'active') return;

    const ns = `${dbName}.${collName}`;
    let readIds = txn.readSet.get(ns);
    if (!readIds) {
      readIds = new Set();
      txn.readSet.set(ns, readIds);
    }
    for (const id of docIds) {
      readIds.add(id);
    }
  }

  /**
   * Record a write operation (insert).
   *
   * @throws CongoTransactionError if the transaction is unknown or not active.
   */
  recordInsert(txnId: string, dbName: string, collName: string, doc: IStoredDocument): void {
    const txn = this.requireActive(txnId);
    const writes = this.writesFor(txn, `${dbName}.${collName}`);
    const idStr = doc._id.toHexString();

    const existing = writes.get(idStr);
    if (existing?.op === 'delete') {
      // Delete followed by re-insert of the same _id within one transaction:
      // a buffered delete is only kept for documents that were not inserted by
      // this transaction, so record a replacement instead of an insert that
      // would target an _id presumably still present in storage at commit.
      writes.set(idStr, { op: 'update', doc, originalDoc: existing.originalDoc });
      return;
    }

    writes.set(idStr, { op: 'insert', doc });
  }

  /**
   * Record a write operation (update).
   *
   * @throws CongoTransactionError if the transaction is unknown or not active.
   */
  recordUpdate(
    txnId: string,
    dbName: string,
    collName: string,
    originalDoc: IStoredDocument,
    updatedDoc: IStoredDocument
  ): void {
    const txn = this.requireActive(txnId);
    const writes = this.writesFor(txn, `${dbName}.${collName}`);
    const idStr = originalDoc._id.toHexString();

    const existing = writes.get(idStr);
    if (existing) {
      // Keep the earlier op (and its originalDoc); only the new state changes,
      // so an insert followed by an update is still committed as an insert.
      existing.doc = updatedDoc;
    } else {
      writes.set(idStr, { op: 'update', doc: updatedDoc, originalDoc });
    }
  }

  /**
   * Record a write operation (delete).
   *
   * @throws CongoTransactionError if the transaction is unknown or not active.
   */
  recordDelete(txnId: string, dbName: string, collName: string, doc: IStoredDocument): void {
    const txn = this.requireActive(txnId);
    const writes = this.writesFor(txn, `${dbName}.${collName}`);
    const idStr = doc._id.toHexString();

    if (writes.get(idStr)?.op === 'insert') {
      // Inserted and deleted within the same transaction: nothing to commit.
      writes.delete(idStr);
    } else {
      writes.set(idStr, { op: 'delete', originalDoc: doc });
    }
  }

  /**
   * Commit a transaction: check every written namespace for conflicts, then
   * apply the buffered writes through the storage adapter.
   *
   * @throws CongoTransactionError if the transaction is unknown or not active.
   * @throws CongoWriteConflictError if the storage adapter reports a conflict;
   *   the transaction is marked aborted in that case.
   */
  async commitTransaction(txnId: string): Promise<void> {
    const txn = this.transactions.get(txnId);
    if (!txn) {
      throw new CongoTransactionError('Transaction not found');
    }
    if (txn.status !== 'active') {
      throw new CongoTransactionError(`Cannot commit transaction in state: ${txn.status}`);
    }

    // Check for write conflicts before touching storage.
    for (const [ns, writes] of txn.writeSet) {
      const [dbName, collName] = this.splitNamespace(ns);
      const ids = Array.from(writes.keys(), (id) => new plugins.bson.ObjectId(id));
      const hasConflicts = await this.storage.hasConflicts(dbName, collName, ids, txn.startTime);
      if (hasConflicts) {
        txn.status = 'aborted';
        throw new CongoWriteConflictError();
      }
    }

    // Apply all writes.
    try {
      for (const [ns, writes] of txn.writeSet) {
        const [dbName, collName] = this.splitNamespace(ns);
        for (const [idStr, write] of writes) {
          switch (write.op) {
            case 'insert':
              if (write.doc) {
                await this.storage.insertOne(dbName, collName, write.doc);
              }
              break;
            case 'update':
              if (write.doc) {
                await this.storage.updateById(
                  dbName,
                  collName,
                  new plugins.bson.ObjectId(idStr),
                  write.doc
                );
              }
              break;
            case 'delete':
              await this.storage.deleteById(dbName, collName, new plugins.bson.ObjectId(idStr));
              break;
          }
        }
      }
    } catch (error: unknown) {
      // Some writes may already have been applied and nothing here rolls them
      // back. Leave the 'active' state so a second commit cannot re-apply them.
      txn.status = 'aborted';
      throw error;
    }

    txn.status = 'committed';
  }

  /**
   * Abort a transaction, discarding its buffered reads, writes and snapshots.
   * A transaction that is already committed or aborted is left untouched.
   *
   * @throws CongoTransactionError if the transaction is unknown.
   */
  async abortTransaction(txnId: string): Promise<void> {
    const txn = this.transactions.get(txnId);
    if (!txn) {
      throw new CongoTransactionError('Transaction not found');
    }
    if (txn.status !== 'active') {
      // Already committed or aborted, just return
      return;
    }

    // Nothing has reached storage yet, so dropping the buffers is enough.
    txn.writeSet.clear();
    txn.readSet.clear();
    txn.snapshots.clear();
    txn.status = 'aborted';
  }

  /**
   * End a transaction (cleanup): remove it from the engine regardless of status.
   */
  endTransaction(txnId: string): void {
    this.transactions.delete(txnId);
  }

  /**
   * Get all pending writes for a namespace.
   *
   * @returns The live `_id -> write` map, or `undefined` if the transaction is
   *   unknown or has not written to the namespace.
   */
  getPendingWrites(
    txnId: string,
    dbName: string,
    collName: string
  ): Map<string, IPendingWrite> | undefined {
    return this.transactions.get(txnId)?.writeSet.get(`${dbName}.${collName}`);
  }

  /**
   * Execute a callback within a transaction, with automatic retry on conflict.
   *
   * Each attempt runs in a fresh transaction that is committed on success and
   * aborted on failure; the transaction is always removed from the engine
   * afterwards. Only `CongoWriteConflictError` triggers a retry.
   *
   * @param options - `maxRetries` is the total number of attempts (default 3);
   *   values below 1 are treated as 1 so the callback always runs at least once.
   * @returns Whatever the callback resolved with on the successful attempt.
   * @throws The error of the failing attempt once it is not retryable or the
   *   attempts are exhausted.
   */
  async withTransaction<T>(
    sessionId: string,
    callback: (txnId: string) => Promise<T>,
    options?: ITransactionOptions & { maxRetries?: number }
  ): Promise<T> {
    const requested = options?.maxRetries ?? 3;
    // `>=` is false for NaN too, so nonsensical values fall back to one attempt.
    const maxAttempts = requested >= 1 ? requested : 1;
    let lastError: unknown;

    for (let attempt = 0; attempt < maxAttempts; attempt++) {
      const txnId = this.startTransaction(sessionId, options);
      try {
        const result = await callback(txnId);
        await this.commitTransaction(txnId);
        return result;
      } catch (error: unknown) {
        await this.abortTransaction(txnId);
        const canRetry = error instanceof CongoWriteConflictError && attempt < maxAttempts - 1;
        if (!canRetry) {
          throw error;
        }
        // Retry on write conflict
        lastError = error;
      } finally {
        this.endTransaction(txnId);
      }
    }

    // Every iteration returns or throws; this only satisfies the type checker.
    throw lastError ?? new CongoTransactionError('Transaction failed after max retries');
  }

  /** Look up a transaction and throw unless it exists and is active. */
  private requireActive(txnId: string): ITransactionState {
    const txn = this.transactions.get(txnId);
    if (!txn || txn.status !== 'active') {
      throw new CongoTransactionError('Transaction is not active');
    }
    return txn;
  }

  /** Return the write map for a namespace, creating it on first use. */
  private writesFor(txn: ITransactionState, ns: string): Map<string, IPendingWrite> {
    let writes = txn.writeSet.get(ns);
    if (!writes) {
      writes = new Map();
      txn.writeSet.set(ns, writes);
    }
    return writes;
  }

  /**
   * Split a `db.collection` namespace at the FIRST dot, so a collection name
   * that itself contains dots is preserved intact.
   */
  private splitNamespace(ns: string): [string, string] {
    const dot = ns.indexOf('.');
    return dot === -1 ? [ns, ''] : [ns.slice(0, dot), ns.slice(dot + 1)];
  }
}