9 Commits

10 changed files with 241 additions and 38 deletions
+25
View File
@@ -1,5 +1,30 @@
# Changelog
## 2026-04-05 - 2.5.4 - fix(package)
bump package version to 2.5.3
- Updates the package metadata version by one patch release.
## 2026-04-05 - 2.5.3 - fix(rustdb-commands)
restore persisted index initialization before writes to enforce unique constraints after restart
- load stored index specifications from storage when creating command context index engines
- rebuild index data from existing documents so custom indexes are active before insert, update, and upsert operations
- add @push.rocks/smartdata as a runtime dependency
## 2026-04-05 - 2.5.2 - fix(rustdb-indexes)
persist created indexes and restore them on server startup
- Save index specifications to storage when indexes are created.
- Remove persisted index metadata when indexes are dropped by name, key spec, or wildcard.
- Rebuild in-memory index engines from stored definitions and existing documents during startup.
## 2026-04-05 - 2.5.1 - fix(docs)
update project documentation
- Modifies a single documentation-related file with a minimal text change.
- No source code, API, or package metadata changes are indicated in the diff summary.
## 2026-04-05 - 2.5.0 - feat(storage)
add offline data validation and strengthen storage/index integrity checks
+2 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartdb",
"version": "2.5.0",
"version": "2.5.4",
"private": false,
"description": "A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.",
"exports": {
@@ -29,6 +29,7 @@
"dependencies": {
"@api.global/typedserver": "^8.0.0",
"@design.estate/dees-element": "^2.0.0",
"@push.rocks/smartdata": "7.1.5",
"@push.rocks/smartrust": "^1.3.2",
"bson": "^7.2.0"
},
+7 -4
View File
@@ -14,6 +14,9 @@ importers:
'@design.estate/dees-element':
specifier: ^2.0.0
version: 2.2.3
'@push.rocks/smartdata':
specifier: 7.1.5
version: 7.1.5(socks@2.8.7)
'@push.rocks/smartrust':
specifier: ^1.3.2
version: 1.3.2
@@ -1026,8 +1029,8 @@ packages:
'@push.rocks/smartcrypto@2.0.4':
resolution: {integrity: sha512-1+/5bsjyataf5uUkUNnnVXGRAt+gHVk1KDzozjTqgqJxHvQk1d9fVDohL6CxUhUucTPtu5VR5xNBiV8YCDuGyw==}
'@push.rocks/smartdata@7.1.3':
resolution: {integrity: sha512-7vQJ9pdRk450yn2m9tmGPdSRlQVmxFPZjHD4sGYsfqCQPg+GLFusu+H16zpf+jKzAq4F2ZBMPaYymJHXvXiVcw==}
'@push.rocks/smartdata@7.1.5':
resolution: {integrity: sha512-7x7VedEg6RocWndqUPuTbY2Bh85Q/x0LOVHL4o/NVXyh3IGNtiVQ8ple4WR0qYqlHRAojX4eDSBPMiYzIasqAg==}
'@push.rocks/smartdelay@3.0.5':
resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==}
@@ -5665,7 +5668,7 @@ snapshots:
'@types/node-forge': 1.3.14
node-forge: 1.4.0
'@push.rocks/smartdata@7.1.3(socks@2.8.7)':
'@push.rocks/smartdata@7.1.5(socks@2.8.7)':
dependencies:
'@push.rocks/lik': 6.4.0
'@push.rocks/smartdelay': 3.0.5
@@ -5899,7 +5902,7 @@ snapshots:
'@push.rocks/smartmongo@5.1.1(socks@2.8.7)':
dependencies:
'@push.rocks/mongodump': 1.1.0(socks@2.8.7)
'@push.rocks/smartdata': 7.1.3(socks@2.8.7)
'@push.rocks/smartdata': 7.1.5(socks@2.8.7)
'@push.rocks/smartfs': 1.5.0
'@push.rocks/smartpath': 6.0.0
'@push.rocks/smartpromise': 4.2.3
+63 -2
View File
@@ -1,8 +1,8 @@
use std::sync::Arc;
use bson::Document;
use bson::{Bson, Document};
use dashmap::DashMap;
use rustdb_index::IndexEngine;
use rustdb_index::{IndexEngine, IndexOptions};
use rustdb_storage::{OpLog, StorageAdapter};
use rustdb_txn::{SessionEngine, TransactionEngine};
@@ -24,6 +24,67 @@ pub struct CommandContext {
pub oplog: Arc<OpLog>,
}
impl CommandContext {
/// Get or lazily initialize an IndexEngine for a namespace.
///
/// If no IndexEngine exists yet for this namespace, loads persisted index
/// specs from `indexes.json` via the storage adapter, creates the engine
/// with those specs, and rebuilds index data from existing documents.
/// This ensures unique indexes are enforced even on the very first write
/// after a restart.
pub async fn get_or_init_index_engine(&self, db: &str, coll: &str) -> dashmap::mapref::one::RefMut<'_, String, IndexEngine> {
let ns_key = format!("{}.{}", db, coll);
// Fast path: engine already exists.
if self.indexes.contains_key(&ns_key) {
return self.indexes.entry(ns_key).or_insert_with(IndexEngine::new);
}
// Slow path: load from persisted specs.
let mut engine = IndexEngine::new();
let mut has_custom = false;
if let Ok(specs) = self.storage.get_indexes(db, coll).await {
for spec in &specs {
let name = spec.get_str("name").unwrap_or("").to_string();
if name == "_id_" || name.is_empty() {
continue;
}
let key = match spec.get("key") {
Some(Bson::Document(k)) => k.clone(),
_ => continue,
};
let unique = matches!(spec.get("unique"), Some(Bson::Boolean(true)));
let sparse = matches!(spec.get("sparse"), Some(Bson::Boolean(true)));
let expire_after_seconds = match spec.get("expireAfterSeconds") {
Some(Bson::Int32(n)) => Some(*n as u64),
Some(Bson::Int64(n)) => Some(*n as u64),
_ => None,
};
let options = IndexOptions {
name: Some(name),
unique,
sparse,
expire_after_seconds,
};
let _ = engine.create_index(key, options);
has_custom = true;
}
}
if has_custom {
// Rebuild index data from existing documents.
if let Ok(docs) = self.storage.find_all(db, coll).await {
if !docs.is_empty() {
engine.rebuild_from_documents(&docs);
}
}
}
self.indexes.entry(ns_key).or_insert(engine)
}
}
/// State of an open cursor from a find or aggregate command.
pub struct CursorState {
/// Documents remaining to be returned.
@@ -101,7 +101,15 @@ async fn handle_create_indexes(
expire_after_seconds,
};
// Create the index.
let options_for_persist = IndexOptions {
name: options.name.clone(),
unique: options.unique,
sparse: options.sparse,
expire_after_seconds: options.expire_after_seconds,
};
let key_for_persist = key.clone();
// Create the index in-memory.
let mut engine = ctx
.indexes
.entry(ns_key.clone())
@@ -110,6 +118,22 @@ async fn handle_create_indexes(
match engine.create_index(key, options) {
Ok(index_name) => {
debug!(index_name = %index_name, "Created index");
// Persist index spec to disk.
let mut spec = doc! { "key": key_for_persist };
if options_for_persist.unique {
spec.insert("unique", true);
}
if options_for_persist.sparse {
spec.insert("sparse", true);
}
if let Some(ttl) = options_for_persist.expire_after_seconds {
spec.insert("expireAfterSeconds", ttl as i64);
}
if let Err(e) = ctx.storage.save_index(db, coll, &index_name, spec).await {
tracing::warn!(index = %index_name, error = %e, "failed to persist index spec");
}
created_count += 1;
}
Err(e) => {
@@ -180,9 +204,21 @@ async fn handle_drop_indexes(
match index_spec {
Some(Bson::String(name)) if name == "*" => {
// Drop all indexes except _id_.
// Collect names to drop from storage first.
let names_to_drop: Vec<String> = if let Some(engine) = ctx.indexes.get(&ns_key) {
engine.list_indexes().iter()
.filter(|info| info.name != "_id_")
.map(|info| info.name.clone())
.collect()
} else {
Vec::new()
};
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
engine.drop_all_indexes();
}
for idx_name in &names_to_drop {
let _ = ctx.storage.drop_index(db, coll, idx_name).await;
}
}
Some(Bson::String(name)) => {
// Drop by name.
@@ -196,6 +232,7 @@ async fn handle_drop_indexes(
name
)));
}
let _ = ctx.storage.drop_index(db, coll, name).await;
}
Some(Bson::Document(key_spec)) => {
// Drop by key spec: find the index with matching key.
@@ -210,6 +247,7 @@ async fn handle_drop_indexes(
engine.drop_index(&name).map_err(|e| {
CommandError::IndexError(e.to_string())
})?;
let _ = ctx.storage.drop_index(db, coll, &name).await;
} else {
return Err(CommandError::IndexError(
"index not found with specified key".into(),
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use bson::{doc, oid::ObjectId, Bson, Document};
use rustdb_index::IndexEngine;
use rustdb_storage::OpType;
use tracing::debug;
@@ -56,6 +55,11 @@ pub async fn handle(
let mut inserted_count: i32 = 0;
let mut write_errors: Vec<Document> = Vec::new();
// Ensure the IndexEngine is loaded (with persisted specs from indexes.json).
// This must happen BEFORE any writes, so unique constraints are enforced
// even on the first write after a restart.
drop(ctx.get_or_init_index_engine(db, coll).await);
for (idx, mut doc) in docs.into_iter().enumerate() {
// Auto-generate _id if not present.
if !doc.contains_key("_id") {
@@ -63,6 +67,7 @@ pub async fn handle(
}
// Pre-check unique index constraints BEFORE storage write.
// The engine is guaranteed to exist from the get_or_init call above.
if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints(&doc) {
let err_msg = e.to_string();
@@ -92,17 +97,15 @@ pub async fn handle(
None,
);
// Update index engine.
let mut engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
if let Err(e) = engine.on_insert(&doc) {
tracing::error!(
namespace = %ns_key,
error = %e,
"index update failed after successful insert (pre-check passed but insert failed)"
);
// Update index engine (already initialized above).
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
if let Err(e) = engine.on_insert(&doc) {
tracing::error!(
namespace = %ns_key,
error = %e,
"index update failed after successful insert"
);
}
}
inserted_count += 1;
}
@@ -1,7 +1,6 @@
use std::collections::HashSet;
use bson::{doc, oid::ObjectId, Bson, Document};
use rustdb_index::IndexEngine;
use rustdb_query::{QueryMatcher, UpdateEngine, sort_documents, apply_projection};
use rustdb_storage::OpType;
use tracing::debug;
@@ -47,6 +46,10 @@ async fn handle_update(
ensure_collection_exists(db, coll, ctx).await?;
let ns_key = format!("{}.{}", db, coll);
// Ensure the IndexEngine is loaded with persisted specs from indexes.json.
drop(ctx.get_or_init_index_engine(db, coll).await);
let mut total_n: i32 = 0;
let mut total_n_modified: i32 = 0;
let mut upserted_list: Vec<Document> = Vec::new();
@@ -179,13 +182,11 @@ async fn handle_update(
None,
);
// Update index.
let mut engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
if let Err(e) = engine.on_insert(&updated) {
tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert");
// Update index (engine already initialized above).
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
if let Err(e) = engine.on_insert(&updated) {
tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert");
}
}
total_n += 1;
@@ -402,6 +403,9 @@ async fn handle_find_and_modify(
let ns_key = format!("{}.{}", db, coll);
// Ensure the IndexEngine is loaded with persisted specs.
drop(ctx.get_or_init_index_engine(db, coll).await);
// Load and filter documents.
let mut matched = load_filtered_docs(db, coll, &query, &ns_key, ctx).await?;
@@ -573,12 +577,10 @@ async fn handle_find_and_modify(
// Update index.
{
let mut engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
if let Err(e) = engine.on_insert(&updated_doc) {
tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert");
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
if let Err(e) = engine.on_insert(&updated_doc) {
tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert");
}
}
}
+72 -2
View File
@@ -16,7 +16,7 @@ use rustdb_config::{RustDbOptions, StorageType};
use rustdb_wire::{WireCodec, OP_QUERY};
use rustdb_wire::{encode_op_msg_response, encode_op_reply_response};
use rustdb_storage::{StorageAdapter, MemoryStorageAdapter, FileStorageAdapter, OpLog};
// IndexEngine is used indirectly via CommandContext
use rustdb_index::{IndexEngine, IndexOptions};
use rustdb_txn::{TransactionEngine, SessionEngine};
use rustdb_commands::{CommandRouter, CommandContext};
@@ -80,9 +80,79 @@ impl RustDb {
});
}
let indexes: Arc<DashMap<String, IndexEngine>> = Arc::new(DashMap::new());
// Restore persisted indexes from storage.
if let Ok(databases) = storage.list_databases().await {
for db_name in &databases {
if let Ok(collections) = storage.list_collections(db_name).await {
for coll_name in &collections {
if let Ok(specs) = storage.get_indexes(db_name, coll_name).await {
let has_custom = specs.iter().any(|s| {
s.get_str("name").unwrap_or("_id_") != "_id_"
});
if !has_custom {
continue;
}
let ns_key = format!("{}.{}", db_name, coll_name);
let mut engine = IndexEngine::new();
for spec in &specs {
let name = spec.get_str("name").unwrap_or("").to_string();
if name == "_id_" {
continue; // already created by IndexEngine::new()
}
let key = match spec.get("key") {
Some(bson::Bson::Document(k)) => k.clone(),
_ => continue,
};
let unique = matches!(spec.get("unique"), Some(bson::Bson::Boolean(true)));
let sparse = matches!(spec.get("sparse"), Some(bson::Bson::Boolean(true)));
let expire_after_seconds = match spec.get("expireAfterSeconds") {
Some(bson::Bson::Int32(n)) => Some(*n as u64),
Some(bson::Bson::Int64(n)) => Some(*n as u64),
_ => None,
};
let options = IndexOptions {
name: Some(name.clone()),
unique,
sparse,
expire_after_seconds,
};
if let Err(e) = engine.create_index(key, options) {
tracing::warn!(
namespace = %ns_key,
index = %name,
error = %e,
"failed to restore index"
);
}
}
// Rebuild index data from existing documents.
if let Ok(docs) = storage.find_all(db_name, coll_name).await {
if !docs.is_empty() {
engine.rebuild_from_documents(&docs);
}
}
tracing::info!(
namespace = %ns_key,
indexes = engine.list_indexes().len(),
"restored indexes"
);
indexes.insert(ns_key, engine);
}
}
}
}
}
let ctx = Arc::new(CommandContext {
storage,
indexes: Arc::new(DashMap::new()),
indexes,
transactions: Arc::new(TransactionEngine::new()),
sessions: Arc::new(SessionEngine::new(30 * 60 * 1000, 60 * 1000)),
cursors: Arc::new(DashMap::new()),
+1 -1
View File
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartdb',
version: '2.5.0',
version: '2.5.4',
description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.'
}
File diff suppressed because one or more lines are too long