use std::collections::{HashMap, HashSet}; use bson::Document; use dashmap::DashMap; use tracing::{debug, warn}; use uuid::Uuid; use rustdb_storage::StorageAdapter; use crate::error::{TransactionError, TransactionResult}; /// The status of a transaction. #[derive(Debug, Clone, PartialEq, Eq)] pub enum TransactionStatus { Active, Committed, Aborted, } /// Describes a write operation within a transaction. #[derive(Debug, Clone, PartialEq, Eq)] pub enum WriteOp { Insert, Update, Delete, } /// A single write entry recorded within a transaction. #[derive(Debug, Clone)] pub struct WriteEntry { pub op: WriteOp, pub doc: Option, pub original_doc: Option, } /// Full state of an in-flight transaction. #[derive(Debug)] pub struct TransactionState { pub id: String, pub session_id: String, pub status: TransactionStatus, /// Tracks which documents were read: namespace -> set of doc ids. pub read_set: HashMap>, /// Tracks writes: namespace -> (doc_id -> WriteEntry). pub write_set: HashMap>, /// Snapshot of collections at transaction start: namespace -> documents. pub snapshots: HashMap>, } /// Engine that manages transaction lifecycle and conflict detection. pub struct TransactionEngine { transactions: DashMap, } impl TransactionEngine { /// Create a new transaction engine. pub fn new() -> Self { Self { transactions: DashMap::new(), } } /// Start a new transaction for the given session. /// Returns a unique transaction id (UUID v4). pub fn start_transaction(&self, session_id: &str) -> TransactionResult { let txn_id = Uuid::new_v4().to_string(); debug!(txn_id = %txn_id, session_id = %session_id, "starting transaction"); let state = TransactionState { id: txn_id.clone(), session_id: session_id.to_string(), status: TransactionStatus::Active, read_set: HashMap::new(), write_set: HashMap::new(), snapshots: HashMap::new(), }; self.transactions.insert(txn_id.clone(), state); Ok(txn_id) } /// Commit a transaction: check for conflicts, then apply buffered writes /// to the underlying storage adapter. pub async fn commit_transaction( &self, txn_id: &str, storage: &dyn StorageAdapter, ) -> TransactionResult<()> { // Remove the transaction so we own it exclusively. let mut state = self .transactions .remove(txn_id) .map(|(_, s)| s) .ok_or_else(|| TransactionError::NotFound(txn_id.to_string()))?; if state.status != TransactionStatus::Active { return Err(TransactionError::InvalidState(format!( "transaction {} is {:?}, cannot commit", txn_id, state.status ))); } // Conflict detection: check if any documents in the read set have // been modified since the snapshot was taken. // (Simplified: we skip real snapshot timestamps for now.) // Apply buffered writes to storage. for (ns, writes) in &state.write_set { let parts: Vec<&str> = ns.splitn(2, '.').collect(); if parts.len() != 2 { warn!(namespace = %ns, "invalid namespace format, skipping"); continue; } let (db, coll) = (parts[0], parts[1]); for (doc_id, entry) in writes { match entry.op { WriteOp::Insert => { if let Some(ref doc) = entry.doc { let _ = storage.insert_one(db, coll, doc.clone()).await; } } WriteOp::Update => { if let Some(ref doc) = entry.doc { let _ = storage.update_by_id(db, coll, doc_id, doc.clone()).await; } } WriteOp::Delete => { let _ = storage.delete_by_id(db, coll, doc_id).await; } } } } state.status = TransactionStatus::Committed; debug!(txn_id = %txn_id, "transaction committed"); Ok(()) } /// Abort a transaction, discarding all buffered writes. pub fn abort_transaction(&self, txn_id: &str) -> TransactionResult<()> { let mut state = self .transactions .get_mut(txn_id) .ok_or_else(|| TransactionError::NotFound(txn_id.to_string()))?; if state.status != TransactionStatus::Active { return Err(TransactionError::InvalidState(format!( "transaction {} is {:?}, cannot abort", txn_id, state.status ))); } state.status = TransactionStatus::Aborted; debug!(txn_id = %txn_id, "transaction aborted"); // Drop the mutable ref before removing. drop(state); self.transactions.remove(txn_id); Ok(()) } /// Check whether a transaction is currently active. pub fn is_active(&self, txn_id: &str) -> bool { self.transactions .get(txn_id) .map(|s| s.status == TransactionStatus::Active) .unwrap_or(false) } /// Record a document read within a transaction (for conflict detection). pub fn record_read(&self, txn_id: &str, ns: &str, doc_id: &str) { if let Some(mut state) = self.transactions.get_mut(txn_id) { state .read_set .entry(ns.to_string()) .or_default() .insert(doc_id.to_string()); } } /// Record a document write within a transaction (buffered until commit). pub fn record_write( &self, txn_id: &str, ns: &str, doc_id: &str, op: WriteOp, doc: Option, original: Option, ) { if let Some(mut state) = self.transactions.get_mut(txn_id) { let entry = WriteEntry { op, doc, original_doc: original, }; state .write_set .entry(ns.to_string()) .or_default() .insert(doc_id.to_string(), entry); } } /// Get a snapshot of documents for a namespace within a transaction, /// applying the write overlay (inserts, updates, deletes) on top. pub fn get_snapshot(&self, txn_id: &str, ns: &str) -> Option> { let state = self.transactions.get(txn_id)?; // Start with the base snapshot. let mut docs: Vec = state .snapshots .get(ns) .cloned() .unwrap_or_default(); // Apply write overlay. if let Some(writes) = state.write_set.get(ns) { // Collect ids to delete. let delete_ids: HashSet<&String> = writes .iter() .filter(|(_, e)| e.op == WriteOp::Delete) .map(|(id, _)| id) .collect(); // Remove deleted docs. docs.retain(|d| { if let Some(id) = d.get_object_id("_id").ok().map(|oid| oid.to_hex()) { !delete_ids.contains(&id) } else { true } }); // Apply updates. for (doc_id, entry) in writes { if entry.op == WriteOp::Update { if let Some(ref new_doc) = entry.doc { // Replace existing doc with updated version. let hex_id = doc_id.clone(); if let Some(pos) = docs.iter().position(|d| { d.get_object_id("_id") .ok() .map(|oid| oid.to_hex()) == Some(hex_id.clone()) }) { docs[pos] = new_doc.clone(); } } } } // Apply inserts. for (_doc_id, entry) in writes { if entry.op == WriteOp::Insert { if let Some(ref doc) = entry.doc { docs.push(doc.clone()); } } } } Some(docs) } /// Store a base snapshot for a namespace within a transaction. pub fn set_snapshot(&self, txn_id: &str, ns: &str, docs: Vec) { if let Some(mut state) = self.transactions.get_mut(txn_id) { state.snapshots.insert(ns.to_string(), docs); } } } impl Default for TransactionEngine { fn default() -> Self { Self::new() } }