//! Operation log (OpLog) for tracking mutations. //! //! The OpLog records every write operation so that changes can be replayed, //! replicated, or used for change-stream style notifications. //! Each entry stores both the new and previous document state, enabling //! point-in-time replay and revert. use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use bson::Document; use dashmap::DashMap; use serde::{Deserialize, Serialize}; /// The type of operation recorded in the oplog. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub enum OpType { Insert, Update, Delete, } /// A single oplog entry. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OpLogEntry { /// Monotonically increasing sequence number. pub seq: u64, /// Timestamp in milliseconds since UNIX epoch. pub timestamp_ms: i64, /// Operation type. pub op: OpType, /// Database name. pub db: String, /// Collection name. pub collection: String, /// Document id (hex string). pub document_id: String, /// The new document snapshot (for insert/update; None for delete). pub document: Option, /// The previous document snapshot (for update/delete; None for insert). pub previous_document: Option, } /// Aggregate statistics about the oplog. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct OpLogStats { pub current_seq: u64, pub total_entries: usize, pub oldest_seq: u64, pub inserts: usize, pub updates: usize, pub deletes: usize, } /// In-memory operation log. pub struct OpLog { /// All entries keyed by sequence number. entries: DashMap, /// Next sequence number. next_seq: AtomicU64, } impl OpLog { pub fn new() -> Self { Self { entries: DashMap::new(), next_seq: AtomicU64::new(1), } } /// Append an operation to the log and return its sequence number. pub fn append( &self, op: OpType, db: &str, collection: &str, document_id: &str, document: Option, previous_document: Option, ) -> u64 { let seq = self.next_seq.fetch_add(1, Ordering::SeqCst); let entry = OpLogEntry { seq, timestamp_ms: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as i64, op, db: db.to_string(), collection: collection.to_string(), document_id: document_id.to_string(), document, previous_document, }; self.entries.insert(seq, entry); seq } /// Get a single entry by sequence number. pub fn get_entry(&self, seq: u64) -> Option { self.entries.get(&seq).map(|e| e.value().clone()) } /// Get all entries with sequence number >= `since`. pub fn entries_since(&self, since: u64) -> Vec { let mut result: Vec<_> = self .entries .iter() .filter(|e| *e.key() >= since) .map(|e| e.value().clone()) .collect(); result.sort_by_key(|e| e.seq); result } /// Get entries in range [from_seq, to_seq] inclusive, sorted by seq. pub fn entries_range(&self, from_seq: u64, to_seq: u64) -> Vec { let mut result: Vec<_> = self .entries .iter() .filter(|e| { let k = *e.key(); k >= from_seq && k <= to_seq }) .map(|e| e.value().clone()) .collect(); result.sort_by_key(|e| e.seq); result } /// Remove all entries with seq > `after_seq` and reset the next_seq counter. pub fn truncate_after(&self, after_seq: u64) { let keys_to_remove: Vec = self .entries .iter() .filter(|e| *e.key() > after_seq) .map(|e| *e.key()) .collect(); for key in keys_to_remove { self.entries.remove(&key); } self.next_seq.store(after_seq + 1, Ordering::SeqCst); } /// Get the current (latest) sequence number. Returns 0 if empty. pub fn current_seq(&self) -> u64 { self.next_seq.load(Ordering::SeqCst).saturating_sub(1) } /// Get aggregate statistics. pub fn stats(&self) -> OpLogStats { let mut inserts = 0usize; let mut updates = 0usize; let mut deletes = 0usize; let mut oldest_seq = u64::MAX; for entry in self.entries.iter() { match entry.value().op { OpType::Insert => inserts += 1, OpType::Update => updates += 1, OpType::Delete => deletes += 1, } if entry.value().seq < oldest_seq { oldest_seq = entry.value().seq; } } if oldest_seq == u64::MAX { oldest_seq = 0; } OpLogStats { current_seq: self.current_seq(), total_entries: self.entries.len(), oldest_seq, inserts, updates, deletes, } } /// Clear all entries. pub fn clear(&self) { self.entries.clear(); self.next_seq.store(1, Ordering::SeqCst); } /// Number of entries in the log. pub fn len(&self) -> usize { self.entries.len() } /// Whether the log is empty. pub fn is_empty(&self) -> bool { self.entries.is_empty() } } impl Default for OpLog { fn default() -> Self { Self::new() } }