fix(rustdb-indexes): persist created indexes and restore them on server startup
This commit is contained in:
@@ -101,7 +101,15 @@ async fn handle_create_indexes(
|
||||
expire_after_seconds,
|
||||
};
|
||||
|
||||
// Create the index.
|
||||
let options_for_persist = IndexOptions {
|
||||
name: options.name.clone(),
|
||||
unique: options.unique,
|
||||
sparse: options.sparse,
|
||||
expire_after_seconds: options.expire_after_seconds,
|
||||
};
|
||||
let key_for_persist = key.clone();
|
||||
|
||||
// Create the index in-memory.
|
||||
let mut engine = ctx
|
||||
.indexes
|
||||
.entry(ns_key.clone())
|
||||
@@ -110,6 +118,22 @@ async fn handle_create_indexes(
|
||||
match engine.create_index(key, options) {
|
||||
Ok(index_name) => {
|
||||
debug!(index_name = %index_name, "Created index");
|
||||
|
||||
// Persist index spec to disk.
|
||||
let mut spec = doc! { "key": key_for_persist };
|
||||
if options_for_persist.unique {
|
||||
spec.insert("unique", true);
|
||||
}
|
||||
if options_for_persist.sparse {
|
||||
spec.insert("sparse", true);
|
||||
}
|
||||
if let Some(ttl) = options_for_persist.expire_after_seconds {
|
||||
spec.insert("expireAfterSeconds", ttl as i64);
|
||||
}
|
||||
if let Err(e) = ctx.storage.save_index(db, coll, &index_name, spec).await {
|
||||
tracing::warn!(index = %index_name, error = %e, "failed to persist index spec");
|
||||
}
|
||||
|
||||
created_count += 1;
|
||||
}
|
||||
Err(e) => {
|
||||
@@ -180,9 +204,21 @@ async fn handle_drop_indexes(
|
||||
match index_spec {
|
||||
Some(Bson::String(name)) if name == "*" => {
|
||||
// Drop all indexes except _id_.
|
||||
// Collect names to drop from storage first.
|
||||
let names_to_drop: Vec<String> = if let Some(engine) = ctx.indexes.get(&ns_key) {
|
||||
engine.list_indexes().iter()
|
||||
.filter(|info| info.name != "_id_")
|
||||
.map(|info| info.name.clone())
|
||||
.collect()
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
|
||||
engine.drop_all_indexes();
|
||||
}
|
||||
for idx_name in &names_to_drop {
|
||||
let _ = ctx.storage.drop_index(db, coll, idx_name).await;
|
||||
}
|
||||
}
|
||||
Some(Bson::String(name)) => {
|
||||
// Drop by name.
|
||||
@@ -196,6 +232,7 @@ async fn handle_drop_indexes(
|
||||
name
|
||||
)));
|
||||
}
|
||||
let _ = ctx.storage.drop_index(db, coll, name).await;
|
||||
}
|
||||
Some(Bson::Document(key_spec)) => {
|
||||
// Drop by key spec: find the index with matching key.
|
||||
@@ -210,6 +247,7 @@ async fn handle_drop_indexes(
|
||||
engine.drop_index(&name).map_err(|e| {
|
||||
CommandError::IndexError(e.to_string())
|
||||
})?;
|
||||
let _ = ctx.storage.drop_index(db, coll, &name).await;
|
||||
} else {
|
||||
return Err(CommandError::IndexError(
|
||||
"index not found with specified key".into(),
|
||||
|
||||
@@ -16,7 +16,7 @@ use rustdb_config::{RustDbOptions, StorageType};
|
||||
use rustdb_wire::{WireCodec, OP_QUERY};
|
||||
use rustdb_wire::{encode_op_msg_response, encode_op_reply_response};
|
||||
use rustdb_storage::{StorageAdapter, MemoryStorageAdapter, FileStorageAdapter, OpLog};
|
||||
// IndexEngine is used indirectly via CommandContext
|
||||
use rustdb_index::{IndexEngine, IndexOptions};
|
||||
use rustdb_txn::{TransactionEngine, SessionEngine};
|
||||
use rustdb_commands::{CommandRouter, CommandContext};
|
||||
|
||||
@@ -80,9 +80,79 @@ impl RustDb {
|
||||
});
|
||||
}
|
||||
|
||||
let indexes: Arc<DashMap<String, IndexEngine>> = Arc::new(DashMap::new());
|
||||
|
||||
// Restore persisted indexes from storage.
|
||||
if let Ok(databases) = storage.list_databases().await {
|
||||
for db_name in &databases {
|
||||
if let Ok(collections) = storage.list_collections(db_name).await {
|
||||
for coll_name in &collections {
|
||||
if let Ok(specs) = storage.get_indexes(db_name, coll_name).await {
|
||||
let has_custom = specs.iter().any(|s| {
|
||||
s.get_str("name").unwrap_or("_id_") != "_id_"
|
||||
});
|
||||
if !has_custom {
|
||||
continue;
|
||||
}
|
||||
|
||||
let ns_key = format!("{}.{}", db_name, coll_name);
|
||||
let mut engine = IndexEngine::new();
|
||||
|
||||
for spec in &specs {
|
||||
let name = spec.get_str("name").unwrap_or("").to_string();
|
||||
if name == "_id_" {
|
||||
continue; // already created by IndexEngine::new()
|
||||
}
|
||||
let key = match spec.get("key") {
|
||||
Some(bson::Bson::Document(k)) => k.clone(),
|
||||
_ => continue,
|
||||
};
|
||||
let unique = matches!(spec.get("unique"), Some(bson::Bson::Boolean(true)));
|
||||
let sparse = matches!(spec.get("sparse"), Some(bson::Bson::Boolean(true)));
|
||||
let expire_after_seconds = match spec.get("expireAfterSeconds") {
|
||||
Some(bson::Bson::Int32(n)) => Some(*n as u64),
|
||||
Some(bson::Bson::Int64(n)) => Some(*n as u64),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
let options = IndexOptions {
|
||||
name: Some(name.clone()),
|
||||
unique,
|
||||
sparse,
|
||||
expire_after_seconds,
|
||||
};
|
||||
if let Err(e) = engine.create_index(key, options) {
|
||||
tracing::warn!(
|
||||
namespace = %ns_key,
|
||||
index = %name,
|
||||
error = %e,
|
||||
"failed to restore index"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Rebuild index data from existing documents.
|
||||
if let Ok(docs) = storage.find_all(db_name, coll_name).await {
|
||||
if !docs.is_empty() {
|
||||
engine.rebuild_from_documents(&docs);
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
namespace = %ns_key,
|
||||
indexes = engine.list_indexes().len(),
|
||||
"restored indexes"
|
||||
);
|
||||
indexes.insert(ns_key, engine);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let ctx = Arc::new(CommandContext {
|
||||
storage,
|
||||
indexes: Arc::new(DashMap::new()),
|
||||
indexes,
|
||||
transactions: Arc::new(TransactionEngine::new()),
|
||||
sessions: Arc::new(SessionEngine::new(30 * 60 * 1000, 60 * 1000)),
|
||||
cursors: Arc::new(DashMap::new()),
|
||||
|
||||
Reference in New Issue
Block a user