//! Index command handlers (`createIndexes`, `dropIndexes`, `listIndexes`).
//!
//! File: smartdb/rust/crates/rustdb-commands/src/handlers/index_handler.rs

use bson::{doc, Bson, Document};
use rustdb_index::{IndexEngine, IndexOptions};
use tracing::debug;
use crate::context::CommandContext;
use crate::error::{CommandError, CommandResult};
/// Entry point for the index-related commands.
///
/// Routes `createIndexes`, `dropIndexes`, and `listIndexes` to their dedicated
/// handlers based on `command_name`. Any other command name is answered with a
/// bare `{ ok: 1.0 }` document without doing any work.
pub async fn handle(
    cmd: &Document,
    db: &str,
    ctx: &CommandContext,
    command_name: &str,
) -> CommandResult<Document> {
    if command_name == "createIndexes" {
        return handle_create_indexes(cmd, db, ctx).await;
    }
    if command_name == "dropIndexes" {
        return handle_drop_indexes(cmd, db, ctx).await;
    }
    if command_name == "listIndexes" {
        return handle_list_indexes(cmd, db, ctx).await;
    }
    // Unrecognised names fall through to a plain success reply.
    Ok(doc! { "ok": 1.0 })
}
/// Handle the `createIndexes` command.
async fn handle_create_indexes(
cmd: &Document,
db: &str,
ctx: &CommandContext,
) -> CommandResult<Document> {
let coll = cmd
.get_str("createIndexes")
.map_err(|_| CommandError::InvalidArgument("missing 'createIndexes' field".into()))?;
let indexes = cmd
.get_array("indexes")
.map_err(|_| CommandError::InvalidArgument("missing 'indexes' array".into()))?;
let ns_key = format!("{}.{}", db, coll);
debug!(
db = db,
collection = coll,
count = indexes.len(),
"createIndexes command"
);
// Auto-create collection if needed.
let created_automatically = ensure_collection_exists(db, coll, ctx).await?;
// Get the number of indexes before creating new ones.
let num_before = {
let engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
engine.list_indexes().len() as i32
};
let mut created_count = 0_i32;
for index_bson in indexes {
let index_spec = match index_bson {
Bson::Document(d) => d,
_ => {
return Err(CommandError::InvalidArgument(
"index spec must be a document".into(),
));
}
};
let key = match index_spec.get("key") {
Some(Bson::Document(k)) => k.clone(),
_ => {
return Err(CommandError::InvalidArgument(
"index spec must have a 'key' document".into(),
));
}
};
let name = index_spec.get_str("name").ok().map(|s| s.to_string());
let unique = match index_spec.get("unique") {
Some(Bson::Boolean(b)) => *b,
_ => false,
};
let sparse = match index_spec.get("sparse") {
Some(Bson::Boolean(b)) => *b,
_ => false,
};
let expire_after_seconds = match index_spec.get("expireAfterSeconds") {
Some(Bson::Int32(n)) => Some(*n as u64),
Some(Bson::Int64(n)) => Some(*n as u64),
_ => None,
};
let options = IndexOptions {
name,
unique,
sparse,
expire_after_seconds,
};
let options_for_persist = IndexOptions {
name: options.name.clone(),
unique: options.unique,
sparse: options.sparse,
expire_after_seconds: options.expire_after_seconds,
};
let key_for_persist = key.clone();
// Create the index in-memory.
let mut engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
match engine.create_index(key, options) {
Ok(index_name) => {
debug!(index_name = %index_name, "Created index");
// Persist index spec to disk.
let mut spec = doc! { "key": key_for_persist };
if options_for_persist.unique {
spec.insert("unique", true);
}
if options_for_persist.sparse {
spec.insert("sparse", true);
}
if let Some(ttl) = options_for_persist.expire_after_seconds {
spec.insert("expireAfterSeconds", ttl as i64);
}
if let Err(e) = ctx.storage.save_index(db, coll, &index_name, spec).await {
tracing::warn!(index = %index_name, error = %e, "failed to persist index spec");
}
created_count += 1;
}
Err(e) => {
return Err(CommandError::IndexError(e.to_string()));
}
}
}
// If we created indexes on an existing collection, rebuild from documents.
if created_count > 0 && !created_automatically {
// Load all documents and rebuild indexes.
if let Ok(all_docs) = ctx.storage.find_all(db, coll).await {
if !all_docs.is_empty() {
let mut engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
engine.rebuild_from_documents(&all_docs);
}
}
}
let num_after = {
let engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
engine.list_indexes().len() as i32
};
Ok(doc! {
"createdCollectionAutomatically": created_automatically,
"numIndexesBefore": num_before,
"numIndexesAfter": num_after,
"ok": 1.0,
})
}
/// Handle the `dropIndexes` command.
async fn handle_drop_indexes(
cmd: &Document,
db: &str,
ctx: &CommandContext,
) -> CommandResult<Document> {
let coll = cmd
.get_str("dropIndexes")
.map_err(|_| CommandError::InvalidArgument("missing 'dropIndexes' field".into()))?;
let ns_key = format!("{}.{}", db, coll);
// Get current index count.
let n_indexes_was = {
match ctx.indexes.get(&ns_key) {
Some(engine) => engine.list_indexes().len() as i32,
None => 1_i32, // At minimum the _id_ index.
}
};
let index_spec = cmd.get("index");
debug!(
db = db,
collection = coll,
index_spec = ?index_spec,
"dropIndexes command"
);
match index_spec {
Some(Bson::String(name)) if name == "*" => {
// Drop all indexes except _id_.
// Collect names to drop from storage first.
let names_to_drop: Vec<String> = if let Some(engine) = ctx.indexes.get(&ns_key) {
engine.list_indexes().iter()
.filter(|info| info.name != "_id_")
.map(|info| info.name.clone())
.collect()
} else {
Vec::new()
};
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
engine.drop_all_indexes();
}
for idx_name in &names_to_drop {
let _ = ctx.storage.drop_index(db, coll, idx_name).await;
}
}
Some(Bson::String(name)) => {
// Drop by name.
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
engine.drop_index(name).map_err(|e| {
CommandError::IndexError(e.to_string())
})?;
} else {
return Err(CommandError::IndexError(format!(
"index not found: {}",
name
)));
}
let _ = ctx.storage.drop_index(db, coll, name).await;
}
Some(Bson::Document(key_spec)) => {
// Drop by key spec: find the index with matching key.
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
let index_name = engine
.list_indexes()
.iter()
.find(|info| info.key == *key_spec)
.map(|info| info.name.clone());
if let Some(name) = index_name {
engine.drop_index(&name).map_err(|e| {
CommandError::IndexError(e.to_string())
})?;
let _ = ctx.storage.drop_index(db, coll, &name).await;
} else {
return Err(CommandError::IndexError(
"index not found with specified key".into(),
));
}
} else {
return Err(CommandError::IndexError(
"no indexes found for collection".into(),
));
}
}
_ => {
return Err(CommandError::InvalidArgument(
"dropIndexes requires 'index' field (string, document, or \"*\")".into(),
));
}
}
Ok(doc! {
"nIndexesWas": n_indexes_was,
"ok": 1.0,
})
}
/// Handle the `listIndexes` command.
///
/// Reads the collection name from the `listIndexes` field and replies with a
/// single-batch cursor document (`cursor.id` is `0`) whose `firstBatch` holds
/// one document per index: `v`, `key`, `name`, plus `unique`, `sparse`, and
/// `expireAfterSeconds` when set.
///
/// When no engine is registered for the namespace, the listing of a freshly
/// constructed `IndexEngine` is returned instead.
///
/// # Errors
///
/// * `CommandError::InvalidArgument` if the `listIndexes` field is missing.
/// * `CommandError::NamespaceNotFound` if storage reports that the collection
///   does not exist. A failed existence check is not an error; the handler
///   carries on.
async fn handle_list_indexes(
    cmd: &Document,
    db: &str,
    ctx: &CommandContext,
) -> CommandResult<Document> {
    let coll = cmd
        .get_str("listIndexes")
        .map_err(|_| CommandError::InvalidArgument("missing 'listIndexes' field".into()))?;
    // Serves both as the engine-map key and as the cursor namespace.
    let ns = format!("{}.{}", db, coll);

    // Only a definite "does not exist" aborts; if the check itself fails we
    // try to proceed anyway.
    if matches!(ctx.storage.collection_exists(db, coll).await, Ok(false)) {
        return Err(CommandError::NamespaceNotFound(format!(
            "ns not found: {}",
            ns
        )));
    }

    let index_infos = ctx
        .indexes
        .get(&ns)
        .map(|engine| engine.list_indexes())
        // No engine yet: fall back to what a brand-new engine reports.
        .unwrap_or_else(|| IndexEngine::new().list_indexes());

    let mut first_batch: Vec<Bson> = Vec::with_capacity(index_infos.len());
    for info in index_infos {
        let mut entry = doc! {
            "v": info.v,
            "key": info.key,
            "name": info.name,
        };
        // Optional attributes are emitted only when they are set.
        if info.unique {
            entry.insert("unique", true);
        }
        if info.sparse {
            entry.insert("sparse", true);
        }
        if let Some(ttl) = info.expire_after_seconds {
            entry.insert("expireAfterSeconds", ttl as i64);
        }
        first_batch.push(Bson::Document(entry));
    }

    Ok(doc! {
        "cursor": {
            "id": 0_i64,
            "ns": &ns,
            "firstBatch": first_batch,
        },
        "ok": 1.0,
    })
}
/// Ensure the target database and collection exist.
///
/// Returns `true` only if this call actually created the collection (i.e. the
/// value reported as `createdCollectionAutomatically`). If the collection is
/// already present -- either according to `collection_exists` or because
/// `create_collection` reports that it already exists -- `false` is returned,
/// so callers do not mistake a pre-existing collection for an empty new one.
///
/// A failing `collection_exists` check is not fatal: creation is attempted
/// anyway and its outcome decides the result.
///
/// # Errors
///
/// `CommandError::StorageError` if creating the database or the collection
/// fails for any reason other than "already exists".
async fn ensure_collection_exists(
    db: &str,
    coll: &str,
    ctx: &CommandContext,
) -> CommandResult<bool> {
    // Create database (ignore AlreadyExists).
    if let Err(e) = ctx.storage.create_database(db).await {
        let msg = e.to_string();
        if !is_already_exists_message(&msg) {
            return Err(CommandError::StorageError(msg));
        }
    }

    // Known to exist: nothing to create.
    if matches!(ctx.storage.collection_exists(db, coll).await, Ok(true)) {
        return Ok(false);
    }

    // Either the collection is absent or the check failed; try creating it.
    match ctx.storage.create_collection(db, coll).await {
        Ok(_) => Ok(true),
        Err(e) => {
            let msg = e.to_string();
            if is_already_exists_message(&msg) {
                // Existed after all (failed check or concurrent creation), so
                // it was not created automatically by this call.
                Ok(false)
            } else {
                Err(CommandError::StorageError(msg))
            }
        }
    }
}

/// Whether a storage error message denotes an "already exists" condition.
///
/// The storage error type is only inspected through its rendered message here,
/// so detection is by substring.
fn is_already_exists_message(msg: &str) -> bool {
    msg.contains("AlreadyExists") || msg.contains("already exists")
}