21 Commits

Author SHA1 Message Date
jkunz 8ebc1bb9e1 v2.5.7 2026-04-05 03:54:13 +00:00
jkunz 3fc21dcd99 fix(repo): no changes to commit 2026-04-05 03:54:13 +00:00
jkunz ad5e0e8a72 chore: gitignore generated bundled.ts to fix release cycle 2026-04-05 03:54:05 +00:00
jkunz c384df20ce v2.5.6 2026-04-05 03:53:29 +00:00
jkunz 4e944f3d05 fix(repo): no changes to commit 2026-04-05 03:53:29 +00:00
jkunz e0455daa2e chore: rebuild bundled debug server with current version 2026-04-05 03:53:22 +00:00
jkunz f3f1afe9af v2.5.5 2026-04-05 03:52:29 +00:00
jkunz 94dc9cfc3f fix(repo): no changes to commit 2026-04-05 03:52:29 +00:00
jkunz a9c0ced1ca v2.5.4 2026-04-05 03:52:23 +00:00
jkunz c8626a9afd fix(package): bump package version to 2.5.3 2026-04-05 03:52:23 +00:00
jkunz 55a1f66e57 chore: update bundled debug server output 2026-04-05 03:52:21 +00:00
jkunz 5b5f35821f v2.5.3 2026-04-05 03:51:58 +00:00
jkunz e8161e6417 fix(rustdb-commands): restore persisted index initialization before writes to enforce unique constraints after restart 2026-04-05 03:51:58 +00:00
jkunz 1a10c32b12 v2.5.2 2026-04-05 03:26:52 +00:00
jkunz cb8cb87d9f fix(rustdb-indexes): persist created indexes and restore them on server startup 2026-04-05 03:26:52 +00:00
jkunz 96117d54b9 v2.5.1 2026-04-05 02:48:00 +00:00
jkunz 53f58e45c3 fix(docs): update project documentation 2026-04-05 02:48:00 +00:00
jkunz 34d708be7e v2.5.0 2026-04-05 02:46:05 +00:00
jkunz 418e8dc052 feat(storage): add offline data validation and strengthen storage/index integrity checks 2026-04-05 02:46:05 +00:00
jkunz b8567ebe08 v2.4.1 2026-04-05 01:31:44 +00:00
jkunz 827bfa6370 fix(package): update package metadata 2026-04-05 01:31:44 +00:00
19 changed files with 976 additions and 81 deletions
+3
View File
@@ -13,5 +13,8 @@ rust/target/
package-lock.json package-lock.json
yarn.lock yarn.lock
# generated bundle (rebuilt on every build, embeds version)
ts_debugserver/bundled.ts
# playwright # playwright
.playwright-mcp/ .playwright-mcp/
+52
View File
@@ -1,5 +1,57 @@
# Changelog # Changelog
## 2026-04-05 - 2.5.7 - fix(repo)
no changes to commit
## 2026-04-05 - 2.5.6 - fix(repo)
no changes to commit
## 2026-04-05 - 2.5.5 - fix(repo)
no changes to commit
## 2026-04-05 - 2.5.4 - fix(package)
bump package version to 2.5.3
- Updates the package metadata version by one patch release.
## 2026-04-05 - 2.5.3 - fix(rustdb-commands)
restore persisted index initialization before writes to enforce unique constraints after restart
- load stored index specifications from storage when creating command context index engines
- rebuild index data from existing documents so custom indexes are active before insert, update, and upsert operations
- add @push.rocks/smartdata as a runtime dependency
## 2026-04-05 - 2.5.2 - fix(rustdb-indexes)
persist created indexes and restore them on server startup
- Save index specifications to storage when indexes are created.
- Remove persisted index metadata when indexes are dropped by name, key spec, or wildcard.
- Rebuild in-memory index engines from stored definitions and existing documents during startup.
## 2026-04-05 - 2.5.1 - fix(docs)
update project documentation
- Modifies a single documentation-related file with a minimal text change.
- No source code, API, or package metadata changes are indicated in the diff summary.
## 2026-04-05 - 2.5.0 - feat(storage)
add offline data validation and strengthen storage/index integrity checks
- adds a `--validate-data <PATH>` CLI mode to run offline integrity checks on storage directories
- introduces storage validation reporting for headers, checksums, duplicate ids, tombstones, and stale or orphaned hint entries
- pre-checks unique index constraints before insert, update, upsert, and findAndModify writes to prevent duplicate-key violations before storage changes
- validates hint files against data files during collection load and rebuilds indexes from data when hints are stale
- ensures new data files always receive a SMARTDB header and persists fresh hint files after successful compaction
- cleans up stale local Unix socket files before starting the TypeScript local server
## 2026-04-05 - 2.4.1 - fix(package)
update package metadata
- Adjusts package manifest content with a minimal one-line change.
## 2026-04-05 - 2.4.0 - feat(rustdb) ## 2026-04-05 - 2.4.0 - feat(rustdb)
add restore and periodic persistence support for in-memory storage add restore and periodic persistence support for in-memory storage
+2 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartdb", "name": "@push.rocks/smartdb",
"version": "2.4.0", "version": "2.5.7",
"private": false, "private": false,
"description": "A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.", "description": "A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.",
"exports": { "exports": {
@@ -29,6 +29,7 @@
"dependencies": { "dependencies": {
"@api.global/typedserver": "^8.0.0", "@api.global/typedserver": "^8.0.0",
"@design.estate/dees-element": "^2.0.0", "@design.estate/dees-element": "^2.0.0",
"@push.rocks/smartdata": "7.1.5",
"@push.rocks/smartrust": "^1.3.2", "@push.rocks/smartrust": "^1.3.2",
"bson": "^7.2.0" "bson": "^7.2.0"
}, },
+7 -4
View File
@@ -14,6 +14,9 @@ importers:
'@design.estate/dees-element': '@design.estate/dees-element':
specifier: ^2.0.0 specifier: ^2.0.0
version: 2.2.3 version: 2.2.3
'@push.rocks/smartdata':
specifier: 7.1.5
version: 7.1.5(socks@2.8.7)
'@push.rocks/smartrust': '@push.rocks/smartrust':
specifier: ^1.3.2 specifier: ^1.3.2
version: 1.3.2 version: 1.3.2
@@ -1026,8 +1029,8 @@ packages:
'@push.rocks/smartcrypto@2.0.4': '@push.rocks/smartcrypto@2.0.4':
resolution: {integrity: sha512-1+/5bsjyataf5uUkUNnnVXGRAt+gHVk1KDzozjTqgqJxHvQk1d9fVDohL6CxUhUucTPtu5VR5xNBiV8YCDuGyw==} resolution: {integrity: sha512-1+/5bsjyataf5uUkUNnnVXGRAt+gHVk1KDzozjTqgqJxHvQk1d9fVDohL6CxUhUucTPtu5VR5xNBiV8YCDuGyw==}
'@push.rocks/smartdata@7.1.3': '@push.rocks/smartdata@7.1.5':
resolution: {integrity: sha512-7vQJ9pdRk450yn2m9tmGPdSRlQVmxFPZjHD4sGYsfqCQPg+GLFusu+H16zpf+jKzAq4F2ZBMPaYymJHXvXiVcw==} resolution: {integrity: sha512-7x7VedEg6RocWndqUPuTbY2Bh85Q/x0LOVHL4o/NVXyh3IGNtiVQ8ple4WR0qYqlHRAojX4eDSBPMiYzIasqAg==}
'@push.rocks/smartdelay@3.0.5': '@push.rocks/smartdelay@3.0.5':
resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==} resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==}
@@ -5665,7 +5668,7 @@ snapshots:
'@types/node-forge': 1.3.14 '@types/node-forge': 1.3.14
node-forge: 1.4.0 node-forge: 1.4.0
'@push.rocks/smartdata@7.1.3(socks@2.8.7)': '@push.rocks/smartdata@7.1.5(socks@2.8.7)':
dependencies: dependencies:
'@push.rocks/lik': 6.4.0 '@push.rocks/lik': 6.4.0
'@push.rocks/smartdelay': 3.0.5 '@push.rocks/smartdelay': 3.0.5
@@ -5899,7 +5902,7 @@ snapshots:
'@push.rocks/smartmongo@5.1.1(socks@2.8.7)': '@push.rocks/smartmongo@5.1.1(socks@2.8.7)':
dependencies: dependencies:
'@push.rocks/mongodump': 1.1.0(socks@2.8.7) '@push.rocks/mongodump': 1.1.0(socks@2.8.7)
'@push.rocks/smartdata': 7.1.3(socks@2.8.7) '@push.rocks/smartdata': 7.1.5(socks@2.8.7)
'@push.rocks/smartfs': 1.5.0 '@push.rocks/smartfs': 1.5.0
'@push.rocks/smartpath': 6.0.0 '@push.rocks/smartpath': 6.0.0
'@push.rocks/smartpromise': 4.2.3 '@push.rocks/smartpromise': 4.2.3
+63 -2
View File
@@ -1,8 +1,8 @@
use std::sync::Arc; use std::sync::Arc;
use bson::Document; use bson::{Bson, Document};
use dashmap::DashMap; use dashmap::DashMap;
use rustdb_index::IndexEngine; use rustdb_index::{IndexEngine, IndexOptions};
use rustdb_storage::{OpLog, StorageAdapter}; use rustdb_storage::{OpLog, StorageAdapter};
use rustdb_txn::{SessionEngine, TransactionEngine}; use rustdb_txn::{SessionEngine, TransactionEngine};
@@ -24,6 +24,67 @@ pub struct CommandContext {
pub oplog: Arc<OpLog>, pub oplog: Arc<OpLog>,
} }
impl CommandContext {
/// Get or lazily initialize an IndexEngine for a namespace.
///
/// If no IndexEngine exists yet for this namespace, loads persisted index
/// specs from `indexes.json` via the storage adapter, creates the engine
/// with those specs, and rebuilds index data from existing documents.
/// This ensures unique indexes are enforced even on the very first write
/// after a restart.
pub async fn get_or_init_index_engine(&self, db: &str, coll: &str) -> dashmap::mapref::one::RefMut<'_, String, IndexEngine> {
let ns_key = format!("{}.{}", db, coll);
// Fast path: engine already exists.
if self.indexes.contains_key(&ns_key) {
return self.indexes.entry(ns_key).or_insert_with(IndexEngine::new);
}
// Slow path: load from persisted specs.
let mut engine = IndexEngine::new();
let mut has_custom = false;
if let Ok(specs) = self.storage.get_indexes(db, coll).await {
for spec in &specs {
let name = spec.get_str("name").unwrap_or("").to_string();
if name == "_id_" || name.is_empty() {
continue;
}
let key = match spec.get("key") {
Some(Bson::Document(k)) => k.clone(),
_ => continue,
};
let unique = matches!(spec.get("unique"), Some(Bson::Boolean(true)));
let sparse = matches!(spec.get("sparse"), Some(Bson::Boolean(true)));
let expire_after_seconds = match spec.get("expireAfterSeconds") {
Some(Bson::Int32(n)) => Some(*n as u64),
Some(Bson::Int64(n)) => Some(*n as u64),
_ => None,
};
let options = IndexOptions {
name: Some(name),
unique,
sparse,
expire_after_seconds,
};
let _ = engine.create_index(key, options);
has_custom = true;
}
}
if has_custom {
// Rebuild index data from existing documents.
if let Ok(docs) = self.storage.find_all(db, coll).await {
if !docs.is_empty() {
engine.rebuild_from_documents(&docs);
}
}
}
self.indexes.entry(ns_key).or_insert(engine)
}
}
/// State of an open cursor from a find or aggregate command. /// State of an open cursor from a find or aggregate command.
pub struct CursorState { pub struct CursorState {
/// Documents remaining to be returned. /// Documents remaining to be returned.
@@ -101,7 +101,15 @@ async fn handle_create_indexes(
expire_after_seconds, expire_after_seconds,
}; };
// Create the index. let options_for_persist = IndexOptions {
name: options.name.clone(),
unique: options.unique,
sparse: options.sparse,
expire_after_seconds: options.expire_after_seconds,
};
let key_for_persist = key.clone();
// Create the index in-memory.
let mut engine = ctx let mut engine = ctx
.indexes .indexes
.entry(ns_key.clone()) .entry(ns_key.clone())
@@ -110,6 +118,22 @@ async fn handle_create_indexes(
match engine.create_index(key, options) { match engine.create_index(key, options) {
Ok(index_name) => { Ok(index_name) => {
debug!(index_name = %index_name, "Created index"); debug!(index_name = %index_name, "Created index");
// Persist index spec to disk.
let mut spec = doc! { "key": key_for_persist };
if options_for_persist.unique {
spec.insert("unique", true);
}
if options_for_persist.sparse {
spec.insert("sparse", true);
}
if let Some(ttl) = options_for_persist.expire_after_seconds {
spec.insert("expireAfterSeconds", ttl as i64);
}
if let Err(e) = ctx.storage.save_index(db, coll, &index_name, spec).await {
tracing::warn!(index = %index_name, error = %e, "failed to persist index spec");
}
created_count += 1; created_count += 1;
} }
Err(e) => { Err(e) => {
@@ -180,9 +204,21 @@ async fn handle_drop_indexes(
match index_spec { match index_spec {
Some(Bson::String(name)) if name == "*" => { Some(Bson::String(name)) if name == "*" => {
// Drop all indexes except _id_. // Drop all indexes except _id_.
// Collect names to drop from storage first.
let names_to_drop: Vec<String> = if let Some(engine) = ctx.indexes.get(&ns_key) {
engine.list_indexes().iter()
.filter(|info| info.name != "_id_")
.map(|info| info.name.clone())
.collect()
} else {
Vec::new()
};
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
engine.drop_all_indexes(); engine.drop_all_indexes();
} }
for idx_name in &names_to_drop {
let _ = ctx.storage.drop_index(db, coll, idx_name).await;
}
} }
Some(Bson::String(name)) => { Some(Bson::String(name)) => {
// Drop by name. // Drop by name.
@@ -196,6 +232,7 @@ async fn handle_drop_indexes(
name name
))); )));
} }
let _ = ctx.storage.drop_index(db, coll, name).await;
} }
Some(Bson::Document(key_spec)) => { Some(Bson::Document(key_spec)) => {
// Drop by key spec: find the index with matching key. // Drop by key spec: find the index with matching key.
@@ -210,6 +247,7 @@ async fn handle_drop_indexes(
engine.drop_index(&name).map_err(|e| { engine.drop_index(&name).map_err(|e| {
CommandError::IndexError(e.to_string()) CommandError::IndexError(e.to_string())
})?; })?;
let _ = ctx.storage.drop_index(db, coll, &name).await;
} else { } else {
return Err(CommandError::IndexError( return Err(CommandError::IndexError(
"index not found with specified key".into(), "index not found with specified key".into(),
@@ -1,9 +1,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use bson::{doc, oid::ObjectId, Bson, Document}; use bson::{doc, oid::ObjectId, Bson, Document};
use rustdb_index::IndexEngine;
use rustdb_storage::OpType; use rustdb_storage::OpType;
use tracing::{debug, warn}; use tracing::debug;
use crate::context::CommandContext; use crate::context::CommandContext;
use crate::error::{CommandError, CommandResult}; use crate::error::{CommandError, CommandResult};
@@ -56,12 +55,35 @@ pub async fn handle(
let mut inserted_count: i32 = 0; let mut inserted_count: i32 = 0;
let mut write_errors: Vec<Document> = Vec::new(); let mut write_errors: Vec<Document> = Vec::new();
// Ensure the IndexEngine is loaded (with persisted specs from indexes.json).
// This must happen BEFORE any writes, so unique constraints are enforced
// even on the first write after a restart.
drop(ctx.get_or_init_index_engine(db, coll).await);
for (idx, mut doc) in docs.into_iter().enumerate() { for (idx, mut doc) in docs.into_iter().enumerate() {
// Auto-generate _id if not present. // Auto-generate _id if not present.
if !doc.contains_key("_id") { if !doc.contains_key("_id") {
doc.insert("_id", ObjectId::new()); doc.insert("_id", ObjectId::new());
} }
// Pre-check unique index constraints BEFORE storage write.
// The engine is guaranteed to exist from the get_or_init call above.
if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints(&doc) {
let err_msg = e.to_string();
write_errors.push(doc! {
"index": idx as i32,
"code": 11000_i32,
"codeName": "DuplicateKey",
"errmsg": &err_msg,
});
if ordered {
break;
}
continue;
}
}
// Attempt storage insert. // Attempt storage insert.
match ctx.storage.insert_one(db, coll, doc.clone()).await { match ctx.storage.insert_one(db, coll, doc.clone()).await {
Ok(id_str) => { Ok(id_str) => {
@@ -75,17 +97,15 @@ pub async fn handle(
None, None,
); );
// Update index engine. // Update index engine (already initialized above).
let mut engine = ctx if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
.indexes if let Err(e) = engine.on_insert(&doc) {
.entry(ns_key.clone()) tracing::error!(
.or_insert_with(IndexEngine::new); namespace = %ns_key,
if let Err(e) = engine.on_insert(&doc) { error = %e,
warn!( "index update failed after successful insert"
namespace = %ns_key, );
error = %e, }
"index update failed after successful insert"
);
} }
inserted_count += 1; inserted_count += 1;
} }
@@ -1,7 +1,6 @@
use std::collections::HashSet; use std::collections::HashSet;
use bson::{doc, oid::ObjectId, Bson, Document}; use bson::{doc, oid::ObjectId, Bson, Document};
use rustdb_index::IndexEngine;
use rustdb_query::{QueryMatcher, UpdateEngine, sort_documents, apply_projection}; use rustdb_query::{QueryMatcher, UpdateEngine, sort_documents, apply_projection};
use rustdb_storage::OpType; use rustdb_storage::OpType;
use tracing::debug; use tracing::debug;
@@ -47,6 +46,10 @@ async fn handle_update(
ensure_collection_exists(db, coll, ctx).await?; ensure_collection_exists(db, coll, ctx).await?;
let ns_key = format!("{}.{}", db, coll); let ns_key = format!("{}.{}", db, coll);
// Ensure the IndexEngine is loaded with persisted specs from indexes.json.
drop(ctx.get_or_init_index_engine(db, coll).await);
let mut total_n: i32 = 0; let mut total_n: i32 = 0;
let mut total_n_modified: i32 = 0; let mut total_n_modified: i32 = 0;
let mut upserted_list: Vec<Document> = Vec::new(); let mut upserted_list: Vec<Document> = Vec::new();
@@ -150,6 +153,22 @@ async fn handle_update(
updated.get("_id").unwrap().clone() updated.get("_id").unwrap().clone()
}; };
// Pre-check unique index constraints before upsert insert.
if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints(&updated) {
write_errors.push(doc! {
"index": idx as i32,
"code": 11000_i32,
"codeName": "DuplicateKey",
"errmsg": e.to_string(),
});
if ordered {
break;
}
continue;
}
}
// Insert the new document. // Insert the new document.
match ctx.storage.insert_one(db, coll, updated.clone()).await { match ctx.storage.insert_one(db, coll, updated.clone()).await {
Ok(id_str) => { Ok(id_str) => {
@@ -163,12 +182,12 @@ async fn handle_update(
None, None,
); );
// Update index. // Update index (engine already initialized above).
let mut engine = ctx if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
.indexes if let Err(e) = engine.on_insert(&updated) {
.entry(ns_key.clone()) tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert");
.or_insert_with(IndexEngine::new); }
let _ = engine.on_insert(&updated); }
total_n += 1; total_n += 1;
upserted_list.push(doc! { upserted_list.push(doc! {
@@ -216,6 +235,22 @@ async fn handle_update(
array_filters.as_deref(), array_filters.as_deref(),
) { ) {
Ok(updated_doc) => { Ok(updated_doc) => {
// Pre-check unique index constraints before storage write.
if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints_for_update(matched_doc, &updated_doc) {
write_errors.push(doc! {
"index": idx as i32,
"code": 11000_i32,
"codeName": "DuplicateKey",
"errmsg": e.to_string(),
});
if ordered {
break;
}
continue;
}
}
let id_str = extract_id_string(matched_doc); let id_str = extract_id_string(matched_doc);
match ctx match ctx
.storage .storage
@@ -235,7 +270,9 @@ async fn handle_update(
// Update index. // Update index.
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
let _ = engine.on_update(matched_doc, &updated_doc); if let Err(e) = engine.on_update(matched_doc, &updated_doc) {
tracing::error!(namespace = %ns_key, error = %e, "index update failed after update");
}
} }
total_n += 1; total_n += 1;
// Check if the document actually changed. // Check if the document actually changed.
@@ -366,6 +403,9 @@ async fn handle_find_and_modify(
let ns_key = format!("{}.{}", db, coll); let ns_key = format!("{}.{}", db, coll);
// Ensure the IndexEngine is loaded with persisted specs.
drop(ctx.get_or_init_index_engine(db, coll).await);
// Load and filter documents. // Load and filter documents.
let mut matched = load_filtered_docs(db, coll, &query, &ns_key, ctx).await?; let mut matched = load_filtered_docs(db, coll, &query, &ns_key, ctx).await?;
@@ -444,6 +484,13 @@ async fn handle_find_and_modify(
) )
.map_err(|e| CommandError::InternalError(e.to_string()))?; .map_err(|e| CommandError::InternalError(e.to_string()))?;
// Pre-check unique index constraints before storage write.
if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints_for_update(&original_doc, &updated_doc) {
return Err(CommandError::StorageError(e.to_string()));
}
}
let id_str = extract_id_string(&original_doc); let id_str = extract_id_string(&original_doc);
ctx.storage ctx.storage
.update_by_id(db, coll, &id_str, updated_doc.clone()) .update_by_id(db, coll, &id_str, updated_doc.clone())
@@ -461,7 +508,9 @@ async fn handle_find_and_modify(
// Update index. // Update index.
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
let _ = engine.on_update(&original_doc, &updated_doc); if let Err(e) = engine.on_update(&original_doc, &updated_doc) {
tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify update");
}
} }
let return_doc = if return_new { let return_doc = if return_new {
@@ -505,6 +554,13 @@ async fn handle_find_and_modify(
updated_doc.get("_id").unwrap().clone() updated_doc.get("_id").unwrap().clone()
}; };
// Pre-check unique index constraints before upsert insert.
if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints(&updated_doc) {
return Err(CommandError::StorageError(e.to_string()));
}
}
let inserted_id_str = ctx.storage let inserted_id_str = ctx.storage
.insert_one(db, coll, updated_doc.clone()) .insert_one(db, coll, updated_doc.clone())
.await?; .await?;
@@ -521,11 +577,11 @@ async fn handle_find_and_modify(
// Update index. // Update index.
{ {
let mut engine = ctx if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
.indexes if let Err(e) = engine.on_insert(&updated_doc) {
.entry(ns_key.clone()) tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert");
.or_insert_with(IndexEngine::new); }
let _ = engine.on_insert(&updated_doc); }
} }
let value = if return_new { let value = if return_new {
+49
View File
@@ -153,6 +153,55 @@ impl IndexEngine {
self.indexes.contains_key(name) self.indexes.contains_key(name)
} }
/// Check unique constraints for a document without modifying the index.
/// Returns Ok(()) if no conflict, Err(DuplicateKey) if a unique constraint
/// would be violated. This is a read-only check (immutable &self).
pub fn check_unique_constraints(&self, doc: &Document) -> Result<(), IndexError> {
for idx in self.indexes.values() {
if idx.unique {
let key_bytes = Self::extract_key_bytes(doc, &idx.key, idx.sparse);
if let Some(ref kb) = key_bytes {
if let Some(existing_ids) = idx.hash.get(kb) {
if !existing_ids.is_empty() {
return Err(IndexError::DuplicateKey {
index: idx.name.clone(),
key: format!("{:?}", kb),
});
}
}
}
}
}
Ok(())
}
/// Check unique constraints for an update, excluding the document being updated.
/// Returns Ok(()) if no conflict. This is a read-only check (immutable &self).
pub fn check_unique_constraints_for_update(
&self,
old_doc: &Document,
new_doc: &Document,
) -> Result<(), IndexError> {
let doc_id = Self::extract_id(old_doc);
for idx in self.indexes.values() {
if idx.unique {
let new_key_bytes = Self::extract_key_bytes(new_doc, &idx.key, idx.sparse);
if let Some(ref kb) = new_key_bytes {
if let Some(existing_ids) = idx.hash.get(kb) {
let has_conflict = existing_ids.iter().any(|id| *id != doc_id);
if has_conflict {
return Err(IndexError::DuplicateKey {
index: idx.name.clone(),
key: format!("{:?}", kb),
});
}
}
}
}
}
Ok(())
}
/// Notify the engine that a document has been inserted. /// Notify the engine that a document has been inserted.
/// Checks unique constraints and updates all index structures. /// Checks unique constraints and updates all index structures.
pub fn on_insert(&mut self, doc: &Document) -> Result<(), IndexError> { pub fn on_insert(&mut self, doc: &Document) -> Result<(), IndexError> {
+1 -1
View File
@@ -253,7 +253,7 @@ mod tests {
assert!(b_entry.offset > a_entry.offset); assert!(b_entry.offset > a_entry.offset);
// Verify the compacted file can be used to rebuild KeyDir // Verify the compacted file can be used to rebuild KeyDir
let (rebuilt, dead) = KeyDir::build_from_data_file(&data_path).unwrap(); let (rebuilt, dead, _stats) = KeyDir::build_from_data_file(&data_path).unwrap();
assert_eq!(rebuilt.len(), 2); assert_eq!(rebuilt.len(), 2);
assert_eq!(dead, 0); // no dead records in compacted file assert_eq!(dead, 0); // no dead records in compacted file
} }
+72 -23
View File
@@ -21,7 +21,7 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use bson::{doc, oid::ObjectId, Document}; use bson::{doc, oid::ObjectId, Document};
use dashmap::DashMap; use dashmap::DashMap;
use tracing::debug; use tracing::{debug, info};
use crate::adapter::StorageAdapter; use crate::adapter::StorageAdapter;
use crate::binary_wal::{BinaryWal, WalOpType}; use crate::binary_wal::{BinaryWal, WalOpType};
@@ -83,6 +83,20 @@ impl CollectionState {
.map_err(|e| StorageError::SerializationError(format!("BSON decode: {e}"))) .map_err(|e| StorageError::SerializationError(format!("BSON decode: {e}")))
} }
/// Ensure a data file has the 64-byte SMARTDB header.
/// If the file was just created (empty), writes the header and updates
/// the data_file_size counter. Must be called under write_lock.
fn ensure_data_header(&self, file: &mut std::fs::File) -> StorageResult<()> {
let pos = file.seek(SeekFrom::End(0))?;
if pos == 0 {
let hdr = FileHeader::new(FileType::Data);
file.write_all(&hdr.encode())?;
self.data_file_size
.fetch_add(FILE_HEADER_SIZE as u64, Ordering::Relaxed);
}
Ok(())
}
/// Append a data record and update the KeyDir. Must be called under write_lock. /// Append a data record and update the KeyDir. Must be called under write_lock.
fn append_record( fn append_record(
&self, &self,
@@ -104,6 +118,7 @@ impl CollectionState {
.append(true) .append(true)
.open(&data_path)?; .open(&data_path)?;
self.ensure_data_header(&mut file)?;
let offset = file.seek(SeekFrom::End(0))?; let offset = file.seek(SeekFrom::End(0))?;
file.write_all(&encoded)?; file.write_all(&encoded)?;
file.sync_all()?; file.sync_all()?;
@@ -137,6 +152,7 @@ impl CollectionState {
.append(true) .append(true)
.open(&data_path)?; .open(&data_path)?;
self.ensure_data_header(&mut file)?;
file.write_all(&encoded)?; file.write_all(&encoded)?;
file.sync_all()?; file.sync_all()?;
@@ -160,6 +176,11 @@ impl CollectionState {
&self.data_file_size, &self.data_file_size,
) { ) {
tracing::warn!("compaction failed for {:?}: {e}", self.coll_dir); tracing::warn!("compaction failed for {:?}: {e}", self.coll_dir);
} else {
// Persist hint file after successful compaction to prevent stale hints
if let Err(e) = self.keydir.persist_to_hint_file(&self.hint_path()) {
tracing::warn!("failed to persist hint after compaction for {:?}: {e}", self.coll_dir);
}
} }
} }
} }
@@ -234,33 +255,42 @@ impl FileStorageAdapter {
let hint_path = coll_dir.join("keydir.hint"); let hint_path = coll_dir.join("keydir.hint");
// Try loading from hint file first, fall back to data file scan // Try loading from hint file first, fall back to data file scan
let (keydir, dead_bytes) = if hint_path.exists() && data_path.exists() { let (keydir, dead_bytes, loaded_from_hint) = if hint_path.exists() && data_path.exists() {
match KeyDir::load_from_hint_file(&hint_path) { match KeyDir::load_from_hint_file(&hint_path) {
Ok(Some(kd)) => { Ok(Some(kd)) => {
debug!("loaded KeyDir from hint file: {:?}", hint_path); // Validate hint against actual data file
// We don't know dead_bytes from the hint file; estimate from file size let hint_valid = kd.validate_against_data_file(&data_path, 16)
let file_size = std::fs::metadata(&data_path) .unwrap_or(false);
.map(|m| m.len()) if hint_valid {
.unwrap_or(FILE_HEADER_SIZE as u64); debug!("loaded KeyDir from hint file: {:?}", hint_path);
let live_bytes: u64 = { let file_size = std::fs::metadata(&data_path)
let mut total = 0u64; .map(|m| m.len())
kd.for_each(|_, e| total += e.record_len as u64); .unwrap_or(FILE_HEADER_SIZE as u64);
total let live_bytes: u64 = {
}; let mut total = 0u64;
let dead = file_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes); kd.for_each(|_, e| total += e.record_len as u64);
(kd, dead) total
};
let dead = file_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
(kd, dead, true)
} else {
tracing::warn!("hint file {:?} is stale, rebuilding from data file", hint_path);
let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
(kd, dead, false)
}
} }
_ => { _ => {
debug!("hint file invalid, rebuilding KeyDir from data file"); debug!("hint file invalid, rebuilding KeyDir from data file");
KeyDir::build_from_data_file(&data_path)? let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
(kd, dead, false)
} }
} }
} else if data_path.exists() { } else if data_path.exists() {
KeyDir::build_from_data_file(&data_path)? let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
(kd, dead, false)
} else { } else {
(KeyDir::new(), 0) (KeyDir::new(), 0, false)
}; };
let doc_count = keydir.len(); let doc_count = keydir.len();
let data_file_size = if data_path.exists() { let data_file_size = if data_path.exists() {
std::fs::metadata(&data_path)?.len() std::fs::metadata(&data_path)?.len()
@@ -268,6 +298,15 @@ impl FileStorageAdapter {
FILE_HEADER_SIZE as u64 FILE_HEADER_SIZE as u64
}; };
info!(
collection = %coll_dir.display(),
documents = doc_count,
data_bytes = data_file_size,
dead_bytes = dead_bytes,
source = if loaded_from_hint { "hint" } else { "scan" },
"loaded collection"
);
// Initialize WAL and recover // Initialize WAL and recover
let wal = BinaryWal::new(wal_path); let wal = BinaryWal::new(wal_path);
wal.initialize()?; wal.initialize()?;
@@ -275,10 +314,10 @@ impl FileStorageAdapter {
// Recover uncommitted WAL entries // Recover uncommitted WAL entries
let uncommitted = wal.recover()?; let uncommitted = wal.recover()?;
if !uncommitted.is_empty() { if !uncommitted.is_empty() {
debug!( info!(
"recovering {} uncommitted WAL entries for {:?}", collection = %coll_dir.display(),
uncommitted.len(), entries = uncommitted.len(),
coll_dir "recovering uncommitted WAL entries"
); );
} }
@@ -415,15 +454,18 @@ impl FileStorageAdapter {
impl StorageAdapter for FileStorageAdapter { impl StorageAdapter for FileStorageAdapter {
async fn initialize(&self) -> StorageResult<()> { async fn initialize(&self) -> StorageResult<()> {
std::fs::create_dir_all(&self.base_path)?; std::fs::create_dir_all(&self.base_path)?;
debug!("FileStorageAdapter initialized at {:?}", self.base_path);
// Pre-load all existing collections // Pre-load all existing collections
let mut db_count: usize = 0;
if let Ok(entries) = std::fs::read_dir(&self.base_path) { if let Ok(entries) = std::fs::read_dir(&self.base_path) {
for entry in entries.flatten() { for entry in entries.flatten() {
if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false) { if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
if let Some(db_name) = entry.file_name().to_str() { if let Some(db_name) = entry.file_name().to_str() {
let db_name = db_name.to_string(); let db_name = db_name.to_string();
if let Ok(colls) = self.list_collection_dirs(&db_name) { if let Ok(colls) = self.list_collection_dirs(&db_name) {
if !colls.is_empty() {
db_count += 1;
}
for coll_name in colls { for coll_name in colls {
let _ = self.get_or_init_collection(&db_name, &coll_name); let _ = self.get_or_init_collection(&db_name, &coll_name);
} }
@@ -433,6 +475,13 @@ impl StorageAdapter for FileStorageAdapter {
} }
} }
info!(
databases = db_count,
collections = self.collections.len(),
path = %self.base_path.display(),
"FileStorageAdapter initialization complete"
);
// Start periodic compaction task (runs every 24 hours) // Start periodic compaction task (runs every 24 hours)
{ {
let collections = self.collections.clone(); let collections = self.collections.clone();
+115 -6
View File
@@ -6,7 +6,7 @@
//! The KeyDir can be rebuilt from a data file scan, or loaded quickly from a //! The KeyDir can be rebuilt from a data file scan, or loaded quickly from a
//! persisted hint file for fast restart. //! persisted hint file for fast restart.
use std::io::{self, BufReader, BufWriter, Read, Write}; use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
use std::path::Path; use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
@@ -14,7 +14,7 @@ use dashmap::DashMap;
use crate::error::{StorageError, StorageResult}; use crate::error::{StorageError, StorageResult};
use crate::record::{ use crate::record::{
FileHeader, FileType, RecordScanner, FILE_HEADER_SIZE, FORMAT_VERSION, DataRecord, FileHeader, FileType, RecordScanner, FILE_HEADER_SIZE, FORMAT_VERSION,
}; };
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -34,6 +34,23 @@ pub struct KeyDirEntry {
pub timestamp: u64, pub timestamp: u64,
} }
// ---------------------------------------------------------------------------
// BuildStats — statistics from building KeyDir from a data file scan
// ---------------------------------------------------------------------------
/// Statistics collected while building a KeyDir from a data file scan.
///
/// Produced by `KeyDir::build_from_data_file`. All counters start at zero
/// (via `Default`) and are incremented during a single sequential scan.
#[derive(Debug, Clone, Default)]
pub struct BuildStats {
    /// Total records scanned (live + tombstones + superseded).
    pub total_records_scanned: u64,
    /// Number of live documents in the final KeyDir.
    pub live_documents: u64,
    /// Number of tombstone records encountered.
    pub tombstones: u64,
    /// Number of records superseded by a later write for the same key.
    /// A record removed by a later tombstone is NOT counted here — the
    /// tombstone branch only increments `tombstones`.
    pub superseded_records: u64,
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// KeyDir // KeyDir
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -116,9 +133,9 @@ impl KeyDir {
/// Rebuild the KeyDir by scanning an entire data file. /// Rebuild the KeyDir by scanning an entire data file.
/// The file must start with a valid `FileHeader`. /// The file must start with a valid `FileHeader`.
/// Returns `(keydir, dead_bytes)` where `dead_bytes` is the total size of /// Returns `(keydir, dead_bytes, stats)` where `dead_bytes` is the total size of
/// stale records (superseded by later writes or tombstoned). /// stale records (superseded by later writes or tombstoned).
pub fn build_from_data_file(path: &Path) -> StorageResult<(Self, u64)> { pub fn build_from_data_file(path: &Path) -> StorageResult<(Self, u64, BuildStats)> {
let file = std::fs::File::open(path)?; let file = std::fs::File::open(path)?;
let mut reader = BufReader::new(file); let mut reader = BufReader::new(file);
@@ -135,6 +152,7 @@ impl KeyDir {
let keydir = KeyDir::new(); let keydir = KeyDir::new();
let mut dead_bytes: u64 = 0; let mut dead_bytes: u64 = 0;
let mut stats = BuildStats::default();
let scanner = RecordScanner::new(reader, FILE_HEADER_SIZE as u64); let scanner = RecordScanner::new(reader, FILE_HEADER_SIZE as u64);
for result in scanner { for result in scanner {
@@ -146,7 +164,10 @@ impl KeyDir {
let key = String::from_utf8(record.key) let key = String::from_utf8(record.key)
.map_err(|e| StorageError::CorruptRecord(format!("invalid UTF-8 key: {e}")))?; .map_err(|e| StorageError::CorruptRecord(format!("invalid UTF-8 key: {e}")))?;
stats.total_records_scanned += 1;
if is_tombstone { if is_tombstone {
stats.tombstones += 1;
// Remove from index; the tombstone itself is dead weight // Remove from index; the tombstone itself is dead weight
if let Some(prev) = keydir.remove(&key) { if let Some(prev) = keydir.remove(&key) {
dead_bytes += prev.record_len as u64; dead_bytes += prev.record_len as u64;
@@ -162,11 +183,13 @@ impl KeyDir {
if let Some(prev) = keydir.insert(key, entry) { if let Some(prev) = keydir.insert(key, entry) {
// Previous version of same key is now dead // Previous version of same key is now dead
dead_bytes += prev.record_len as u64; dead_bytes += prev.record_len as u64;
stats.superseded_records += 1;
} }
} }
} }
Ok((keydir, dead_bytes)) stats.live_documents = keydir.len();
Ok((keydir, dead_bytes, stats))
} }
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
@@ -271,6 +294,86 @@ impl KeyDir {
Ok(Some(keydir)) Ok(Some(keydir))
} }
// -----------------------------------------------------------------------
// Hint file validation
// -----------------------------------------------------------------------
/// Validate this KeyDir (loaded from a hint file) against the actual data file.
/// Returns `Ok(true)` if the hint appears consistent, `Ok(false)` if a rebuild
/// from the data file is recommended.
///
/// Checks:
/// 1. All entry offsets + record_len fit within the data file size.
/// 2. All entry offsets are >= FILE_HEADER_SIZE.
/// 3. A sample of entries is spot-checked by reading the record at
///    the offset and verifying the key matches.
///
/// A `sample_size` of 0 performs the bounds checks only and skips the
/// spot-check entirely.
pub fn validate_against_data_file(&self, data_path: &Path, sample_size: usize) -> StorageResult<bool> {
    // A missing/unreadable data file is treated as zero-length rather than
    // an I/O error: an empty KeyDir is consistent with an absent file.
    let file_size = std::fs::metadata(data_path)
        .map(|m| m.len())
        .unwrap_or(0);
    if file_size < FILE_HEADER_SIZE as u64 {
        // Data file is too small to even contain a header
        return Ok(self.is_empty());
    }

    // Pass 1: bounds check all entries
    let mut all_keys: Vec<(String, KeyDirEntry)> = Vec::with_capacity(self.len() as usize);
    let mut bounds_ok = true;
    self.for_each(|key, entry| {
        if entry.offset < FILE_HEADER_SIZE as u64
            || entry.offset + entry.record_len as u64 > file_size
        {
            bounds_ok = false;
        }
        all_keys.push((key.to_string(), *entry));
    });
    if !bounds_ok {
        return Ok(false);
    }

    // Pass 2: spot-check a sample of entries by reading records from data.rdb.
    // Bail out early when there is nothing to sample. The sample_size == 0
    // guard also fixes a divide-by-zero panic in the `step` computation
    // below (all_keys.len() / 0 when the keydir is non-empty).
    if all_keys.is_empty() || sample_size == 0 {
        return Ok(true);
    }

    // Sort by offset for sequential I/O, then probe every `step`-th entry
    // until `sample_size` entries have been verified.
    all_keys.sort_by_key(|(_, e)| e.offset);
    let step = if all_keys.len() <= sample_size {
        1
    } else {
        all_keys.len() / sample_size
    };
    let mut file = std::fs::File::open(data_path)?;
    let mut checked = 0usize;
    for (i, (expected_key, entry)) in all_keys.iter().enumerate() {
        if checked >= sample_size {
            break;
        }
        if i % step != 0 {
            continue;
        }
        // Seek to the entry's offset and try to decode the record
        file.seek(SeekFrom::Start(entry.offset))?;
        match DataRecord::decode_from(&mut file) {
            Ok(Some((record, _disk_size))) => {
                // Lossy conversion is safe for comparison: the expected key
                // is valid UTF-8, so any invalid bytes simply fail the match.
                let record_key = String::from_utf8_lossy(&record.key);
                if record_key != *expected_key {
                    return Ok(false);
                }
            }
            Ok(None) | Err(_) => {
                // Truncated or corrupt record where the hint claims one exists.
                return Ok(false);
            }
        }
        checked += 1;
    }
    Ok(true)
}
} }
impl Default for KeyDir { impl Default for KeyDir {
@@ -372,7 +475,7 @@ mod tests {
f.write_all(&r3.encode()).unwrap(); f.write_all(&r3.encode()).unwrap();
} }
let (kd, dead_bytes) = KeyDir::build_from_data_file(&data_path).unwrap(); let (kd, dead_bytes, stats) = KeyDir::build_from_data_file(&data_path).unwrap();
// Only B should be live // Only B should be live
assert_eq!(kd.len(), 1); assert_eq!(kd.len(), 1);
@@ -381,6 +484,12 @@ mod tests {
// Dead bytes: r1 (aaa live, then superseded by tombstone) + r3 (tombstone itself) // Dead bytes: r1 (aaa live, then superseded by tombstone) + r3 (tombstone itself)
assert!(dead_bytes > 0); assert!(dead_bytes > 0);
// Stats
assert_eq!(stats.total_records_scanned, 3);
assert_eq!(stats.live_documents, 1);
assert_eq!(stats.tombstones, 1);
assert_eq!(stats.superseded_records, 0); // aaa was removed by tombstone, not superseded
} }
#[test] #[test]
+2 -1
View File
@@ -16,13 +16,14 @@ pub mod keydir;
pub mod memory; pub mod memory;
pub mod oplog; pub mod oplog;
pub mod record; pub mod record;
pub mod validate;
pub use adapter::StorageAdapter; pub use adapter::StorageAdapter;
pub use binary_wal::{BinaryWal, WalEntry, WalOpType}; pub use binary_wal::{BinaryWal, WalEntry, WalOpType};
pub use compaction::{compact_data_file, should_compact, CompactionResult}; pub use compaction::{compact_data_file, should_compact, CompactionResult};
pub use error::{StorageError, StorageResult}; pub use error::{StorageError, StorageResult};
pub use file::FileStorageAdapter; pub use file::FileStorageAdapter;
pub use keydir::{KeyDir, KeyDirEntry}; pub use keydir::{BuildStats, KeyDir, KeyDirEntry};
pub use memory::MemoryStorageAdapter; pub use memory::MemoryStorageAdapter;
pub use oplog::{OpLog, OpLogEntry, OpLogStats, OpType}; pub use oplog::{OpLog, OpLogEntry, OpLogStats, OpType};
pub use record::{ pub use record::{
+324
View File
@@ -0,0 +1,324 @@
//! Data integrity validation for RustDb storage directories.
//!
//! Provides offline validation of data files without starting the server.
//! Checks header magic, record CRC32 checksums, duplicate IDs, and
//! keydir.hint consistency.
use std::collections::HashMap;
use std::io::{BufReader, Read};
use std::path::Path;
use crate::error::{StorageError, StorageResult};
use crate::keydir::KeyDir;
use crate::record::{FileHeader, FileType, RecordScanner, FILE_HEADER_SIZE};
/// Result of validating an entire data directory.
///
/// Built by `validate_data_directory`: one entry per `{db}/{collection}`
/// directory that contains a `data.rdb` file.
pub struct ValidationReport {
    /// Per-collection findings, sorted by (db, collection) for
    /// deterministic output.
    pub collections: Vec<CollectionReport>,
}
/// Result of validating a single collection.
pub struct CollectionReport {
    /// Database (parent directory) name.
    pub db: String,
    /// Collection (directory) name.
    pub collection: String,
    /// True when data.rdb starts with a decodable header of type `Data`.
    pub header_valid: bool,
    /// Records successfully decoded from data.rdb (live + tombstones).
    pub total_records: u64,
    /// Keys still live after replaying all inserts and tombstones.
    pub live_documents: u64,
    /// Tombstone records encountered during the scan.
    pub tombstones: u64,
    /// Keys that appeared more than once as non-tombstone records.
    pub duplicate_ids: Vec<String>,
    /// Records whose decode failed with a checksum-related error.
    pub checksum_errors: u64,
    /// Records whose decode failed for any other reason.
    pub decode_errors: u64,
    /// Size of data.rdb in bytes.
    pub data_file_size: u64,
    /// Whether a keydir.hint file was found next to data.rdb.
    pub hint_file_exists: bool,
    /// Hint entries whose key is not live in the data file or whose
    /// offset/length fall outside the data file.
    pub orphaned_hint_entries: u64,
    /// Free-form fatal or unclassified problems hit during validation.
    pub errors: Vec<String>,
}
impl ValidationReport {
    /// Whether any errors were found across all collections.
    ///
    /// Uses the same categories that `print_summary` folds into its total:
    /// invalid header, duplicate ids, CRC/decode errors, stale hint entries,
    /// or any free-form error message.
    pub fn has_errors(&self) -> bool {
        self.collections.iter().any(|c| {
            !c.header_valid
                || !c.duplicate_ids.is_empty()
                || c.checksum_errors > 0
                || c.decode_errors > 0
                || c.orphaned_hint_entries > 0
                || !c.errors.is_empty()
        })
    }

    /// Print a human-readable summary to stdout.
    ///
    /// Emits one section per collection followed by a one-line grand total;
    /// the categories summed into `total_errors` mirror `has_errors`.
    pub fn print_summary(&self) {
        println!("=== SmartDB Data Integrity Report ===");
        println!();
        // Running error total across all collections, shown in the footer.
        let mut total_errors = 0u64;
        for report in &self.collections {
            println!("Database: {}", report.db);
            println!(" Collection: {}", report.collection);
            println!(
                " Header: {}",
                if report.header_valid { "OK" } else { "INVALID" }
            );
            println!(
                " Records: {} ({} live, {} tombstones)",
                report.total_records, report.live_documents, report.tombstones
            );
            println!(" Data size: {} bytes", report.data_file_size);
            if report.duplicate_ids.is_empty() {
                println!(" Duplicates: 0");
            } else {
                // Show at most five offending ids inline to keep output short.
                let ids_preview: Vec<&str> = report.duplicate_ids.iter().take(5).map(|s| s.as_str()).collect();
                let suffix = if report.duplicate_ids.len() > 5 {
                    format!(", ... and {} more", report.duplicate_ids.len() - 5)
                } else {
                    String::new()
                };
                println!(
                    " Duplicates: {} (ids: {}{})",
                    report.duplicate_ids.len(),
                    ids_preview.join(", "),
                    suffix
                );
            }
            if report.checksum_errors > 0 {
                println!(" CRC errors: {}", report.checksum_errors);
            } else {
                println!(" CRC errors: 0");
            }
            if report.decode_errors > 0 {
                println!(" Decode errors: {}", report.decode_errors);
            }
            if report.hint_file_exists {
                if report.orphaned_hint_entries > 0 {
                    println!(
                        " Hint file: STALE ({} orphaned entries)",
                        report.orphaned_hint_entries
                    );
                } else {
                    println!(" Hint file: OK");
                }
            } else {
                println!(" Hint file: absent");
            }
            for err in &report.errors {
                println!(" ERROR: {}", err);
            }
            println!();
            // Fold this collection's findings into the grand total.
            if !report.header_valid { total_errors += 1; }
            total_errors += report.duplicate_ids.len() as u64;
            total_errors += report.checksum_errors;
            total_errors += report.decode_errors;
            total_errors += report.orphaned_hint_entries;
            total_errors += report.errors.len() as u64;
        }
        println!(
            "Summary: {} collection(s) checked, {} error(s) found.",
            self.collections.len(),
            total_errors
        );
    }
}
/// Validate all collections in a data directory.
///
/// The directory structure is expected to be:
/// ```text
/// {base_path}/{db}/{collection}/data.rdb
/// ```
pub fn validate_data_directory(base_path: &str) -> StorageResult<ValidationReport> {
let base = Path::new(base_path);
if !base.exists() {
return Err(StorageError::IoError(std::io::Error::new(
std::io::ErrorKind::NotFound,
format!("data directory not found: {base_path}"),
)));
}
let mut collections = Vec::new();
// Iterate database directories
let entries = std::fs::read_dir(base)?;
for entry in entries {
let entry = entry?;
if !entry.file_type()?.is_dir() {
continue;
}
let db_name = match entry.file_name().to_str() {
Some(s) => s.to_string(),
None => continue,
};
// Iterate collection directories
let db_entries = std::fs::read_dir(entry.path())?;
for coll_entry in db_entries {
let coll_entry = coll_entry?;
if !coll_entry.file_type()?.is_dir() {
continue;
}
let coll_name = match coll_entry.file_name().to_str() {
Some(s) => s.to_string(),
None => continue,
};
let data_path = coll_entry.path().join("data.rdb");
if !data_path.exists() {
continue;
}
let report = validate_collection(&db_name, &coll_name, &coll_entry.path());
collections.push(report);
}
}
// Sort for deterministic output
collections.sort_by(|a, b| (&a.db, &a.collection).cmp(&(&b.db, &b.collection)));
Ok(ValidationReport { collections })
}
/// Validate a single collection directory.
///
/// Scans `data.rdb` end to end: verifies the file header, counts records,
/// replays inserts/tombstones to derive the live key set, and — when a
/// `keydir.hint` file is present — cross-checks it against that live set
/// and the data file size. Never returns an error; all problems are
/// accumulated into the returned `CollectionReport`.
fn validate_collection(db: &str, coll: &str, coll_dir: &Path) -> CollectionReport {
    let data_path = coll_dir.join("data.rdb");
    let hint_path = coll_dir.join("keydir.hint");

    let mut report = CollectionReport {
        db: db.to_string(),
        collection: coll.to_string(),
        header_valid: false,
        total_records: 0,
        live_documents: 0,
        tombstones: 0,
        duplicate_ids: Vec::new(),
        checksum_errors: 0,
        decode_errors: 0,
        data_file_size: 0,
        hint_file_exists: hint_path.exists(),
        orphaned_hint_entries: 0,
        errors: Vec::new(),
    };

    // Get file size
    match std::fs::metadata(&data_path) {
        Ok(m) => report.data_file_size = m.len(),
        Err(e) => {
            report.errors.push(format!("cannot stat data.rdb: {e}"));
            return report;
        }
    }

    // Open and validate header
    let file = match std::fs::File::open(&data_path) {
        Ok(f) => f,
        Err(e) => {
            report.errors.push(format!("cannot open data.rdb: {e}"));
            return report;
        }
    };
    let mut reader = BufReader::new(file);
    let mut hdr_buf = [0u8; FILE_HEADER_SIZE];
    if let Err(e) = reader.read_exact(&mut hdr_buf) {
        report.errors.push(format!("cannot read header: {e}"));
        return report;
    }
    match FileHeader::decode(&hdr_buf) {
        Ok(hdr) => {
            if hdr.file_type != FileType::Data {
                report.errors.push(format!(
                    "wrong file type: expected Data, got {:?}",
                    hdr.file_type
                ));
            } else {
                report.header_valid = true;
            }
        }
        Err(e) => {
            report.errors.push(format!("invalid header: {e}"));
            return report;
        }
    }

    // Scan all records.
    // NOTE(review): `id_counts` flags any key written more than once as a
    // duplicate. In an append-only log a normal document update re-appends
    // the same key, so on an uncompacted file this may flag legitimate
    // updates — confirm data.rdb is always compacted before treating
    // `duplicate_ids` as a hard error.
    let mut id_counts: HashMap<String, u64> = HashMap::new();
    let mut live_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
    let scanner = RecordScanner::new(reader, FILE_HEADER_SIZE as u64);
    for result in scanner {
        match result {
            Ok((_offset, record)) => {
                report.total_records += 1;
                let key = String::from_utf8_lossy(&record.key).to_string();
                if record.is_tombstone() {
                    report.tombstones += 1;
                    live_ids.remove(&key);
                } else {
                    *id_counts.entry(key.clone()).or_insert(0) += 1;
                    live_ids.insert(key);
                }
            }
            Err(e) => {
                // Classify by message text; RecordScanner's error type does
                // not expose a structured error kind here.
                let err_str = e.to_string();
                if err_str.contains("checksum") || err_str.contains("Checksum") {
                    report.checksum_errors += 1;
                } else {
                    report.decode_errors += 1;
                }
                // Cannot continue scanning after a decode error — the stream position is lost
                report.errors.push(format!("record decode error: {e}"));
                break;
            }
        }
    }
    report.live_documents = live_ids.len() as u64;

    // Find duplicates (keys that appeared more than once as live inserts)
    for (id, count) in &id_counts {
        if *count > 1 {
            report.duplicate_ids.push(id.clone());
        }
    }
    report.duplicate_ids.sort();

    // Validate hint file if present
    if hint_path.exists() {
        match KeyDir::load_from_hint_file(&hint_path) {
            Ok(Some(hint_kd)) => {
                // An entry is orphaned when its key is no longer live in the
                // data file OR its offset/length point past end-of-file.
                // Both conditions are checked in a single pass so an entry
                // failing both is counted once (the previous two-pass
                // implementation double-counted such entries).
                hint_kd.for_each(|key, entry| {
                    let out_of_bounds =
                        entry.offset + entry.record_len as u64 > report.data_file_size;
                    if out_of_bounds || !live_ids.contains(key) {
                        report.orphaned_hint_entries += 1;
                    }
                });
            }
            Ok(None) => {
                // File existed but was empty or unreadable
                report.errors.push("hint file exists but is empty".into());
            }
            Err(e) => {
                report.errors.push(format!("hint file decode error: {e}"));
            }
        }
    }

    report
}
+72 -2
View File
@@ -16,7 +16,7 @@ use rustdb_config::{RustDbOptions, StorageType};
use rustdb_wire::{WireCodec, OP_QUERY}; use rustdb_wire::{WireCodec, OP_QUERY};
use rustdb_wire::{encode_op_msg_response, encode_op_reply_response}; use rustdb_wire::{encode_op_msg_response, encode_op_reply_response};
use rustdb_storage::{StorageAdapter, MemoryStorageAdapter, FileStorageAdapter, OpLog}; use rustdb_storage::{StorageAdapter, MemoryStorageAdapter, FileStorageAdapter, OpLog};
// IndexEngine is used indirectly via CommandContext use rustdb_index::{IndexEngine, IndexOptions};
use rustdb_txn::{TransactionEngine, SessionEngine}; use rustdb_txn::{TransactionEngine, SessionEngine};
use rustdb_commands::{CommandRouter, CommandContext}; use rustdb_commands::{CommandRouter, CommandContext};
@@ -80,9 +80,79 @@ impl RustDb {
}); });
} }
let indexes: Arc<DashMap<String, IndexEngine>> = Arc::new(DashMap::new());
// Restore persisted indexes from storage.
if let Ok(databases) = storage.list_databases().await {
for db_name in &databases {
if let Ok(collections) = storage.list_collections(db_name).await {
for coll_name in &collections {
if let Ok(specs) = storage.get_indexes(db_name, coll_name).await {
let has_custom = specs.iter().any(|s| {
s.get_str("name").unwrap_or("_id_") != "_id_"
});
if !has_custom {
continue;
}
let ns_key = format!("{}.{}", db_name, coll_name);
let mut engine = IndexEngine::new();
for spec in &specs {
let name = spec.get_str("name").unwrap_or("").to_string();
if name == "_id_" {
continue; // already created by IndexEngine::new()
}
let key = match spec.get("key") {
Some(bson::Bson::Document(k)) => k.clone(),
_ => continue,
};
let unique = matches!(spec.get("unique"), Some(bson::Bson::Boolean(true)));
let sparse = matches!(spec.get("sparse"), Some(bson::Bson::Boolean(true)));
let expire_after_seconds = match spec.get("expireAfterSeconds") {
Some(bson::Bson::Int32(n)) => Some(*n as u64),
Some(bson::Bson::Int64(n)) => Some(*n as u64),
_ => None,
};
let options = IndexOptions {
name: Some(name.clone()),
unique,
sparse,
expire_after_seconds,
};
if let Err(e) = engine.create_index(key, options) {
tracing::warn!(
namespace = %ns_key,
index = %name,
error = %e,
"failed to restore index"
);
}
}
// Rebuild index data from existing documents.
if let Ok(docs) = storage.find_all(db_name, coll_name).await {
if !docs.is_empty() {
engine.rebuild_from_documents(&docs);
}
}
tracing::info!(
namespace = %ns_key,
indexes = engine.list_indexes().len(),
"restored indexes"
);
indexes.insert(ns_key, engine);
}
}
}
}
}
let ctx = Arc::new(CommandContext { let ctx = Arc::new(CommandContext {
storage, storage,
indexes: Arc::new(DashMap::new()), indexes,
transactions: Arc::new(TransactionEngine::new()), transactions: Arc::new(TransactionEngine::new()),
sessions: Arc::new(SessionEngine::new(30 * 60 * 1000, 60 * 1000)), sessions: Arc::new(SessionEngine::new(30 * 60 * 1000, 60 * 1000)),
cursors: Arc::new(DashMap::new()), cursors: Arc::new(DashMap::new()),
+17 -1
View File
@@ -25,6 +25,10 @@ struct Cli {
#[arg(long)] #[arg(long)]
validate: bool, validate: bool,
/// Validate data integrity of a storage directory (offline check)
#[arg(long, value_name = "PATH")]
validate_data: Option<String>,
/// Run in management mode (JSON-over-stdin IPC for TypeScript wrapper) /// Run in management mode (JSON-over-stdin IPC for TypeScript wrapper)
#[arg(long)] #[arg(long)]
management: bool, management: bool,
@@ -55,7 +59,7 @@ async fn main() -> Result<()> {
let options = RustDbOptions::from_file(&cli.config) let options = RustDbOptions::from_file(&cli.config)
.map_err(|e| anyhow::anyhow!("Failed to load config '{}': {}", cli.config, e))?; .map_err(|e| anyhow::anyhow!("Failed to load config '{}': {}", cli.config, e))?;
// Validate-only mode // Validate-only mode (config)
if cli.validate { if cli.validate {
match options.validate() { match options.validate() {
Ok(()) => { Ok(()) => {
@@ -69,6 +73,18 @@ async fn main() -> Result<()> {
} }
} }
// Validate data integrity mode
if let Some(ref data_path) = cli.validate_data {
tracing::info!("Validating data integrity at {}", data_path);
let report = rustdb_storage::validate::validate_data_directory(data_path)
.map_err(|e| anyhow::anyhow!("Validation failed: {}", e))?;
report.print_summary();
if report.has_errors() {
std::process::exit(1);
}
return Ok(());
}
// Create and start server // Create and start server
let mut db = RustDb::new(options).await?; let mut db = RustDb::new(options).await?;
db.start().await?; db.start().await?;
+1 -1
View File
@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartdb', name: '@push.rocks/smartdb',
version: '2.4.0', version: '2.5.7',
description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.' description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.'
} }
+54
View File
@@ -1,4 +1,6 @@
import * as crypto from 'crypto'; import * as crypto from 'crypto';
import * as fs from 'fs/promises';
import * as net from 'net';
import * as path from 'path'; import * as path from 'path';
import * as os from 'os'; import * as os from 'os';
import { SmartdbServer } from '../ts_smartdb/index.js'; import { SmartdbServer } from '../ts_smartdb/index.js';
@@ -66,6 +68,55 @@ export class LocalSmartDb {
return path.join(os.tmpdir(), `smartdb-${randomId}.sock`); return path.join(os.tmpdir(), `smartdb-${randomId}.sock`);
} }
/**
 * Probe a Unix domain socket by attempting a connection.
 * Resolves true when the connection succeeds (a live server is listening),
 * false on a connection error or after 500 ms of socket inactivity.
 */
private static isSocketAlive(socketPath: string): Promise<boolean> {
  return new Promise((resolve) => {
    const probe = net.createConnection({ path: socketPath });
    probe.once('connect', () => {
      probe.destroy();
      resolve(true);
    });
    probe.once('error', () => {
      resolve(false);
    });
    // Covers a hung connection attempt: destroy and report dead.
    probe.setTimeout(500, () => {
      probe.destroy();
      resolve(false);
    });
  });
}
/**
 * Delete smartdb-*.sock files in the OS temp directory that no longer
 * have a live server behind them (i.e. connecting to them fails).
 * Best-effort: any filesystem error on an individual file is ignored.
 */
private static async cleanStaleSockets(): Promise<void> {
  const tmpDir = os.tmpdir();
  let names: string[];
  try {
    names = await fs.readdir(tmpDir);
  } catch {
    // Temp dir unreadable — nothing to clean.
    return;
  }
  for (const name of names) {
    if (!name.startsWith('smartdb-') || !name.endsWith('.sock')) {
      continue;
    }
    const candidate = path.join(tmpDir, name);
    try {
      const info = await fs.stat(candidate);
      if (!info.isSocket()) continue;
      const alive = await LocalSmartDb.isSocketAlive(candidate);
      if (!alive) {
        await fs.unlink(candidate);
      }
    } catch {
      // File may have been removed already; ignore
    }
  }
}
/** /**
* Start the local SmartDB server and return connection info * Start the local SmartDB server and return connection info
*/ */
@@ -74,6 +125,9 @@ export class LocalSmartDb {
throw new Error('LocalSmartDb is already running'); throw new Error('LocalSmartDb is already running');
} }
// Clean up stale sockets from previous crashed instances
await LocalSmartDb.cleanStaleSockets();
// Run storage migration before starting the Rust engine // Run storage migration before starting the Rust engine
const migrator = new StorageMigrator(this.options.folderPath); const migrator = new StorageMigrator(this.options.folderPath);
await migrator.run(); await migrator.run();
File diff suppressed because one or more lines are too long