fix(rustdb-commands): restore persisted index initialization before writes to enforce unique constraints after restart
This commit is contained in:
@@ -1,8 +1,8 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use bson::Document;
|
||||
use bson::{Bson, Document};
|
||||
use dashmap::DashMap;
|
||||
use rustdb_index::IndexEngine;
|
||||
use rustdb_index::{IndexEngine, IndexOptions};
|
||||
use rustdb_storage::{OpLog, StorageAdapter};
|
||||
use rustdb_txn::{SessionEngine, TransactionEngine};
|
||||
|
||||
@@ -24,6 +24,67 @@ pub struct CommandContext {
|
||||
pub oplog: Arc<OpLog>,
|
||||
}
|
||||
|
||||
impl CommandContext {
|
||||
/// Get or lazily initialize an IndexEngine for a namespace.
|
||||
///
|
||||
/// If no IndexEngine exists yet for this namespace, loads persisted index
|
||||
/// specs from `indexes.json` via the storage adapter, creates the engine
|
||||
/// with those specs, and rebuilds index data from existing documents.
|
||||
/// This ensures unique indexes are enforced even on the very first write
|
||||
/// after a restart.
|
||||
pub async fn get_or_init_index_engine(&self, db: &str, coll: &str) -> dashmap::mapref::one::RefMut<'_, String, IndexEngine> {
|
||||
let ns_key = format!("{}.{}", db, coll);
|
||||
|
||||
// Fast path: engine already exists.
|
||||
if self.indexes.contains_key(&ns_key) {
|
||||
return self.indexes.entry(ns_key).or_insert_with(IndexEngine::new);
|
||||
}
|
||||
|
||||
// Slow path: load from persisted specs.
|
||||
let mut engine = IndexEngine::new();
|
||||
let mut has_custom = false;
|
||||
|
||||
if let Ok(specs) = self.storage.get_indexes(db, coll).await {
|
||||
for spec in &specs {
|
||||
let name = spec.get_str("name").unwrap_or("").to_string();
|
||||
if name == "_id_" || name.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let key = match spec.get("key") {
|
||||
Some(Bson::Document(k)) => k.clone(),
|
||||
_ => continue,
|
||||
};
|
||||
let unique = matches!(spec.get("unique"), Some(Bson::Boolean(true)));
|
||||
let sparse = matches!(spec.get("sparse"), Some(Bson::Boolean(true)));
|
||||
let expire_after_seconds = match spec.get("expireAfterSeconds") {
|
||||
Some(Bson::Int32(n)) => Some(*n as u64),
|
||||
Some(Bson::Int64(n)) => Some(*n as u64),
|
||||
_ => None,
|
||||
};
|
||||
let options = IndexOptions {
|
||||
name: Some(name),
|
||||
unique,
|
||||
sparse,
|
||||
expire_after_seconds,
|
||||
};
|
||||
let _ = engine.create_index(key, options);
|
||||
has_custom = true;
|
||||
}
|
||||
}
|
||||
|
||||
if has_custom {
|
||||
// Rebuild index data from existing documents.
|
||||
if let Ok(docs) = self.storage.find_all(db, coll).await {
|
||||
if !docs.is_empty() {
|
||||
engine.rebuild_from_documents(&docs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.indexes.entry(ns_key).or_insert(engine)
|
||||
}
|
||||
}
|
||||
|
||||
/// State of an open cursor from a find or aggregate command.
|
||||
pub struct CursorState {
|
||||
/// Documents remaining to be returned.
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use bson::{doc, oid::ObjectId, Bson, Document};
|
||||
use rustdb_index::IndexEngine;
|
||||
use rustdb_storage::OpType;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -56,6 +55,11 @@ pub async fn handle(
|
||||
let mut inserted_count: i32 = 0;
|
||||
let mut write_errors: Vec<Document> = Vec::new();
|
||||
|
||||
// Ensure the IndexEngine is loaded (with persisted specs from indexes.json).
|
||||
// This must happen BEFORE any writes, so unique constraints are enforced
|
||||
// even on the first write after a restart.
|
||||
drop(ctx.get_or_init_index_engine(db, coll).await);
|
||||
|
||||
for (idx, mut doc) in docs.into_iter().enumerate() {
|
||||
// Auto-generate _id if not present.
|
||||
if !doc.contains_key("_id") {
|
||||
@@ -63,6 +67,7 @@ pub async fn handle(
|
||||
}
|
||||
|
||||
// Pre-check unique index constraints BEFORE storage write.
|
||||
// The engine is guaranteed to exist from the get_or_init call above.
|
||||
if let Some(engine) = ctx.indexes.get(&ns_key) {
|
||||
if let Err(e) = engine.check_unique_constraints(&doc) {
|
||||
let err_msg = e.to_string();
|
||||
@@ -92,17 +97,15 @@ pub async fn handle(
|
||||
None,
|
||||
);
|
||||
|
||||
// Update index engine.
|
||||
let mut engine = ctx
|
||||
.indexes
|
||||
.entry(ns_key.clone())
|
||||
.or_insert_with(IndexEngine::new);
|
||||
if let Err(e) = engine.on_insert(&doc) {
|
||||
tracing::error!(
|
||||
namespace = %ns_key,
|
||||
error = %e,
|
||||
"index update failed after successful insert (pre-check passed but insert failed)"
|
||||
);
|
||||
// Update index engine (already initialized above).
|
||||
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
|
||||
if let Err(e) = engine.on_insert(&doc) {
|
||||
tracing::error!(
|
||||
namespace = %ns_key,
|
||||
error = %e,
|
||||
"index update failed after successful insert"
|
||||
);
|
||||
}
|
||||
}
|
||||
inserted_count += 1;
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::collections::HashSet;
|
||||
|
||||
use bson::{doc, oid::ObjectId, Bson, Document};
|
||||
use rustdb_index::IndexEngine;
|
||||
use rustdb_query::{QueryMatcher, UpdateEngine, sort_documents, apply_projection};
|
||||
use rustdb_storage::OpType;
|
||||
use tracing::debug;
|
||||
@@ -47,6 +46,10 @@ async fn handle_update(
|
||||
ensure_collection_exists(db, coll, ctx).await?;
|
||||
|
||||
let ns_key = format!("{}.{}", db, coll);
|
||||
|
||||
// Ensure the IndexEngine is loaded with persisted specs from indexes.json.
|
||||
drop(ctx.get_or_init_index_engine(db, coll).await);
|
||||
|
||||
let mut total_n: i32 = 0;
|
||||
let mut total_n_modified: i32 = 0;
|
||||
let mut upserted_list: Vec<Document> = Vec::new();
|
||||
@@ -179,13 +182,11 @@ async fn handle_update(
|
||||
None,
|
||||
);
|
||||
|
||||
// Update index.
|
||||
let mut engine = ctx
|
||||
.indexes
|
||||
.entry(ns_key.clone())
|
||||
.or_insert_with(IndexEngine::new);
|
||||
if let Err(e) = engine.on_insert(&updated) {
|
||||
tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert");
|
||||
// Update index (engine already initialized above).
|
||||
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
|
||||
if let Err(e) = engine.on_insert(&updated) {
|
||||
tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert");
|
||||
}
|
||||
}
|
||||
|
||||
total_n += 1;
|
||||
@@ -402,6 +403,9 @@ async fn handle_find_and_modify(
|
||||
|
||||
let ns_key = format!("{}.{}", db, coll);
|
||||
|
||||
// Ensure the IndexEngine is loaded with persisted specs.
|
||||
drop(ctx.get_or_init_index_engine(db, coll).await);
|
||||
|
||||
// Load and filter documents.
|
||||
let mut matched = load_filtered_docs(db, coll, &query, &ns_key, ctx).await?;
|
||||
|
||||
@@ -573,12 +577,10 @@ async fn handle_find_and_modify(
|
||||
|
||||
// Update index.
|
||||
{
|
||||
let mut engine = ctx
|
||||
.indexes
|
||||
.entry(ns_key.clone())
|
||||
.or_insert_with(IndexEngine::new);
|
||||
if let Err(e) = engine.on_insert(&updated_doc) {
|
||||
tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert");
|
||||
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
|
||||
if let Err(e) = engine.on_insert(&updated_doc) {
|
||||
tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user