19 Commits

Author SHA1 Message Date
jkunz 02ad9a29a7 v2.5.9 2026-04-05 09:54:56 +00:00
jkunz 24c504518d fix(rustdb-storage): run collection compaction during file storage initialization after crashes 2026-04-05 09:54:56 +00:00
jkunz 92f07ef3d7 v2.5.8 2026-04-05 09:48:10 +00:00
jkunz 22e010c554 fix(rustdb-storage): detect stale hint files using data file size metadata and add restart persistence regression tests 2026-04-05 09:48:10 +00:00
jkunz 8ebc1bb9e1 v2.5.7 2026-04-05 03:54:13 +00:00
jkunz 3fc21dcd99 fix(repo): no changes to commit 2026-04-05 03:54:13 +00:00
jkunz ad5e0e8a72 chore: gitignore generated bundled.ts to fix release cycle 2026-04-05 03:54:05 +00:00
jkunz c384df20ce v2.5.6 2026-04-05 03:53:29 +00:00
jkunz 4e944f3d05 fix(repo): no changes to commit 2026-04-05 03:53:29 +00:00
jkunz e0455daa2e chore: rebuild bundled debug server with current version 2026-04-05 03:53:22 +00:00
jkunz f3f1afe9af v2.5.5 2026-04-05 03:52:29 +00:00
jkunz 94dc9cfc3f fix(repo): no changes to commit 2026-04-05 03:52:29 +00:00
jkunz a9c0ced1ca v2.5.4 2026-04-05 03:52:23 +00:00
jkunz c8626a9afd fix(package): bump package version to 2.5.3 2026-04-05 03:52:23 +00:00
jkunz 55a1f66e57 chore: update bundled debug server output 2026-04-05 03:52:21 +00:00
jkunz 5b5f35821f v2.5.3 2026-04-05 03:51:58 +00:00
jkunz e8161e6417 fix(rustdb-commands): restore persisted index initialization before writes to enforce unique constraints after restart 2026-04-05 03:51:58 +00:00
jkunz 1a10c32b12 v2.5.2 2026-04-05 03:26:52 +00:00
jkunz cb8cb87d9f fix(rustdb-indexes): persist created indexes and restore them on server startup 2026-04-05 03:26:52 +00:00
19 changed files with 935 additions and 78 deletions
+3
View File
@@ -13,5 +13,8 @@ rust/target/
package-lock.json
yarn.lock
# generated bundle (rebuilt on every build, embeds version)
ts_debugserver/bundled.ts
# playwright
.playwright-mcp/
+45
View File
@@ -1,5 +1,50 @@
# Changelog
## 2026-04-05 - 2.5.9 - fix(rustdb-storage)
run collection compaction during file storage initialization after crashes
- Triggers compaction for all loaded collections before starting the periodic background compaction task.
- Helps clean up dead weight left from before a crash during startup.
## 2026-04-05 - 2.5.8 - fix(rustdb-storage)
detect stale hint files using data file size metadata and add restart persistence regression tests
- Store the current data.rdb size in hint file headers and validate it on load to rebuild KeyDir when hints are stale or written in the old format.
- Persist updated hint metadata after compaction and shutdown to avoid missing appended tombstones after restart.
- Add validation reporting for stale hint files based on recorded versus actual data file size.
- Add regression tests covering delete persistence across restarts, missing hint recovery, stale socket cleanup, and unique index enforcement persistence.
## 2026-04-05 - 2.5.7 - fix(repo)
no changes to commit
## 2026-04-05 - 2.5.6 - fix(repo)
no changes to commit
## 2026-04-05 - 2.5.5 - fix(repo)
no changes to commit
## 2026-04-05 - 2.5.4 - fix(package)
bump package version to 2.5.3
- Updates the package metadata version by one patch release.
## 2026-04-05 - 2.5.3 - fix(rustdb-commands)
restore persisted index initialization before writes to enforce unique constraints after restart
- load stored index specifications from storage when creating command context index engines
- rebuild index data from existing documents so custom indexes are active before insert, update, and upsert operations
- add @push.rocks/smartdata as a runtime dependency
## 2026-04-05 - 2.5.2 - fix(rustdb-indexes)
persist created indexes and restore them on server startup
- Save index specifications to storage when indexes are created.
- Remove persisted index metadata when indexes are dropped by name, key spec, or wildcard.
- Rebuild in-memory index engines from stored definitions and existing documents during startup.
## 2026-04-05 - 2.5.1 - fix(docs)
update project documentation
+2 -1
View File
@@ -1,6 +1,6 @@
{
"name": "@push.rocks/smartdb",
"version": "2.5.1",
"version": "2.5.9",
"private": false,
"description": "A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.",
"exports": {
@@ -29,6 +29,7 @@
"dependencies": {
"@api.global/typedserver": "^8.0.0",
"@design.estate/dees-element": "^2.0.0",
"@push.rocks/smartdata": "7.1.5",
"@push.rocks/smartrust": "^1.3.2",
"bson": "^7.2.0"
},
+7 -4
View File
@@ -14,6 +14,9 @@ importers:
'@design.estate/dees-element':
specifier: ^2.0.0
version: 2.2.3
'@push.rocks/smartdata':
specifier: 7.1.5
version: 7.1.5(socks@2.8.7)
'@push.rocks/smartrust':
specifier: ^1.3.2
version: 1.3.2
@@ -1026,8 +1029,8 @@ packages:
'@push.rocks/smartcrypto@2.0.4':
resolution: {integrity: sha512-1+/5bsjyataf5uUkUNnnVXGRAt+gHVk1KDzozjTqgqJxHvQk1d9fVDohL6CxUhUucTPtu5VR5xNBiV8YCDuGyw==}
'@push.rocks/smartdata@7.1.3':
resolution: {integrity: sha512-7vQJ9pdRk450yn2m9tmGPdSRlQVmxFPZjHD4sGYsfqCQPg+GLFusu+H16zpf+jKzAq4F2ZBMPaYymJHXvXiVcw==}
'@push.rocks/smartdata@7.1.5':
resolution: {integrity: sha512-7x7VedEg6RocWndqUPuTbY2Bh85Q/x0LOVHL4o/NVXyh3IGNtiVQ8ple4WR0qYqlHRAojX4eDSBPMiYzIasqAg==}
'@push.rocks/smartdelay@3.0.5':
resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==}
@@ -5665,7 +5668,7 @@ snapshots:
'@types/node-forge': 1.3.14
node-forge: 1.4.0
'@push.rocks/smartdata@7.1.3(socks@2.8.7)':
'@push.rocks/smartdata@7.1.5(socks@2.8.7)':
dependencies:
'@push.rocks/lik': 6.4.0
'@push.rocks/smartdelay': 3.0.5
@@ -5899,7 +5902,7 @@ snapshots:
'@push.rocks/smartmongo@5.1.1(socks@2.8.7)':
dependencies:
'@push.rocks/mongodump': 1.1.0(socks@2.8.7)
'@push.rocks/smartdata': 7.1.3(socks@2.8.7)
'@push.rocks/smartdata': 7.1.5(socks@2.8.7)
'@push.rocks/smartfs': 1.5.0
'@push.rocks/smartpath': 6.0.0
'@push.rocks/smartpromise': 4.2.3
+63 -2
View File
@@ -1,8 +1,8 @@
use std::sync::Arc;
use bson::Document;
use bson::{Bson, Document};
use dashmap::DashMap;
use rustdb_index::IndexEngine;
use rustdb_index::{IndexEngine, IndexOptions};
use rustdb_storage::{OpLog, StorageAdapter};
use rustdb_txn::{SessionEngine, TransactionEngine};
@@ -24,6 +24,67 @@ pub struct CommandContext {
pub oplog: Arc<OpLog>,
}
impl CommandContext {
/// Get or lazily initialize an IndexEngine for a namespace.
///
/// If no IndexEngine exists yet for this namespace, loads persisted index
/// specs from `indexes.json` via the storage adapter, creates the engine
/// with those specs, and rebuilds index data from existing documents.
/// This ensures unique indexes are enforced even on the very first write
/// after a restart.
pub async fn get_or_init_index_engine(&self, db: &str, coll: &str) -> dashmap::mapref::one::RefMut<'_, String, IndexEngine> {
let ns_key = format!("{}.{}", db, coll);
// Fast path: engine already exists.
if self.indexes.contains_key(&ns_key) {
return self.indexes.entry(ns_key).or_insert_with(IndexEngine::new);
}
// Slow path: load from persisted specs.
let mut engine = IndexEngine::new();
let mut has_custom = false;
if let Ok(specs) = self.storage.get_indexes(db, coll).await {
for spec in &specs {
let name = spec.get_str("name").unwrap_or("").to_string();
if name == "_id_" || name.is_empty() {
continue;
}
let key = match spec.get("key") {
Some(Bson::Document(k)) => k.clone(),
_ => continue,
};
let unique = matches!(spec.get("unique"), Some(Bson::Boolean(true)));
let sparse = matches!(spec.get("sparse"), Some(Bson::Boolean(true)));
let expire_after_seconds = match spec.get("expireAfterSeconds") {
Some(Bson::Int32(n)) => Some(*n as u64),
Some(Bson::Int64(n)) => Some(*n as u64),
_ => None,
};
let options = IndexOptions {
name: Some(name),
unique,
sparse,
expire_after_seconds,
};
let _ = engine.create_index(key, options);
has_custom = true;
}
}
if has_custom {
// Rebuild index data from existing documents.
if let Ok(docs) = self.storage.find_all(db, coll).await {
if !docs.is_empty() {
engine.rebuild_from_documents(&docs);
}
}
}
self.indexes.entry(ns_key).or_insert(engine)
}
}
/// State of an open cursor from a find or aggregate command.
pub struct CursorState {
/// Documents remaining to be returned.
@@ -101,7 +101,15 @@ async fn handle_create_indexes(
expire_after_seconds,
};
// Create the index.
let options_for_persist = IndexOptions {
name: options.name.clone(),
unique: options.unique,
sparse: options.sparse,
expire_after_seconds: options.expire_after_seconds,
};
let key_for_persist = key.clone();
// Create the index in-memory.
let mut engine = ctx
.indexes
.entry(ns_key.clone())
@@ -110,6 +118,22 @@ async fn handle_create_indexes(
match engine.create_index(key, options) {
Ok(index_name) => {
debug!(index_name = %index_name, "Created index");
// Persist index spec to disk.
let mut spec = doc! { "key": key_for_persist };
if options_for_persist.unique {
spec.insert("unique", true);
}
if options_for_persist.sparse {
spec.insert("sparse", true);
}
if let Some(ttl) = options_for_persist.expire_after_seconds {
spec.insert("expireAfterSeconds", ttl as i64);
}
if let Err(e) = ctx.storage.save_index(db, coll, &index_name, spec).await {
tracing::warn!(index = %index_name, error = %e, "failed to persist index spec");
}
created_count += 1;
}
Err(e) => {
@@ -180,9 +204,21 @@ async fn handle_drop_indexes(
match index_spec {
Some(Bson::String(name)) if name == "*" => {
// Drop all indexes except _id_.
// Collect names to drop from storage first.
let names_to_drop: Vec<String> = if let Some(engine) = ctx.indexes.get(&ns_key) {
engine.list_indexes().iter()
.filter(|info| info.name != "_id_")
.map(|info| info.name.clone())
.collect()
} else {
Vec::new()
};
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
engine.drop_all_indexes();
}
for idx_name in &names_to_drop {
let _ = ctx.storage.drop_index(db, coll, idx_name).await;
}
}
Some(Bson::String(name)) => {
// Drop by name.
@@ -196,6 +232,7 @@ async fn handle_drop_indexes(
name
)));
}
let _ = ctx.storage.drop_index(db, coll, name).await;
}
Some(Bson::Document(key_spec)) => {
// Drop by key spec: find the index with matching key.
@@ -210,6 +247,7 @@ async fn handle_drop_indexes(
engine.drop_index(&name).map_err(|e| {
CommandError::IndexError(e.to_string())
})?;
let _ = ctx.storage.drop_index(db, coll, &name).await;
} else {
return Err(CommandError::IndexError(
"index not found with specified key".into(),
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use bson::{doc, oid::ObjectId, Bson, Document};
use rustdb_index::IndexEngine;
use rustdb_storage::OpType;
use tracing::debug;
@@ -56,6 +55,11 @@ pub async fn handle(
let mut inserted_count: i32 = 0;
let mut write_errors: Vec<Document> = Vec::new();
// Ensure the IndexEngine is loaded (with persisted specs from indexes.json).
// This must happen BEFORE any writes, so unique constraints are enforced
// even on the first write after a restart.
drop(ctx.get_or_init_index_engine(db, coll).await);
for (idx, mut doc) in docs.into_iter().enumerate() {
// Auto-generate _id if not present.
if !doc.contains_key("_id") {
@@ -63,6 +67,7 @@ pub async fn handle(
}
// Pre-check unique index constraints BEFORE storage write.
// The engine is guaranteed to exist from the get_or_init call above.
if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints(&doc) {
let err_msg = e.to_string();
@@ -92,18 +97,16 @@ pub async fn handle(
None,
);
// Update index engine.
let mut engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
// Update index engine (already initialized above).
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
if let Err(e) = engine.on_insert(&doc) {
tracing::error!(
namespace = %ns_key,
error = %e,
"index update failed after successful insert (pre-check passed but insert failed)"
"index update failed after successful insert"
);
}
}
inserted_count += 1;
}
Err(e) => {
@@ -1,7 +1,6 @@
use std::collections::HashSet;
use bson::{doc, oid::ObjectId, Bson, Document};
use rustdb_index::IndexEngine;
use rustdb_query::{QueryMatcher, UpdateEngine, sort_documents, apply_projection};
use rustdb_storage::OpType;
use tracing::debug;
@@ -47,6 +46,10 @@ async fn handle_update(
ensure_collection_exists(db, coll, ctx).await?;
let ns_key = format!("{}.{}", db, coll);
// Ensure the IndexEngine is loaded with persisted specs from indexes.json.
drop(ctx.get_or_init_index_engine(db, coll).await);
let mut total_n: i32 = 0;
let mut total_n_modified: i32 = 0;
let mut upserted_list: Vec<Document> = Vec::new();
@@ -179,14 +182,12 @@ async fn handle_update(
None,
);
// Update index.
let mut engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
// Update index (engine already initialized above).
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
if let Err(e) = engine.on_insert(&updated) {
tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert");
}
}
total_n += 1;
upserted_list.push(doc! {
@@ -402,6 +403,9 @@ async fn handle_find_and_modify(
let ns_key = format!("{}.{}", db, coll);
// Ensure the IndexEngine is loaded with persisted specs.
drop(ctx.get_or_init_index_engine(db, coll).await);
// Load and filter documents.
let mut matched = load_filtered_docs(db, coll, &query, &ns_key, ctx).await?;
@@ -573,14 +577,12 @@ async fn handle_find_and_modify(
// Update index.
{
let mut engine = ctx
.indexes
.entry(ns_key.clone())
.or_insert_with(IndexEngine::new);
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
if let Err(e) = engine.on_insert(&updated_doc) {
tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert");
}
}
}
let value = if return_new {
apply_fields_projection(&updated_doc, &fields)
+40 -10
View File
@@ -178,7 +178,8 @@ impl CollectionState {
tracing::warn!("compaction failed for {:?}: {e}", self.coll_dir);
} else {
// Persist hint file after successful compaction to prevent stale hints
if let Err(e) = self.keydir.persist_to_hint_file(&self.hint_path()) {
let current_size = self.data_file_size.load(Ordering::Relaxed);
if let Err(e) = self.keydir.persist_to_hint_file(&self.hint_path(), current_size) {
tracing::warn!("failed to persist hint after compaction for {:?}: {e}", self.coll_dir);
}
}
@@ -257,28 +258,49 @@ impl FileStorageAdapter {
// Try loading from hint file first, fall back to data file scan
let (keydir, dead_bytes, loaded_from_hint) = if hint_path.exists() && data_path.exists() {
match KeyDir::load_from_hint_file(&hint_path) {
Ok(Some(kd)) => {
// Validate hint against actual data file
Ok(Some((kd, stored_size))) => {
let actual_size = std::fs::metadata(&data_path)
.map(|m| m.len())
.unwrap_or(0);
// Check if data.rdb changed since the hint was written.
// If stored_size is 0, this is an old-format hint without size tracking.
let size_matches = stored_size > 0 && stored_size == actual_size;
if !size_matches {
// data.rdb size differs from hint snapshot — records were appended
// (inserts, tombstones) after the hint was written. Full scan required
// to pick up tombstones that would otherwise be invisible.
if stored_size == 0 {
debug!("hint file {:?} has no size tracking, rebuilding from data file", hint_path);
} else {
tracing::warn!(
"hint file {:?} is stale: data size changed ({} -> {}), rebuilding",
hint_path, stored_size, actual_size
);
}
let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
(kd, dead, false)
} else {
// Size matches — validate entry integrity with spot-checks
let hint_valid = kd.validate_against_data_file(&data_path, 16)
.unwrap_or(false);
if hint_valid {
debug!("loaded KeyDir from hint file: {:?}", hint_path);
let file_size = std::fs::metadata(&data_path)
.map(|m| m.len())
.unwrap_or(FILE_HEADER_SIZE as u64);
let live_bytes: u64 = {
let mut total = 0u64;
kd.for_each(|_, e| total += e.record_len as u64);
total
};
let dead = file_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
let dead = actual_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
(kd, dead, true)
} else {
tracing::warn!("hint file {:?} is stale, rebuilding from data file", hint_path);
tracing::warn!("hint file {:?} failed validation, rebuilding from data file", hint_path);
let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
(kd, dead, false)
}
}
}
_ => {
debug!("hint file invalid, rebuilding KeyDir from data file");
let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
@@ -482,6 +504,13 @@ impl StorageAdapter for FileStorageAdapter {
"FileStorageAdapter initialization complete"
);
// Run compaction on all collections that need it (dead weight from before crash)
for entry in self.collections.iter() {
let state = entry.value();
let _guard = state.write_lock.lock().unwrap();
state.try_compact();
}
// Start periodic compaction task (runs every 24 hours)
{
let collections = self.collections.clone();
@@ -510,10 +539,11 @@ impl StorageAdapter for FileStorageAdapter {
handle.abort();
}
// Persist all KeyDir hint files
// Persist all KeyDir hint files with current data file sizes
for entry in self.collections.iter() {
let state = entry.value();
let _ = state.keydir.persist_to_hint_file(&state.hint_path());
let current_size = state.data_file_size.load(Ordering::Relaxed);
let _ = state.keydir.persist_to_hint_file(&state.hint_path(), current_size);
}
debug!("FileStorageAdapter closed");
Ok(())
+14 -7
View File
@@ -198,14 +198,17 @@ impl KeyDir {
/// Persist the KeyDir to a hint file for fast restart.
///
/// `data_file_size` is the current size of data.rdb — stored in the hint header
/// so that on next load we can detect if data.rdb changed (stale hint).
///
/// Hint file format (after the 64-byte file header):
/// For each entry: [key_len:u32 LE][key bytes][offset:u64 LE][record_len:u32 LE][value_len:u32 LE][timestamp:u64 LE]
pub fn persist_to_hint_file(&self, path: &Path) -> StorageResult<()> {
pub fn persist_to_hint_file(&self, path: &Path, data_file_size: u64) -> StorageResult<()> {
let file = std::fs::File::create(path)?;
let mut writer = BufWriter::new(file);
// Write file header
let hdr = FileHeader::new(FileType::Hint);
// Write file header with data_file_size for staleness detection
let hdr = FileHeader::new_hint(data_file_size);
writer.write_all(&hdr.encode())?;
// Write entries
@@ -225,7 +228,9 @@ impl KeyDir {
}
/// Load a KeyDir from a hint file. Returns None if the file doesn't exist.
pub fn load_from_hint_file(path: &Path) -> StorageResult<Option<Self>> {
/// Returns `(keydir, stored_data_file_size)` where `stored_data_file_size` is the
/// data.rdb size recorded when the hint was written (0 = old format, unknown).
pub fn load_from_hint_file(path: &Path) -> StorageResult<Option<(Self, u64)>> {
if !path.exists() {
return Ok(None);
}
@@ -254,6 +259,7 @@ impl KeyDir {
)));
}
let stored_data_file_size = hdr.data_file_size;
let keydir = KeyDir::new();
loop {
@@ -292,7 +298,7 @@ impl KeyDir {
);
}
Ok(Some(keydir))
Ok(Some((keydir, stored_data_file_size)))
}
// -----------------------------------------------------------------------
@@ -517,9 +523,10 @@ mod tests {
},
);
kd.persist_to_hint_file(&hint_path).unwrap();
let loaded = KeyDir::load_from_hint_file(&hint_path).unwrap().unwrap();
kd.persist_to_hint_file(&hint_path, 12345).unwrap();
let (loaded, stored_size) = KeyDir::load_from_hint_file(&hint_path).unwrap().unwrap();
assert_eq!(stored_size, 12345);
assert_eq!(loaded.len(), 2);
let e1 = loaded.get("doc1").unwrap();
assert_eq!(e1.offset, 64);
+21 -1
View File
@@ -79,6 +79,9 @@ pub struct FileHeader {
pub file_type: FileType,
pub flags: u32,
pub created_ms: u64,
/// For hint files: the data.rdb file size at the time the hint was written.
/// Used to detect stale hints after ungraceful shutdown. 0 = unknown (old format).
pub data_file_size: u64,
}
impl FileHeader {
@@ -89,6 +92,18 @@ impl FileHeader {
file_type,
flags: 0,
created_ms: now_ms(),
data_file_size: 0,
}
}
/// Create a new hint header that records the data file size.
pub fn new_hint(data_file_size: u64) -> Self {
Self {
version: FORMAT_VERSION,
file_type: FileType::Hint,
flags: 0,
created_ms: now_ms(),
data_file_size,
}
}
@@ -100,7 +115,8 @@ impl FileHeader {
buf[10] = self.file_type as u8;
buf[11..15].copy_from_slice(&self.flags.to_le_bytes());
buf[15..23].copy_from_slice(&self.created_ms.to_le_bytes());
// bytes 23..64 are reserved (zeros)
buf[23..31].copy_from_slice(&self.data_file_size.to_le_bytes());
// bytes 31..64 are reserved (zeros)
buf
}
@@ -127,11 +143,15 @@ impl FileHeader {
let created_ms = u64::from_le_bytes([
buf[15], buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22],
]);
let data_file_size = u64::from_le_bytes([
buf[23], buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30],
]);
Ok(Self {
version,
file_type,
flags,
created_ms,
data_file_size,
})
}
}
+7 -1
View File
@@ -295,7 +295,13 @@ fn validate_collection(db: &str, coll: &str, coll_dir: &Path) -> CollectionRepor
// Validate hint file if present
if hint_path.exists() {
match KeyDir::load_from_hint_file(&hint_path) {
Ok(Some(hint_kd)) => {
Ok(Some((hint_kd, stored_size))) => {
if stored_size > 0 && stored_size != report.data_file_size {
report.errors.push(format!(
"hint file is stale: recorded data size {} but actual is {}",
stored_size, report.data_file_size
));
}
// Check for orphaned entries: keys in hint but not live in data
hint_kd.for_each(|key, _entry| {
if !live_ids.contains(key) {
+72 -2
View File
@@ -16,7 +16,7 @@ use rustdb_config::{RustDbOptions, StorageType};
use rustdb_wire::{WireCodec, OP_QUERY};
use rustdb_wire::{encode_op_msg_response, encode_op_reply_response};
use rustdb_storage::{StorageAdapter, MemoryStorageAdapter, FileStorageAdapter, OpLog};
// IndexEngine is used indirectly via CommandContext
use rustdb_index::{IndexEngine, IndexOptions};
use rustdb_txn::{TransactionEngine, SessionEngine};
use rustdb_commands::{CommandRouter, CommandContext};
@@ -80,9 +80,79 @@ impl RustDb {
});
}
let indexes: Arc<DashMap<String, IndexEngine>> = Arc::new(DashMap::new());
// Restore persisted indexes from storage.
if let Ok(databases) = storage.list_databases().await {
for db_name in &databases {
if let Ok(collections) = storage.list_collections(db_name).await {
for coll_name in &collections {
if let Ok(specs) = storage.get_indexes(db_name, coll_name).await {
let has_custom = specs.iter().any(|s| {
s.get_str("name").unwrap_or("_id_") != "_id_"
});
if !has_custom {
continue;
}
let ns_key = format!("{}.{}", db_name, coll_name);
let mut engine = IndexEngine::new();
for spec in &specs {
let name = spec.get_str("name").unwrap_or("").to_string();
if name == "_id_" {
continue; // already created by IndexEngine::new()
}
let key = match spec.get("key") {
Some(bson::Bson::Document(k)) => k.clone(),
_ => continue,
};
let unique = matches!(spec.get("unique"), Some(bson::Bson::Boolean(true)));
let sparse = matches!(spec.get("sparse"), Some(bson::Bson::Boolean(true)));
let expire_after_seconds = match spec.get("expireAfterSeconds") {
Some(bson::Bson::Int32(n)) => Some(*n as u64),
Some(bson::Bson::Int64(n)) => Some(*n as u64),
_ => None,
};
let options = IndexOptions {
name: Some(name.clone()),
unique,
sparse,
expire_after_seconds,
};
if let Err(e) = engine.create_index(key, options) {
tracing::warn!(
namespace = %ns_key,
index = %name,
error = %e,
"failed to restore index"
);
}
}
// Rebuild index data from existing documents.
if let Ok(docs) = storage.find_all(db_name, coll_name).await {
if !docs.is_empty() {
engine.rebuild_from_documents(&docs);
}
}
tracing::info!(
namespace = %ns_key,
indexes = engine.list_indexes().len(),
"restored indexes"
);
indexes.insert(ns_key, engine);
}
}
}
}
}
let ctx = Arc::new(CommandContext {
storage,
indexes: Arc::new(DashMap::new()),
indexes,
transactions: Arc::new(TransactionEngine::new()),
sessions: Arc::new(SessionEngine::new(30 * 60 * 1000, 60 * 1000)),
cursors: Arc::new(DashMap::new()),
+191
View File
@@ -0,0 +1,191 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
// ---------------------------------------------------------------------------
// Test: Deletes persist across restart (tombstone + hint staleness detection)
// Covers: append_tombstone to data.rdb, hint file data_file_size tracking,
// stale hint detection on restart
// ---------------------------------------------------------------------------
let tmpDir: string;
let localDb: smartdb.LocalSmartDb;
let client: MongoClient;
let db: Db;
function makeTmpDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-delete-test-'));
}
function cleanTmpDir(dir: string): void {
if (fs.existsSync(dir)) {
fs.rmSync(dir, { recursive: true, force: true });
}
}
// ============================================================================
// Setup
// ============================================================================
tap.test('setup: start local db and insert documents', async () => {
tmpDir = makeTmpDir();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('deletetest');
const coll = db.collection('items');
await coll.insertMany([
{ name: 'keep-1', value: 100 },
{ name: 'keep-2', value: 200 },
{ name: 'delete-me', value: 999 },
{ name: 'keep-3', value: 300 },
]);
const count = await coll.countDocuments();
expect(count).toEqual(4);
});
// ============================================================================
// Delete and verify
// ============================================================================
tap.test('delete-persistence: delete a document', async () => {
const coll = db.collection('items');
const result = await coll.deleteOne({ name: 'delete-me' });
expect(result.deletedCount).toEqual(1);
const remaining = await coll.countDocuments();
expect(remaining).toEqual(3);
const deleted = await coll.findOne({ name: 'delete-me' });
expect(deleted).toBeNull();
});
// ============================================================================
// Graceful restart: delete survives
// ============================================================================
tap.test('delete-persistence: graceful stop and restart', async () => {
await client.close();
await localDb.stop(); // graceful — writes hint file
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('deletetest');
});
tap.test('delete-persistence: deleted doc stays deleted after graceful restart', async () => {
const coll = db.collection('items');
const count = await coll.countDocuments();
expect(count).toEqual(3);
const deleted = await coll.findOne({ name: 'delete-me' });
expect(deleted).toBeNull();
// The remaining docs are intact
const keep1 = await coll.findOne({ name: 'keep-1' });
expect(keep1).toBeTruthy();
expect(keep1!.value).toEqual(100);
});
// ============================================================================
// Simulate ungraceful restart: delete after hint write, then restart
// The hint file data_file_size check should detect the stale hint
// ============================================================================
tap.test('delete-persistence: insert and delete more docs, then restart', async () => {
const coll = db.collection('items');
// Insert a new doc
await coll.insertOne({ name: 'temporary', value: 777 });
expect(await coll.countDocuments()).toEqual(4);
// Delete it
await coll.deleteOne({ name: 'temporary' });
expect(await coll.countDocuments()).toEqual(3);
const gone = await coll.findOne({ name: 'temporary' });
expect(gone).toBeNull();
});
tap.test('delete-persistence: stop and restart again', async () => {
await client.close();
await localDb.stop();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('deletetest');
});
tap.test('delete-persistence: all deletes survived second restart', async () => {
const coll = db.collection('items');
const count = await coll.countDocuments();
expect(count).toEqual(3);
// Both deletes are permanent
expect(await coll.findOne({ name: 'delete-me' })).toBeNull();
expect(await coll.findOne({ name: 'temporary' })).toBeNull();
// Survivors intact
const names = (await coll.find({}).toArray()).map(d => d.name).sort();
expect(names).toEqual(['keep-1', 'keep-2', 'keep-3']);
});
// ============================================================================
// Delete all docs and verify empty after restart
// ============================================================================
tap.test('delete-persistence: delete all remaining docs', async () => {
const coll = db.collection('items');
await coll.deleteMany({});
expect(await coll.countDocuments()).toEqual(0);
});
tap.test('delete-persistence: restart with empty collection', async () => {
await client.close();
await localDb.stop();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('deletetest');
});
tap.test('delete-persistence: collection is empty after restart', async () => {
const coll = db.collection('items');
const count = await coll.countDocuments();
expect(count).toEqual(0);
});
// ============================================================================
// Cleanup
// ============================================================================
tap.test('delete-persistence: cleanup', async () => {
await client.close();
await localDb.stop();
cleanTmpDir(tmpDir);
});
export default tap.start();
+126
View File
@@ -0,0 +1,126 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
// ---------------------------------------------------------------------------
// Test: Missing data.rdb header recovery + startup logging
// Covers: ensure_data_header, BuildStats, info-level startup logging
// ---------------------------------------------------------------------------
let tmpDir: string;
let localDb: smartdb.LocalSmartDb;
let client: MongoClient;
let db: Db;
function makeTmpDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-header-test-'));
}
function cleanTmpDir(dir: string): void {
if (fs.existsSync(dir)) {
fs.rmSync(dir, { recursive: true, force: true });
}
}
// ============================================================================
// Setup: create data, then corrupt it
// ============================================================================
tap.test('setup: start, insert data, stop', async () => {
tmpDir = makeTmpDir();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('headertest');
const coll = db.collection('docs');
await coll.insertMany([
{ key: 'a', val: 1 },
{ key: 'b', val: 2 },
{ key: 'c', val: 3 },
]);
await client.close();
await localDb.stop();
});
// ============================================================================
// Delete hint file and restart: should rebuild from data.rdb scan
// ============================================================================
tap.test('header-recovery: delete hint file and restart', async () => {
// Find and delete hint files
const dbDir = path.join(tmpDir, 'headertest', 'docs');
const hintPath = path.join(dbDir, 'keydir.hint');
if (fs.existsSync(hintPath)) {
fs.unlinkSync(hintPath);
}
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('headertest');
});
tap.test('header-recovery: data intact after hint deletion', async () => {
const coll = db.collection('docs');
const count = await coll.countDocuments();
expect(count).toEqual(3);
const a = await coll.findOne({ key: 'a' });
expect(a!.val).toEqual(1);
});
// ============================================================================
// Write new data after restart, stop, restart again
// ============================================================================
tap.test('header-recovery: write after hint-less restart', async () => {
const coll = db.collection('docs');
await coll.insertOne({ key: 'd', val: 4 });
expect(await coll.countDocuments()).toEqual(4);
});
tap.test('header-recovery: restart and verify all data', async () => {
await client.close();
await localDb.stop();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('headertest');
const coll = db.collection('docs');
const count = await coll.countDocuments();
expect(count).toEqual(4);
const keys = (await coll.find({}).toArray()).map(d => d.key).sort();
expect(keys).toEqual(['a', 'b', 'c', 'd']);
});
// ============================================================================
// Cleanup
// ============================================================================
tap.test('header-recovery: cleanup', async () => {
await client.close();
await localDb.stop();
cleanTmpDir(tmpDir);
});
export default tap.start();
+82
View File
@@ -0,0 +1,82 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import * as fs from 'fs';
import * as net from 'net';
import * as path from 'path';
import * as os from 'os';
// ---------------------------------------------------------------------------
// Test: Stale socket cleanup on startup
// Covers: LocalSmartDb.cleanStaleSockets(), isSocketAlive()
// ---------------------------------------------------------------------------
function makeTmpDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-socket-test-'));
}
function cleanTmpDir(dir: string): void {
if (fs.existsSync(dir)) {
fs.rmSync(dir, { recursive: true, force: true });
}
}
// ============================================================================
// Stale socket cleanup: active sockets are preserved
// ============================================================================
tap.test('stale-sockets: does not remove active sockets', async () => {
const tmpDir = makeTmpDir();
const activeSocketPath = path.join(os.tmpdir(), `smartdb-active-${Date.now()}.sock`);
// Create an active socket (server still listening)
const activeServer = net.createServer();
await new Promise<void>((resolve) => activeServer.listen(activeSocketPath, resolve));
expect(fs.existsSync(activeSocketPath)).toBeTrue();
// Start LocalSmartDb — should NOT remove the active socket
const localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
await localDb.start();
expect(fs.existsSync(activeSocketPath)).toBeTrue();
// Cleanup
await localDb.stop();
await new Promise<void>((resolve) => activeServer.close(() => resolve()));
try { fs.unlinkSync(activeSocketPath); } catch {}
cleanTmpDir(tmpDir);
});
// ============================================================================
// Stale socket cleanup: startup works with no stale sockets
// ============================================================================
tap.test('stale-sockets: startup works cleanly with no stale sockets', async () => {
const tmpDir = makeTmpDir();
const localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
expect(localDb.running).toBeTrue();
expect(info.socketPath).toBeTruthy();
await localDb.stop();
cleanTmpDir(tmpDir);
});
// ============================================================================
// Stale socket cleanup: the socket file for the current instance is cleaned on stop
// ============================================================================
tap.test('stale-sockets: own socket file is removed on stop', async () => {
const tmpDir = makeTmpDir();
const localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
expect(fs.existsSync(info.socketPath)).toBeTrue();
await localDb.stop();
// Socket file should be gone after graceful stop
expect(fs.existsSync(info.socketPath)).toBeFalse();
cleanTmpDir(tmpDir);
});
export default tap.start();
+180
View File
@@ -0,0 +1,180 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
// ---------------------------------------------------------------------------
// Test: Unique index enforcement via wire protocol
// Covers: unique index pre-check, createIndexes persistence, index restoration
// ---------------------------------------------------------------------------
let tmpDir: string;
let localDb: smartdb.LocalSmartDb;
let client: MongoClient;
let db: Db;
function makeTmpDir(): string {
return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-unique-test-'));
}
function cleanTmpDir(dir: string): void {
if (fs.existsSync(dir)) {
fs.rmSync(dir, { recursive: true, force: true });
}
}
// ============================================================================
// Setup
// ============================================================================
tap.test('setup: start local db', async () => {
tmpDir = makeTmpDir();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('uniquetest');
});
// ============================================================================
// Unique index enforcement on insert
// ============================================================================
tap.test('unique-index: createIndex with unique: true', async () => {
const coll = db.collection('users');
await coll.insertOne({ email: 'alice@example.com', name: 'Alice' });
const indexName = await coll.createIndex({ email: 1 }, { unique: true });
expect(indexName).toBeTruthy();
});
tap.test('unique-index: reject duplicate on insertOne', async () => {
const coll = db.collection('users');
let threw = false;
try {
await coll.insertOne({ email: 'alice@example.com', name: 'Alice2' });
} catch (err: any) {
threw = true;
expect(err.code).toEqual(11000);
}
expect(threw).toBeTrue();
// Verify only 1 document exists
const count = await coll.countDocuments();
expect(count).toEqual(1);
});
tap.test('unique-index: allow insert with different unique value', async () => {
const coll = db.collection('users');
await coll.insertOne({ email: 'bob@example.com', name: 'Bob' });
const count = await coll.countDocuments();
expect(count).toEqual(2);
});
// ============================================================================
// Unique index enforcement on update
// ============================================================================
tap.test('unique-index: reject duplicate on updateOne that changes unique field', async () => {
const coll = db.collection('users');
let threw = false;
try {
await coll.updateOne(
{ email: 'bob@example.com' },
{ $set: { email: 'alice@example.com' } }
);
} catch (err: any) {
threw = true;
expect(err.code).toEqual(11000);
}
expect(threw).toBeTrue();
// Bob's email should be unchanged
const bob = await coll.findOne({ name: 'Bob' });
expect(bob!.email).toEqual('bob@example.com');
});
tap.test('unique-index: allow update that keeps same unique value', async () => {
const coll = db.collection('users');
await coll.updateOne(
{ email: 'bob@example.com' },
{ $set: { name: 'Robert' } }
);
const bob = await coll.findOne({ email: 'bob@example.com' });
expect(bob!.name).toEqual('Robert');
});
// ============================================================================
// Unique index enforcement on upsert
// ============================================================================
tap.test('unique-index: reject duplicate on upsert insert', async () => {
const coll = db.collection('users');
let threw = false;
try {
await coll.updateOne(
{ email: 'new@example.com' },
{ $set: { email: 'alice@example.com', name: 'Imposter' } },
{ upsert: true }
);
} catch (err: any) {
threw = true;
}
expect(threw).toBeTrue();
});
// ============================================================================
// Unique index survives restart (persistence + restoration)
// ============================================================================
tap.test('unique-index: stop and restart', async () => {
await client.close();
await localDb.stop();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('uniquetest');
});
tap.test('unique-index: enforcement persists after restart', async () => {
const coll = db.collection('users');
// Data should still be there
const count = await coll.countDocuments();
expect(count).toEqual(2);
// Unique constraint should still be enforced without calling createIndex again
let threw = false;
try {
await coll.insertOne({ email: 'alice@example.com', name: 'Alice3' });
} catch (err: any) {
threw = true;
expect(err.code).toEqual(11000);
}
expect(threw).toBeTrue();
// Count unchanged
const countAfter = await coll.countDocuments();
expect(countAfter).toEqual(2);
});
// ============================================================================
// Cleanup
// ============================================================================
tap.test('unique-index: cleanup', async () => {
await client.close();
await localDb.stop();
cleanTmpDir(tmpDir);
});
export default tap.start();
+1 -1
View File
@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: '@push.rocks/smartdb',
version: '2.5.1',
version: '2.5.9',
description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.'
}
File diff suppressed because one or more lines are too long