From e8161e6417d6e74f600a35f93f5fa43e3df62966 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sun, 5 Apr 2026 03:51:58 +0000 Subject: [PATCH] fix(rustdb-commands): restore persisted index initialization before writes to enforce unique constraints after restart --- changelog.md | 7 ++ package.json | 1 + pnpm-lock.yaml | 11 ++-- rust/crates/rustdb-commands/src/context.rs | 65 ++++++++++++++++++- .../src/handlers/insert_handler.rs | 27 ++++---- .../src/handlers/update_handler.rs | 30 +++++---- ts/00_commitinfo_data.ts | 2 +- 7 files changed, 110 insertions(+), 33 deletions(-) diff --git a/changelog.md b/changelog.md index ed4bef5..6772be4 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-04-05 - 2.5.3 - fix(rustdb-commands) +restore persisted index initialization before writes to enforce unique constraints after restart + +- load stored index specifications from storage when creating command context index engines +- rebuild index data from existing documents so custom indexes are active before insert, update, and upsert operations +- add @push.rocks/smartdata as a runtime dependency + ## 2026-04-05 - 2.5.2 - fix(rustdb-indexes) persist created indexes and restore them on server startup diff --git a/package.json b/package.json index 35c3887..c6e09b6 100644 --- a/package.json +++ b/package.json @@ -29,6 +29,7 @@ "dependencies": { "@api.global/typedserver": "^8.0.0", "@design.estate/dees-element": "^2.0.0", + "@push.rocks/smartdata": "7.1.5", "@push.rocks/smartrust": "^1.3.2", "bson": "^7.2.0" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 8880041..5061eab 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -14,6 +14,9 @@ importers: '@design.estate/dees-element': specifier: ^2.0.0 version: 2.2.3 + '@push.rocks/smartdata': + specifier: 7.1.5 + version: 7.1.5(socks@2.8.7) '@push.rocks/smartrust': specifier: ^1.3.2 version: 1.3.2 @@ -1026,8 +1029,8 @@ packages: '@push.rocks/smartcrypto@2.0.4': resolution: {integrity: sha512-1+/5bsjyataf5uUkUNnnVXGRAt+gHVk1KDzozjTqgqJxHvQk1d9fVDohL6CxUhUucTPtu5VR5xNBiV8YCDuGyw==} - '@push.rocks/smartdata@7.1.3': - resolution: {integrity: sha512-7vQJ9pdRk450yn2m9tmGPdSRlQVmxFPZjHD4sGYsfqCQPg+GLFusu+H16zpf+jKzAq4F2ZBMPaYymJHXvXiVcw==} + '@push.rocks/smartdata@7.1.5': + resolution: {integrity: sha512-7x7VedEg6RocWndqUPuTbY2Bh85Q/x0LOVHL4o/NVXyh3IGNtiVQ8ple4WR0qYqlHRAojX4eDSBPMiYzIasqAg==} '@push.rocks/smartdelay@3.0.5': resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==} @@ -5665,7 +5668,7 @@ snapshots: '@types/node-forge': 1.3.14 node-forge: 1.4.0 - '@push.rocks/smartdata@7.1.3(socks@2.8.7)': + '@push.rocks/smartdata@7.1.5(socks@2.8.7)': dependencies: '@push.rocks/lik': 6.4.0 '@push.rocks/smartdelay': 3.0.5 @@ -5899,7 +5902,7 @@ snapshots: '@push.rocks/smartmongo@5.1.1(socks@2.8.7)': dependencies: '@push.rocks/mongodump': 1.1.0(socks@2.8.7) - '@push.rocks/smartdata': 7.1.3(socks@2.8.7) + '@push.rocks/smartdata': 7.1.5(socks@2.8.7) '@push.rocks/smartfs': 1.5.0 '@push.rocks/smartpath': 6.0.0 '@push.rocks/smartpromise': 4.2.3 diff --git a/rust/crates/rustdb-commands/src/context.rs b/rust/crates/rustdb-commands/src/context.rs index 846b254..4b1bb0b 100644 --- a/rust/crates/rustdb-commands/src/context.rs +++ b/rust/crates/rustdb-commands/src/context.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use bson::Document; +use bson::{Bson, Document}; use dashmap::DashMap; -use rustdb_index::IndexEngine; +use rustdb_index::{IndexEngine, IndexOptions}; use rustdb_storage::{OpLog, StorageAdapter}; use rustdb_txn::{SessionEngine, TransactionEngine}; @@ -24,6 +24,67 @@ pub struct CommandContext { pub oplog: Arc, } +impl CommandContext { + /// Get or lazily initialize an IndexEngine for a namespace. + /// + /// If no IndexEngine exists yet for this namespace, loads persisted index + /// specs from `indexes.json` via the storage adapter, creates the engine + /// with those specs, and rebuilds index data from existing documents. + /// This ensures unique indexes are enforced even on the very first write + /// after a restart. + pub async fn get_or_init_index_engine(&self, db: &str, coll: &str) -> dashmap::mapref::one::RefMut<'_, String, IndexEngine> { + let ns_key = format!("{}.{}", db, coll); + + // Fast path: engine already exists. + if self.indexes.contains_key(&ns_key) { + return self.indexes.entry(ns_key).or_insert_with(IndexEngine::new); + } + + // Slow path: load from persisted specs. + let mut engine = IndexEngine::new(); + let mut has_custom = false; + + if let Ok(specs) = self.storage.get_indexes(db, coll).await { + for spec in &specs { + let name = spec.get_str("name").unwrap_or("").to_string(); + if name == "_id_" || name.is_empty() { + continue; + } + let key = match spec.get("key") { + Some(Bson::Document(k)) => k.clone(), + _ => continue, + }; + let unique = matches!(spec.get("unique"), Some(Bson::Boolean(true))); + let sparse = matches!(spec.get("sparse"), Some(Bson::Boolean(true))); + let expire_after_seconds = match spec.get("expireAfterSeconds") { + Some(Bson::Int32(n)) => Some(*n as u64), + Some(Bson::Int64(n)) => Some(*n as u64), + _ => None, + }; + let options = IndexOptions { + name: Some(name), + unique, + sparse, + expire_after_seconds, + }; + let _ = engine.create_index(key, options); + has_custom = true; + } + } + + if has_custom { + // Rebuild index data from existing documents. + if let Ok(docs) = self.storage.find_all(db, coll).await { + if !docs.is_empty() { + engine.rebuild_from_documents(&docs); + } + } + } + + self.indexes.entry(ns_key).or_insert(engine) + } +} + /// State of an open cursor from a find or aggregate command. pub struct CursorState { /// Documents remaining to be returned. diff --git a/rust/crates/rustdb-commands/src/handlers/insert_handler.rs b/rust/crates/rustdb-commands/src/handlers/insert_handler.rs index 05b5630..af253fb 100644 --- a/rust/crates/rustdb-commands/src/handlers/insert_handler.rs +++ b/rust/crates/rustdb-commands/src/handlers/insert_handler.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use bson::{doc, oid::ObjectId, Bson, Document}; -use rustdb_index::IndexEngine; use rustdb_storage::OpType; use tracing::debug; @@ -56,6 +55,11 @@ pub async fn handle( let mut inserted_count: i32 = 0; let mut write_errors: Vec = Vec::new(); + // Ensure the IndexEngine is loaded (with persisted specs from indexes.json). + // This must happen BEFORE any writes, so unique constraints are enforced + // even on the first write after a restart. + drop(ctx.get_or_init_index_engine(db, coll).await); + for (idx, mut doc) in docs.into_iter().enumerate() { // Auto-generate _id if not present. if !doc.contains_key("_id") { @@ -63,6 +67,7 @@ pub async fn handle( } // Pre-check unique index constraints BEFORE storage write. + // The engine is guaranteed to exist from the get_or_init call above. if let Some(engine) = ctx.indexes.get(&ns_key) { if let Err(e) = engine.check_unique_constraints(&doc) { let err_msg = e.to_string(); @@ -92,17 +97,15 @@ pub async fn handle( None, ); - // Update index engine. - let mut engine = ctx - .indexes - .entry(ns_key.clone()) - .or_insert_with(IndexEngine::new); - if let Err(e) = engine.on_insert(&doc) { - tracing::error!( - namespace = %ns_key, - error = %e, - "index update failed after successful insert (pre-check passed but insert failed)" - ); + // Update index engine (already initialized above). + if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { + if let Err(e) = engine.on_insert(&doc) { + tracing::error!( + namespace = %ns_key, + error = %e, + "index update failed after successful insert" + ); + } } inserted_count += 1; } diff --git a/rust/crates/rustdb-commands/src/handlers/update_handler.rs b/rust/crates/rustdb-commands/src/handlers/update_handler.rs index 4be82f0..f371636 100644 --- a/rust/crates/rustdb-commands/src/handlers/update_handler.rs +++ b/rust/crates/rustdb-commands/src/handlers/update_handler.rs @@ -1,7 +1,6 @@ use std::collections::HashSet; use bson::{doc, oid::ObjectId, Bson, Document}; -use rustdb_index::IndexEngine; use rustdb_query::{QueryMatcher, UpdateEngine, sort_documents, apply_projection}; use rustdb_storage::OpType; use tracing::debug; @@ -47,6 +46,10 @@ async fn handle_update( ensure_collection_exists(db, coll, ctx).await?; let ns_key = format!("{}.{}", db, coll); + + // Ensure the IndexEngine is loaded with persisted specs from indexes.json. + drop(ctx.get_or_init_index_engine(db, coll).await); + let mut total_n: i32 = 0; let mut total_n_modified: i32 = 0; let mut upserted_list: Vec = Vec::new(); @@ -179,13 +182,11 @@ async fn handle_update( None, ); - // Update index. - let mut engine = ctx - .indexes - .entry(ns_key.clone()) - .or_insert_with(IndexEngine::new); - if let Err(e) = engine.on_insert(&updated) { - tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert"); + // Update index (engine already initialized above). + if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { + if let Err(e) = engine.on_insert(&updated) { + tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert"); + } } total_n += 1; @@ -402,6 +403,9 @@ async fn handle_find_and_modify( let ns_key = format!("{}.{}", db, coll); + // Ensure the IndexEngine is loaded with persisted specs. + drop(ctx.get_or_init_index_engine(db, coll).await); + // Load and filter documents. let mut matched = load_filtered_docs(db, coll, &query, &ns_key, ctx).await?; @@ -573,12 +577,10 @@ async fn handle_find_and_modify( // Update index. { - let mut engine = ctx - .indexes - .entry(ns_key.clone()) - .or_insert_with(IndexEngine::new); - if let Err(e) = engine.on_insert(&updated_doc) { - tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert"); + if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { + if let Err(e) = engine.on_insert(&updated_doc) { + tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert"); + } } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index cf69aed..d30719c 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartdb', - version: '2.5.2', + version: '2.5.3', description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.' }