Files
smartdb/rust/crates/rustdb-commands/src/context.rs
T

99 lines
3.7 KiB
Rust

use std::sync::Arc;
use bson::{Bson, Document};
use dashmap::DashMap;
use rustdb_index::{IndexEngine, IndexOptions};
use rustdb_storage::{OpLog, StorageAdapter};
use rustdb_txn::{SessionEngine, TransactionEngine};
/// Shared command execution context, passed to all handlers.
///
/// Every component except `start_time` is held behind an `Arc`, and the two
/// registries (`indexes`, `cursors`) are `DashMap`s, so they are mutated
/// through a shared `&self` reference.
pub struct CommandContext {
/// The storage backend, held as a trait object.
pub storage: Arc<dyn StorageAdapter>,
/// Index engines per namespace: "db.collection" -> IndexEngine.
/// Entries are created lazily by `get_or_init_index_engine`.
pub indexes: Arc<DashMap<String, IndexEngine>>,
/// Transaction engine for multi-document transactions.
pub transactions: Arc<TransactionEngine>,
/// Session engine for logical sessions.
pub sessions: Arc<SessionEngine>,
/// Active cursors for getMore / killCursors.
/// NOTE(review): the `i64` key is presumably the cursor id -- confirm against callers.
pub cursors: Arc<DashMap<i64, CursorState>>,
/// Server start time (for uptime reporting).
pub start_time: std::time::Instant,
/// Operation log for point-in-time replay.
pub oplog: Arc<OpLog>,
}
impl CommandContext {
/// Get or lazily initialize an IndexEngine for a namespace.
///
/// If no IndexEngine exists yet for this namespace, loads persisted index
/// specs from `indexes.json` via the storage adapter, creates the engine
/// with those specs, and rebuilds index data from existing documents.
/// This ensures unique indexes are enforced even on the very first write
/// after a restart.
pub async fn get_or_init_index_engine(&self, db: &str, coll: &str) -> dashmap::mapref::one::RefMut<'_, String, IndexEngine> {
let ns_key = format!("{}.{}", db, coll);
// Fast path: engine already exists.
if self.indexes.contains_key(&ns_key) {
return self.indexes.entry(ns_key).or_insert_with(IndexEngine::new);
}
// Slow path: load from persisted specs.
let mut engine = IndexEngine::new();
let mut has_custom = false;
if let Ok(specs) = self.storage.get_indexes(db, coll).await {
for spec in &specs {
let name = spec.get_str("name").unwrap_or("").to_string();
if name == "_id_" || name.is_empty() {
continue;
}
let key = match spec.get("key") {
Some(Bson::Document(k)) => k.clone(),
_ => continue,
};
let unique = matches!(spec.get("unique"), Some(Bson::Boolean(true)));
let sparse = matches!(spec.get("sparse"), Some(Bson::Boolean(true)));
let expire_after_seconds = match spec.get("expireAfterSeconds") {
Some(Bson::Int32(n)) => Some(*n as u64),
Some(Bson::Int64(n)) => Some(*n as u64),
_ => None,
};
let options = IndexOptions {
name: Some(name),
unique,
sparse,
expire_after_seconds,
};
let _ = engine.create_index(key, options);
has_custom = true;
}
}
if has_custom {
// Rebuild index data from existing documents.
if let Ok(docs) = self.storage.find_all(db, coll).await {
if !docs.is_empty() {
engine.rebuild_from_documents(&docs);
}
}
}
self.indexes.entry(ns_key).or_insert(engine)
}
}
/// State of an open cursor from a find or aggregate command.
///
/// Instances are stored in `CommandContext::cursors` for use by
/// getMore / killCursors. `Debug` is derived so a cursor can be included in
/// logs and diagnostics.
#[derive(Debug)]
pub struct CursorState {
    /// Documents held by the cursor for return to the client.
    /// NOTE(review): whether already-returned documents are kept here (with
    /// `position` marking progress) or removed is decided by the handlers --
    /// confirm there.
    pub documents: Vec<Document>,
    /// Current read position within `documents`.
    pub position: usize,
    /// Database the cursor belongs to.
    pub database: String,
    /// Collection the cursor belongs to.
    pub collection: String,
}