//! Collection index engine.
//!
//! Source: smartdb/rust/crates/rustdb-index/src/engine.rs
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use bson::{Bson, Document};
use tracing::{debug, trace};
use rustdb_query::get_nested_value;
use crate::error::IndexError;
/// Options for creating an index.
///
/// All fields default to "off" via `#[derive(Default)]`: no custom name,
/// non-unique, non-sparse, no TTL.
#[derive(Debug, Clone, Default)]
pub struct IndexOptions {
/// Custom name for the index. Auto-generated from the key spec if None.
pub name: Option<String>,
/// Whether the index enforces unique values.
pub unique: bool,
/// Whether the index skips documents missing the indexed field.
pub sparse: bool,
/// TTL in seconds (for date fields). None means no expiry.
/// NOTE(review): this module only stores the value as metadata — no expiry
/// enforcement is visible here; confirm it happens in a higher layer.
pub expire_after_seconds: Option<u64>,
}
/// Metadata about an existing index, as returned by `IndexEngine::list_indexes`.
#[derive(Debug, Clone)]
pub struct IndexInfo {
/// Index version (always 2 — hard-coded in `IndexData::to_info`).
pub v: i32,
/// The key specification document (e.g. {"name": 1}).
pub key: Document,
/// The index name.
pub name: String,
/// Whether the index enforces uniqueness.
pub unique: bool,
/// Whether the index is sparse.
pub sparse: bool,
/// TTL expiry in seconds, if set.
pub expire_after_seconds: Option<u64>,
}
/// Internal data for a single index.
///
/// The same (serialized key -> document ids) mapping is stored twice:
/// `btree` answers ordered range scans while `hash` answers point lookups.
/// The two structures are always updated together, and key entries are
/// pruned when their id set becomes empty.
struct IndexData {
/// The key specification (field -> direction).
key: Document,
/// The index name.
name: String,
/// Whether uniqueness is enforced.
unique: bool,
/// Whether the index is sparse.
sparse: bool,
/// TTL in seconds.
expire_after_seconds: Option<u64>,
/// B-tree for range queries: serialized key bytes -> set of document _id hex strings.
btree: BTreeMap<Vec<u8>, BTreeSet<String>>,
/// Hash map for equality lookups: serialized key bytes -> set of document _id hex strings.
hash: HashMap<Vec<u8>, HashSet<String>>,
}
impl IndexData {
fn new(key: Document, name: String, unique: bool, sparse: bool, expire_after_seconds: Option<u64>) -> Self {
Self {
key,
name,
unique,
sparse,
expire_after_seconds,
btree: BTreeMap::new(),
hash: HashMap::new(),
}
}
fn to_info(&self) -> IndexInfo {
IndexInfo {
v: 2,
key: self.key.clone(),
name: self.name.clone(),
unique: self.unique,
sparse: self.sparse,
expire_after_seconds: self.expire_after_seconds,
}
}
}
/// Manages indexes for a single collection.
///
/// Always contains at least the protected built-in `_id_` index.
pub struct IndexEngine {
/// All indexes keyed by name.
indexes: HashMap<String, IndexData>,
}
impl IndexEngine {
/// Create a new IndexEngine with the default `_id_` index.
pub fn new() -> Self {
    let mut engine = Self { indexes: HashMap::new() };
    // Every collection gets the built-in unique `_id_` index up front.
    engine.indexes.insert(
        "_id_".to_string(),
        IndexData::new(bson::doc! { "_id": 1 }, "_id_".to_string(), true, false, None),
    );
    engine
}
/// Create a new index. Returns the index name.
///
/// Creating an index whose name and key spec both match an existing index
/// is an idempotent no-op that returns the existing name. If the name is
/// already taken by an index with a *different* key spec, an error is
/// returned — previously the conflicting request was silently ignored,
/// which hid caller bugs.
///
/// NOTE(review): the new index starts empty; callers are expected to
/// populate it via `rebuild_from_documents` or subsequent `on_insert`
/// calls — confirm against the collection layer.
///
/// # Errors
/// `InvalidIndex` if the key spec is empty, or if the chosen name collides
/// with an existing index that has a different key spec.
pub fn create_index(&mut self, key: Document, options: IndexOptions) -> Result<String, IndexError> {
    if key.is_empty() {
        return Err(IndexError::InvalidIndex("Index key must have at least one field".to_string()));
    }
    let name = options.name.unwrap_or_else(|| Self::generate_index_name(&key));
    if let Some(existing) = self.indexes.get(&name) {
        if existing.key == key {
            // Same name, same key spec: idempotent create.
            debug!(index_name = %name, "Index already exists, returning existing");
            return Ok(name);
        }
        // Same name, conflicting key spec: surface the conflict instead of
        // silently keeping the old definition.
        return Err(IndexError::InvalidIndex(format!(
            "Index '{}' already exists with a different key spec",
            name
        )));
    }
    debug!(index_name = %name, unique = options.unique, sparse = options.sparse, "Creating index");
    let index_data = IndexData::new(
        key,
        name.clone(),
        options.unique,
        options.sparse,
        options.expire_after_seconds,
    );
    self.indexes.insert(name.clone(), index_data);
    Ok(name)
}
/// Drop an index by name. Returns true if the index existed.
/// Cannot drop the `_id_` index.
pub fn drop_index(&mut self, name: &str) -> Result<bool, IndexError> {
    // The built-in `_id_` index is protected and can never be removed.
    if name == "_id_" {
        return Err(IndexError::ProtectedIndex("_id_".to_string()));
    }
    match self.indexes.remove(name) {
        Some(_) => {
            debug!(index_name = %name, "Dropped index");
            Ok(true)
        }
        None => Ok(false),
    }
}
/// Drop all indexes except `_id_`.
pub fn drop_all_indexes(&mut self) {
    // Keep only the protected `_id_` entry; everything else is discarded.
    self.indexes.retain(|index_name, _| index_name.as_str() == "_id_");
    debug!("Dropped all non-_id indexes");
}
/// List all indexes as metadata snapshots (order is unspecified).
pub fn list_indexes(&self) -> Vec<IndexInfo> {
    let mut infos = Vec::with_capacity(self.indexes.len());
    for index in self.indexes.values() {
        infos.push(index.to_info());
    }
    infos
}
/// Check whether an index with the given name exists.
pub fn index_exists(&self, name: &str) -> bool {
    self.indexes.get(name).is_some()
}
/// Check unique constraints for a document without modifying the index.
/// Returns Ok(()) if no conflict, Err(DuplicateKey) if a unique constraint
/// would be violated. This is a read-only check (immutable &self).
pub fn check_unique_constraints(&self, doc: &Document) -> Result<(), IndexError> {
    for idx in self.indexes.values().filter(|i| i.unique) {
        // Sparse indexes skip documents lacking the indexed fields, so a
        // missing key can never conflict.
        if let Some(kb) = Self::extract_key_bytes(doc, &idx.key, idx.sparse) {
            let occupied = idx.hash.get(&kb).map_or(false, |ids| !ids.is_empty());
            if occupied {
                // NOTE(review): the key is reported as raw bytes rather than
                // the original BSON value — consistent with the other
                // DuplicateKey sites in this file.
                return Err(IndexError::DuplicateKey {
                    index: idx.name.clone(),
                    key: format!("{:?}", kb),
                });
            }
        }
    }
    Ok(())
}
/// Check unique constraints for an update, excluding the document being updated.
/// Returns Ok(()) if no conflict. This is a read-only check (immutable &self).
pub fn check_unique_constraints_for_update(
    &self,
    old_doc: &Document,
    new_doc: &Document,
) -> Result<(), IndexError> {
    let doc_id = Self::extract_id(old_doc);
    for idx in self.indexes.values().filter(|i| i.unique) {
        if let Some(kb) = Self::extract_key_bytes(new_doc, &idx.key, idx.sparse) {
            // A conflict is any *other* document already holding this key;
            // the document being updated may keep its own key.
            let clash = idx
                .hash
                .get(&kb)
                .map_or(false, |ids| ids.iter().any(|id| *id != doc_id));
            if clash {
                return Err(IndexError::DuplicateKey {
                    index: idx.name.clone(),
                    key: format!("{:?}", kb),
                });
            }
        }
    }
    Ok(())
}
/// Notify the engine that a document has been inserted.
/// Checks unique constraints and updates all index structures.
///
/// # Errors
/// Returns `IndexError::DuplicateKey` — leaving every index unchanged —
/// if the document would violate any unique index.
pub fn on_insert(&mut self, doc: &Document) -> Result<(), IndexError> {
    // First pass: delegate to the shared read-only check instead of
    // duplicating the unique-constraint scan inline (the previous inline
    // copy was identical to `check_unique_constraints`).
    self.check_unique_constraints(doc)?;
    let doc_id = Self::extract_id(doc);
    // Second pass: record the document in every index that covers it
    // (sparse indexes may yield no key and skip the document).
    for idx in self.indexes.values_mut() {
        if let Some(kb) = Self::extract_key_bytes(doc, &idx.key, idx.sparse) {
            // Both structures hold the same mapping: the B-tree serves
            // range scans, the hash map serves point lookups.
            idx.btree.entry(kb.clone()).or_default().insert(doc_id.clone());
            idx.hash.entry(kb).or_default().insert(doc_id.clone());
        }
    }
    trace!(doc_id = %doc_id, "Indexed document on insert");
    Ok(())
}
/// Notify the engine that a document has been updated.
///
/// # Errors
/// Returns `IndexError::DuplicateKey` — leaving every index unchanged —
/// if the new version would collide with a *different* document on a
/// unique index (the document may keep its own keys).
pub fn on_update(&mut self, old_doc: &Document, new_doc: &Document) -> Result<(), IndexError> {
    // Delegate the conflict scan to the shared read-only check instead of
    // duplicating it inline (the previous inline copy matched
    // `check_unique_constraints_for_update`).
    self.check_unique_constraints_for_update(old_doc, new_doc)?;
    // The document identity is taken from the old version; `_id` is not
    // expected to change across an update.
    let doc_id = Self::extract_id(old_doc);
    for idx in self.indexes.values_mut() {
        // Remove the entries keyed by the old document version, pruning
        // empty sets so dead keys don't linger in range scans.
        if let Some(ref kb) = Self::extract_key_bytes(old_doc, &idx.key, idx.sparse) {
            if let Some(ids) = idx.btree.get_mut(kb) {
                ids.remove(&doc_id);
                if ids.is_empty() {
                    idx.btree.remove(kb);
                }
            }
            if let Some(ids) = idx.hash.get_mut(kb) {
                ids.remove(&doc_id);
                if ids.is_empty() {
                    idx.hash.remove(kb);
                }
            }
        }
        // Insert the entries for the new document version.
        if let Some(kb) = Self::extract_key_bytes(new_doc, &idx.key, idx.sparse) {
            idx.btree.entry(kb.clone()).or_default().insert(doc_id.clone());
            idx.hash.entry(kb).or_default().insert(doc_id.clone());
        }
    }
    trace!(doc_id = %doc_id, "Re-indexed document on update");
    Ok(())
}
/// Notify the engine that a document has been deleted.
pub fn on_delete(&mut self, doc: &Document) {
    let doc_id = Self::extract_id(doc);
    for idx in self.indexes.values_mut() {
        let key = match Self::extract_key_bytes(doc, &idx.key, idx.sparse) {
            Some(kb) => kb,
            None => continue, // sparse index never held this document
        };
        // Drop the id from both structures, pruning now-empty key entries
        // so stale keys don't linger in range scans.
        if let Some(ids) = idx.btree.get_mut(&key) {
            ids.remove(&doc_id);
            if ids.is_empty() {
                idx.btree.remove(&key);
            }
        }
        if let Some(ids) = idx.hash.get_mut(&key) {
            ids.remove(&doc_id);
            if ids.is_empty() {
                idx.hash.remove(&key);
            }
        }
    }
    trace!(doc_id = %doc_id, "Removed document from indexes");
}
/// Attempt to find candidate document IDs using indexes for the given filter.
/// Returns `None` if no suitable index is found (meaning a COLLSCAN is needed).
/// Returns `Some(set)` with candidate IDs that should be checked against the full filter.
pub fn find_candidate_ids(&self, filter: &Document) -> Option<HashSet<String>> {
    if filter.is_empty() {
        return None;
    }
    // Evaluate every index and keep the highest-scoring usable plan.
    // NOTE(review): ties keep whichever index was seen first; HashMap
    // iteration order makes that arbitrary but harmless, because the
    // caller re-checks all candidates against the full filter.
    let mut best: Option<(HashSet<String>, f64)> = None;
    for idx in self.indexes.values() {
        if let Some((candidates, score)) = self.try_index_lookup(idx, filter) {
            let beats_current = best.as_ref().map_or(score > 0.0, |(_, s)| score > *s);
            if beats_current {
                best = Some((candidates, score));
            }
        }
    }
    best.map(|(candidates, _score)| candidates)
}
/// Rebuild all indexes from a full set of documents.
pub fn rebuild_from_documents(&mut self, docs: &[Document]) {
    // Wipe every index's data structures while keeping the definitions.
    for index in self.indexes.values_mut() {
        index.btree.clear();
        index.hash.clear();
    }
    // Re-insert every document into every index that covers it.
    for document in docs {
        let id = Self::extract_id(document);
        for index in self.indexes.values_mut() {
            match Self::extract_key_bytes(document, &index.key, index.sparse) {
                Some(kb) => {
                    index.btree.entry(kb.clone()).or_default().insert(id.clone());
                    index.hash.entry(kb).or_default().insert(id.clone());
                }
                None => {} // sparse index skips this document
            }
        }
    }
    debug!(num_docs = docs.len(), num_indexes = self.indexes.len(), "Rebuilt all indexes");
}
// ---- Internal helpers ----
/// Try to use an index for the given filter. Returns candidate IDs and a score.
///
/// A field only counts as served by this index when its lookup produced a
/// positive score. A zero score means the condition used only operators the
/// index cannot answer (e.g. `$ne`, `$nin`, `$exists`, `$regex`); treating
/// its empty candidate set as authoritative would wrongly exclude matching
/// documents, so such fields are skipped entirely.
///
/// BUG FIX: previously a non-indexable condition was intersected anyway,
/// and the unique bonus was added unconditionally per matched field — so a
/// filter like `{email: {"$ne": x}}` on a unique index scored 0.5 with an
/// EMPTY candidate set, which `find_candidate_ids` then returned as the
/// winning plan, silently dropping all matching documents. The unique
/// bonus is now applied once per index, and only when the index actually
/// served at least one field.
fn try_index_lookup(&self, idx: &IndexData, filter: &Document) -> Option<(HashSet<String>, f64)> {
    let mut result_set: Option<HashSet<String>> = None;
    let mut total_score: f64 = 0.0;
    for field in idx.key.keys() {
        if let Some(condition) = filter.get(field) {
            let (candidates, score) = self.lookup_field(idx, field, condition);
            if score <= 0.0 {
                // Condition not answerable from the index: leave it for the
                // caller's full-filter re-check instead of intersecting.
                continue;
            }
            total_score += score;
            result_set = Some(match result_set {
                Some(existing) => existing.intersection(&candidates).cloned().collect(),
                None => candidates,
            });
        }
    }
    // No usable field means this index cannot serve the query at all.
    let rs = result_set?;
    // Unique indexes are more selective: one flat bonus per index.
    if idx.unique {
        total_score += 0.5;
    }
    Some((rs, total_score))
}
/// Look up candidates for a single field condition in an index.
fn lookup_field(&self, idx: &IndexData, field: &str, condition: &Bson) -> (HashSet<String>, f64) {
    // An operator document ({"$gt": ..}) routes to the operator path; any
    // other BSON value — including a plain sub-document — is an exact match.
    if let Bson::Document(cond_doc) = condition {
        if Self::has_operators(cond_doc) {
            return self.lookup_operator(idx, field, cond_doc);
        }
    }
    let key = Self::bson_to_key_bytes(condition);
    let ids = match idx.hash.get(&key) {
        Some(set) => set.clone(),
        None => HashSet::new(),
    };
    // Direct equality gets the highest per-field score.
    (ids, 2.0)
}
/// Handle operator-based lookups ($eq, $in, $gt, $lt, etc.).
///
/// Returns candidate document IDs plus a selectivity score. A score of 0.0
/// means the condition contained no operator this index can answer (only
/// `$ne`/`$nin`/`$exists`/`$regex`/...); the caller must treat that as
/// "index unusable for this field", never as "no matches".
///
/// BUG FIX: the previous accumulator conflated "no operator applied yet"
/// with "an operator genuinely matched nothing" (both were an empty set),
/// so `{"$eq": missing, "$gt": x}` discarded the authoritative empty `$eq`
/// result and returned the whole `$gt` range. An `Option` accumulator now
/// distinguishes the two states.
fn lookup_operator(&self, idx: &IndexData, field: &str, operators: &Document) -> (HashSet<String>, f64) {
    // `None` = no indexable operator applied yet; `Some(set)` = running
    // intersection of every applied operator's candidates.
    let mut acc: Option<HashSet<String>> = None;
    let mut score: f64 = 0.0;
    // Intersect `found` into the accumulator (or adopt it if unset).
    fn apply(acc: Option<HashSet<String>>, found: HashSet<String>) -> Option<HashSet<String>> {
        Some(match acc {
            Some(existing) => existing.intersection(&found).cloned().collect(),
            None => found,
        })
    }
    for (op, value) in operators {
        match op.as_str() {
            "$eq" => {
                let kb = Self::bson_to_key_bytes(value);
                let found = idx.hash.get(&kb).cloned().unwrap_or_default();
                acc = apply(acc, found);
                score += 2.0; // exact point lookup
            }
            "$in" => {
                if let Bson::Array(arr) = value {
                    let mut found = HashSet::new();
                    for v in arr {
                        if let Some(ids) = idx.hash.get(&Self::bson_to_key_bytes(v)) {
                            found.extend(ids.iter().cloned());
                        }
                    }
                    acc = apply(acc, found);
                    score += 1.5; // union of point lookups
                }
            }
            "$gt" | "$gte" | "$lt" | "$lte" => {
                let found = self.range_scan(idx, field, op.as_str(), value);
                acc = apply(acc, found);
                score += 1.0; // range scan, least selective
            }
            // $ne, $nin, $exists, $regex, ...: not answerable from this
            // index — contribute nothing and leave `acc` untouched.
            _ => {}
        }
    }
    match acc {
        // `acc` is only Some when at least one scored operator applied.
        Some(candidates) => (candidates, score),
        None => (HashSet::new(), 0.0),
    }
}
/// Perform a range scan on the B-tree index.
fn range_scan(&self, idx: &IndexData, _field: &str, op: &str, bound: &Bson) -> HashSet<String> {
    use std::ops::Bound::{Excluded, Included, Unbounded};
    let key = Self::bson_to_key_bytes(bound);
    // Translate the operator into (lower, upper) bounds over serialized keys.
    let (lo, hi) = match op {
        "$gt" => (Excluded(key), Unbounded),
        "$gte" => (Included(key), Unbounded),
        "$lt" => (Unbounded, Excluded(key)),
        "$lte" => (Unbounded, Included(key)),
        _ => return HashSet::new(), // unknown operator: no candidates
    };
    idx.btree
        .range((lo, hi))
        .flat_map(|(_key, ids)| ids.iter().cloned())
        .collect()
}
/// Generate an index name from the key spec (e.g. {"name": 1, "age": -1} -> "name_1_age_-1").
fn generate_index_name(key: &Document) -> String {
    let mut parts = Vec::with_capacity(key.len());
    for (field, direction) in key {
        let dir_text = match direction {
            Bson::Int32(n) => n.to_string(),
            Bson::Int64(n) => n.to_string(),
            Bson::String(s) => s.clone(),
            // Any other direction value is normalized to ascending.
            _ => String::from("1"),
        };
        parts.push(format!("{}_{}", field, dir_text));
    }
    parts.join("_")
}
/// Extract the `_id` field from a document as a hex string.
///
/// NOTE(review): documents without an `_id` all collapse to the empty
/// string and would share index entries — confirm upstream guarantees
/// `_id` is always present. Non-ObjectId, non-string ids fall back to
/// their `Display` form.
fn extract_id(doc: &Document) -> String {
    match doc.get("_id") {
        Some(Bson::ObjectId(oid)) => oid.to_hex(),
        Some(Bson::String(s)) => s.to_owned(),
        Some(other) => other.to_string(),
        None => String::new(),
    }
}
/// Extract the index key bytes from a document for a given key specification.
/// Returns `None` if the document should be skipped (sparse index with missing fields).
///
/// Sparse semantics: a single-field sparse index skips the document when
/// the field is absent; a compound sparse index skips it only when *every*
/// indexed field is absent. Missing fields otherwise index as Null.
fn extract_key_bytes(doc: &Document, key_spec: &Document, sparse: bool) -> Option<Vec<u8>> {
    if key_spec.len() == 1 {
        // Single-field index: the key is the field's encoding directly.
        let field = key_spec.keys().next().expect("non-empty key spec");
        let value = Self::resolve_field_value(doc, field);
        if sparse && value.is_none() {
            return None;
        }
        Some(Self::bson_to_key_bytes(&value.unwrap_or(Bson::Null)))
    } else {
        // Compound index: concatenate per-field encodings, each
        // length-prefixed so the concatenation is unambiguous.
        let mut any_present = false;
        let mut compound = Vec::new();
        for field in key_spec.keys() {
            let value = Self::resolve_field_value(doc, field);
            any_present |= value.is_some();
            let encoded = Self::bson_to_key_bytes(&value.unwrap_or(Bson::Null));
            compound.extend_from_slice(&(encoded.len() as u32).to_be_bytes());
            compound.extend_from_slice(&encoded);
        }
        if sparse && !any_present {
            return None;
        }
        Some(compound)
    }
}
/// Resolve a field value from a document, supporting dot notation.
fn resolve_field_value(doc: &Document, field: &str) -> Option<Bson> {
    // Plain field names hit the document directly; dotted paths go through
    // the nested-value resolver.
    if !field.contains('.') {
        return doc.get(field).cloned();
    }
    get_nested_value(doc, field)
}
/// Serialize a BSON value to bytes for use as an index key.
fn bson_to_key_bytes(value: &Bson) -> Vec<u8> {
// Use BSON raw serialization for consistent byte representation.
// We wrap in a document since raw BSON requires a top-level document.
let wrapper = bson::doc! { "k": value.clone() };
let raw = bson::to_vec(&wrapper).unwrap_or_default();
raw
}
/// True when any top-level key of `doc` is a query operator (starts with `$`).
fn has_operators(doc: &Document) -> bool {
    for key in doc.keys() {
        if key.starts_with('$') {
            return true;
        }
    }
    false
}
}
/// `Default` mirrors [`IndexEngine::new`]: an engine holding only the
/// built-in `_id_` index.
impl Default for IndexEngine {
fn default() -> Self {
Self::new()
}
}
// Unit tests exercising the engine through its public API only.
#[cfg(test)]
mod tests {
use super::*;
use bson::oid::ObjectId;
// Helper: a document with a fresh ObjectId `_id` plus name/age fields.
fn make_doc(name: &str, age: i32) -> Document {
bson::doc! {
"_id": ObjectId::new(),
"name": name,
"age": age,
}
}
// A fresh engine exposes exactly one index: the built-in `_id_`.
#[test]
fn test_default_id_index() {
let engine = IndexEngine::new();
assert!(engine.index_exists("_id_"));
assert_eq!(engine.list_indexes().len(), 1);
}
// Auto-generated names follow the "field_direction" convention.
#[test]
fn test_create_and_drop_index() {
let mut engine = IndexEngine::new();
let name = engine.create_index(
bson::doc! { "name": 1 },
IndexOptions::default(),
).unwrap();
assert_eq!(name, "name_1");
assert!(engine.index_exists("name_1"));
assert!(engine.drop_index("name_1").unwrap());
assert!(!engine.index_exists("name_1"));
}
// The `_id_` index is protected from dropping.
#[test]
fn test_cannot_drop_id_index() {
let mut engine = IndexEngine::new();
let result = engine.drop_index("_id_");
assert!(result.is_err());
}
// Inserting a second document with a duplicate unique key must fail.
#[test]
fn test_unique_constraint() {
let mut engine = IndexEngine::new();
engine.create_index(
bson::doc! { "email": 1 },
IndexOptions { unique: true, ..Default::default() },
).unwrap();
let doc1 = bson::doc! { "_id": ObjectId::new(), "email": "a@b.com" };
let doc2 = bson::doc! { "_id": ObjectId::new(), "email": "a@b.com" };
engine.on_insert(&doc1).unwrap();
let result = engine.on_insert(&doc2);
assert!(result.is_err());
}
// Equality lookup via a single-field index returns exactly the matches.
#[test]
fn test_find_candidates_equality() {
let mut engine = IndexEngine::new();
engine.create_index(
bson::doc! { "name": 1 },
IndexOptions::default(),
).unwrap();
let doc1 = make_doc("Alice", 30);
let doc2 = make_doc("Bob", 25);
let doc3 = make_doc("Alice", 35);
engine.on_insert(&doc1).unwrap();
engine.on_insert(&doc2).unwrap();
engine.on_insert(&doc3).unwrap();
let filter = bson::doc! { "name": "Alice" };
let candidates = engine.find_candidate_ids(&filter);
assert!(candidates.is_some());
assert_eq!(candidates.unwrap().len(), 2);
}
// After deletion the index still serves the query but yields no candidates.
#[test]
fn test_on_delete() {
let mut engine = IndexEngine::new();
engine.create_index(
bson::doc! { "name": 1 },
IndexOptions::default(),
).unwrap();
let doc = make_doc("Alice", 30);
engine.on_insert(&doc).unwrap();
let filter = bson::doc! { "name": "Alice" };
assert!(engine.find_candidate_ids(&filter).is_some());
engine.on_delete(&doc);
let candidates = engine.find_candidate_ids(&filter);
assert!(candidates.is_some());
assert!(candidates.unwrap().is_empty());
}
// A full rebuild repopulates indexes from scratch.
#[test]
fn test_rebuild_from_documents() {
let mut engine = IndexEngine::new();
engine.create_index(
bson::doc! { "name": 1 },
IndexOptions::default(),
).unwrap();
let docs = vec![
make_doc("Alice", 30),
make_doc("Bob", 25),
];
engine.rebuild_from_documents(&docs);
let filter = bson::doc! { "name": "Alice" };
let candidates = engine.find_candidate_ids(&filter);
assert!(candidates.is_some());
assert_eq!(candidates.unwrap().len(), 1);
}
// drop_all_indexes removes everything except the protected `_id_`.
#[test]
fn test_drop_all_indexes() {
let mut engine = IndexEngine::new();
engine.create_index(bson::doc! { "a": 1 }, IndexOptions::default()).unwrap();
engine.create_index(bson::doc! { "b": 1 }, IndexOptions::default()).unwrap();
assert_eq!(engine.list_indexes().len(), 3);
engine.drop_all_indexes();
assert_eq!(engine.list_indexes().len(), 1);
assert!(engine.index_exists("_id_"));
}
}