Compare commits

16 Commits

| SHA1 |
|---|
| 5b5f35821f |
| e8161e6417 |
| 1a10c32b12 |
| cb8cb87d9f |
| 96117d54b9 |
| 53f58e45c3 |
| 34d708be7e |
| 418e8dc052 |
| b8567ebe08 |
| 827bfa6370 |
| ceba64e34a |
| 8646d58f06 |
| 8ce6ff11c3 |
| 5c7aaebaba |
| be7d086c0b |
| 91a7b69f1d |
@@ -1,5 +1,60 @@
 # Changelog
 
+## 2026-04-05 - 2.5.3 - fix(rustdb-commands)
+
+restore persisted index initialization before writes to enforce unique constraints after restart
+
+- load stored index specifications from storage when creating command context index engines
+- rebuild index data from existing documents so custom indexes are active before insert, update, and upsert operations
+- add @push.rocks/smartdata as a runtime dependency
+
+## 2026-04-05 - 2.5.2 - fix(rustdb-indexes)
+
+persist created indexes and restore them on server startup
+
+- Save index specifications to storage when indexes are created.
+- Remove persisted index metadata when indexes are dropped by name, key spec, or wildcard.
+- Rebuild in-memory index engines from stored definitions and existing documents during startup.
+
+## 2026-04-05 - 2.5.1 - fix(docs)
+
+update project documentation
+
+- Modifies a single documentation-related file with a minimal text change.
+- No source code, API, or package metadata changes are indicated in the diff summary.
+
+## 2026-04-05 - 2.5.0 - feat(storage)
+
+add offline data validation and strengthen storage/index integrity checks
+
+- adds a `--validate-data <PATH>` CLI mode to run offline integrity checks on storage directories
+- introduces storage validation reporting for headers, checksums, duplicate ids, tombstones, and stale or orphaned hint entries
+- pre-checks unique index constraints before insert, update, upsert, and findAndModify writes to prevent duplicate-key violations before storage changes
+- validates hint files against data files during collection load and rebuilds indexes from data when hints are stale
+- ensures new data files always receive a SMARTDB header and persists fresh hint files after successful compaction
+- cleans up stale local Unix socket files before starting the TypeScript local server
+
+## 2026-04-05 - 2.4.1 - fix(package)
+
+update package metadata
+
+- Adjusts package manifest content with a minimal one-line change.
+
+## 2026-04-05 - 2.4.0 - feat(rustdb)
+
+add restore and periodic persistence support for in-memory storage
+
+- Restore previously persisted state during startup when a persist path is configured.
+- Spawn a background task to periodically persist in-memory data using the configured interval.
+- Warn when running purely in-memory without durable persistence configured.
+
+## 2026-04-04 - 2.3.1 - fix(package)
+
+update package metadata
+
+- Adjusts a single package-level metadata entry in the project configuration.
+
+## 2026-04-04 - 2.3.0 - feat(test)
+
+add integration coverage for file storage, compaction, migration, and LocalSmartDb workflows
+
+- adds end-to-end tests for file-backed storage creation, CRUD operations, bulk updates, persistence, and index file generation
+- adds compaction stress tests covering repeated updates, tombstones, file shrinking behavior, and restart integrity
+- adds migration tests for automatic v0 JSON layout detection, v1 conversion, restart persistence, and post-migration writes
+- adds LocalSmartDb lifecycle and unix socket tests, including restart persistence, custom socket paths, and database isolation
+
 ## 2026-04-04 - 2.2.0 - feat(storage)
 
 add Bitcask storage migration, binary WAL, and data compaction support

+2 -1

@@ -1,6 +1,6 @@
 {
   "name": "@push.rocks/smartdb",
-  "version": "2.2.0",
+  "version": "2.5.3",
   "private": false,
   "description": "A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.",
   "exports": {
@@ -29,6 +29,7 @@
   "dependencies": {
     "@api.global/typedserver": "^8.0.0",
     "@design.estate/dees-element": "^2.0.0",
+    "@push.rocks/smartdata": "7.1.5",
     "@push.rocks/smartrust": "^1.3.2",
     "bson": "^7.2.0"
   },

Generated +7 -4

@@ -14,6 +14,9 @@ importers:
       '@design.estate/dees-element':
         specifier: ^2.0.0
         version: 2.2.3
+      '@push.rocks/smartdata':
+        specifier: 7.1.5
+        version: 7.1.5(socks@2.8.7)
       '@push.rocks/smartrust':
         specifier: ^1.3.2
         version: 1.3.2
@@ -1026,8 +1029,8 @@ packages:
   '@push.rocks/smartcrypto@2.0.4':
     resolution: {integrity: sha512-1+/5bsjyataf5uUkUNnnVXGRAt+gHVk1KDzozjTqgqJxHvQk1d9fVDohL6CxUhUucTPtu5VR5xNBiV8YCDuGyw==}
 
-  '@push.rocks/smartdata@7.1.3':
-    resolution: {integrity: sha512-7vQJ9pdRk450yn2m9tmGPdSRlQVmxFPZjHD4sGYsfqCQPg+GLFusu+H16zpf+jKzAq4F2ZBMPaYymJHXvXiVcw==}
+  '@push.rocks/smartdata@7.1.5':
+    resolution: {integrity: sha512-7x7VedEg6RocWndqUPuTbY2Bh85Q/x0LOVHL4o/NVXyh3IGNtiVQ8ple4WR0qYqlHRAojX4eDSBPMiYzIasqAg==}
 
   '@push.rocks/smartdelay@3.0.5':
     resolution: {integrity: sha512-mUuI7kj2f7ztjpic96FvRIlf2RsKBa5arw81AHNsndbxO6asRcxuWL8dTVxouEIK8YsBUlj0AsrCkHhMbLQdHw==}
@@ -5665,7 +5668,7 @@ snapshots:
     '@types/node-forge': 1.3.14
     node-forge: 1.4.0
 
-  '@push.rocks/smartdata@7.1.3(socks@2.8.7)':
+  '@push.rocks/smartdata@7.1.5(socks@2.8.7)':
     dependencies:
       '@push.rocks/lik': 6.4.0
       '@push.rocks/smartdelay': 3.0.5
@@ -5899,7 +5902,7 @@ snapshots:
   '@push.rocks/smartmongo@5.1.1(socks@2.8.7)':
     dependencies:
       '@push.rocks/mongodump': 1.1.0(socks@2.8.7)
-      '@push.rocks/smartdata': 7.1.3(socks@2.8.7)
+      '@push.rocks/smartdata': 7.1.5(socks@2.8.7)
       '@push.rocks/smartfs': 1.5.0
       '@push.rocks/smartpath': 6.0.0
       '@push.rocks/smartpromise': 4.2.3

@@ -1,8 +1,8 @@
 use std::sync::Arc;
 
-use bson::Document;
+use bson::{Bson, Document};
 use dashmap::DashMap;
-use rustdb_index::IndexEngine;
+use rustdb_index::{IndexEngine, IndexOptions};
 use rustdb_storage::{OpLog, StorageAdapter};
 use rustdb_txn::{SessionEngine, TransactionEngine};
 
@@ -24,6 +24,67 @@ pub struct CommandContext {
     pub oplog: Arc<OpLog>,
 }
 
+impl CommandContext {
+    /// Get or lazily initialize an IndexEngine for a namespace.
+    ///
+    /// If no IndexEngine exists yet for this namespace, loads persisted index
+    /// specs from `indexes.json` via the storage adapter, creates the engine
+    /// with those specs, and rebuilds index data from existing documents.
+    /// This ensures unique indexes are enforced even on the very first write
+    /// after a restart.
+    pub async fn get_or_init_index_engine(&self, db: &str, coll: &str) -> dashmap::mapref::one::RefMut<'_, String, IndexEngine> {
+        let ns_key = format!("{}.{}", db, coll);
+
+        // Fast path: engine already exists.
+        if self.indexes.contains_key(&ns_key) {
+            return self.indexes.entry(ns_key).or_insert_with(IndexEngine::new);
+        }
+
+        // Slow path: load from persisted specs.
+        let mut engine = IndexEngine::new();
+        let mut has_custom = false;
+
+        if let Ok(specs) = self.storage.get_indexes(db, coll).await {
+            for spec in &specs {
+                let name = spec.get_str("name").unwrap_or("").to_string();
+                if name == "_id_" || name.is_empty() {
+                    continue;
+                }
+                let key = match spec.get("key") {
+                    Some(Bson::Document(k)) => k.clone(),
+                    _ => continue,
+                };
+                let unique = matches!(spec.get("unique"), Some(Bson::Boolean(true)));
+                let sparse = matches!(spec.get("sparse"), Some(Bson::Boolean(true)));
+                let expire_after_seconds = match spec.get("expireAfterSeconds") {
+                    Some(Bson::Int32(n)) => Some(*n as u64),
+                    Some(Bson::Int64(n)) => Some(*n as u64),
+                    _ => None,
+                };
+                let options = IndexOptions {
+                    name: Some(name),
+                    unique,
+                    sparse,
+                    expire_after_seconds,
+                };
+                let _ = engine.create_index(key, options);
+                has_custom = true;
+            }
+        }
+
+        if has_custom {
+            // Rebuild index data from existing documents.
+            if let Ok(docs) = self.storage.find_all(db, coll).await {
+                if !docs.is_empty() {
+                    engine.rebuild_from_documents(&docs);
+                }
+            }
+        }
+
+        self.indexes.entry(ns_key).or_insert(engine)
+    }
+}
+
 /// State of an open cursor from a find or aggregate command.
 pub struct CursorState {
     /// Documents remaining to be returned.

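The write handlers below call `drop(ctx.get_or_init_index_engine(db, coll).await)` rather than holding the returned guard: the `RefMut` is a DashMap shard guard, and holding it while taking another reference into the same map on the same thread can deadlock. A minimal standalone sketch of that hazard, using only the real `dashmap` crate (the map contents here are illustrative):

```rust
use dashmap::DashMap;

fn main() {
    let indexes: DashMap<String, Vec<&'static str>> = DashMap::new();
    indexes.insert("mydb.users".into(), vec!["_id_"]);

    // Holding any guard while requesting a second guard on the same
    // shard from the same thread deadlocks, so the guard is released
    // first, mirroring the drop(...) calls in the command handlers.
    let guard = indexes.get("mydb.users").unwrap();
    println!("engine has {} index(es)", guard.len());
    drop(guard); // release before re-borrowing mutably

    if let Some(mut engine) = indexes.get_mut("mydb.users") {
        engine.push("email_1");
    }
}
```
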
@@ -101,7 +101,15 @@ async fn handle_create_indexes(
         expire_after_seconds,
     };
 
-    // Create the index.
+    let options_for_persist = IndexOptions {
+        name: options.name.clone(),
+        unique: options.unique,
+        sparse: options.sparse,
+        expire_after_seconds: options.expire_after_seconds,
+    };
+    let key_for_persist = key.clone();
+
+    // Create the index in-memory.
     let mut engine = ctx
         .indexes
         .entry(ns_key.clone())
@@ -110,6 +118,22 @@ async fn handle_create_indexes(
     match engine.create_index(key, options) {
         Ok(index_name) => {
             debug!(index_name = %index_name, "Created index");
 
+            // Persist index spec to disk.
+            let mut spec = doc! { "key": key_for_persist };
+            if options_for_persist.unique {
+                spec.insert("unique", true);
+            }
+            if options_for_persist.sparse {
+                spec.insert("sparse", true);
+            }
+            if let Some(ttl) = options_for_persist.expire_after_seconds {
+                spec.insert("expireAfterSeconds", ttl as i64);
+            }
+            if let Err(e) = ctx.storage.save_index(db, coll, &index_name, spec).await {
+                tracing::warn!(index = %index_name, error = %e, "failed to persist index spec");
+            }
+
             created_count += 1;
         }
         Err(e) => {
@@ -180,9 +204,21 @@ async fn handle_drop_indexes(
     match index_spec {
         Some(Bson::String(name)) if name == "*" => {
             // Drop all indexes except _id_.
+            // Collect names to drop from storage first.
+            let names_to_drop: Vec<String> = if let Some(engine) = ctx.indexes.get(&ns_key) {
+                engine.list_indexes().iter()
+                    .filter(|info| info.name != "_id_")
+                    .map(|info| info.name.clone())
+                    .collect()
+            } else {
+                Vec::new()
+            };
             if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
                 engine.drop_all_indexes();
             }
+            for idx_name in &names_to_drop {
+                let _ = ctx.storage.drop_index(db, coll, idx_name).await;
+            }
         }
         Some(Bson::String(name)) => {
             // Drop by name.
@@ -196,6 +232,7 @@ async fn handle_drop_indexes(
                     name
                 )));
             }
+            let _ = ctx.storage.drop_index(db, coll, name).await;
         }
         Some(Bson::Document(key_spec)) => {
             // Drop by key spec: find the index with matching key.
@@ -210,6 +247,7 @@ async fn handle_drop_indexes(
                 engine.drop_index(&name).map_err(|e| {
                     CommandError::IndexError(e.to_string())
                 })?;
+                let _ = ctx.storage.drop_index(db, coll, &name).await;
             } else {
                 return Err(CommandError::IndexError(
                     "index not found with specified key".into(),

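The persisted spec above stores only the options that were actually set. A small sketch of the resulting document shape, using the real `bson::doc!` macro (the field values are illustrative):

```rust
use bson::doc;

fn main() {
    // What handle_create_indexes would persist for a unique TTL index
    // on `email`; unset options (e.g. sparse) are simply omitted.
    let mut spec = doc! { "key": { "email": 1 } };
    spec.insert("unique", true);
    spec.insert("expireAfterSeconds", 3600_i64);
    // Document implements Display, so this prints the spec as JSON-like text.
    println!("{}", spec);
}
```
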
@@ -1,9 +1,8 @@
 use std::collections::HashMap;
 
 use bson::{doc, oid::ObjectId, Bson, Document};
-use rustdb_index::IndexEngine;
 use rustdb_storage::OpType;
-use tracing::{debug, warn};
+use tracing::debug;
 
 use crate::context::CommandContext;
 use crate::error::{CommandError, CommandResult};
@@ -56,12 +55,35 @@ pub async fn handle(
     let mut inserted_count: i32 = 0;
     let mut write_errors: Vec<Document> = Vec::new();
 
+    // Ensure the IndexEngine is loaded (with persisted specs from indexes.json).
+    // This must happen BEFORE any writes, so unique constraints are enforced
+    // even on the first write after a restart.
+    drop(ctx.get_or_init_index_engine(db, coll).await);
+
     for (idx, mut doc) in docs.into_iter().enumerate() {
         // Auto-generate _id if not present.
         if !doc.contains_key("_id") {
             doc.insert("_id", ObjectId::new());
         }
 
+        // Pre-check unique index constraints BEFORE storage write.
+        // The engine is guaranteed to exist from the get_or_init call above.
+        if let Some(engine) = ctx.indexes.get(&ns_key) {
+            if let Err(e) = engine.check_unique_constraints(&doc) {
+                let err_msg = e.to_string();
+                write_errors.push(doc! {
+                    "index": idx as i32,
+                    "code": 11000_i32,
+                    "codeName": "DuplicateKey",
+                    "errmsg": &err_msg,
+                });
+                if ordered {
+                    break;
+                }
+                continue;
+            }
+        }
+
         // Attempt storage insert.
         match ctx.storage.insert_one(db, coll, doc.clone()).await {
             Ok(id_str) => {
@@ -75,17 +97,15 @@ pub async fn handle(
                     None,
                 );
 
-                // Update index engine.
-                let mut engine = ctx
-                    .indexes
-                    .entry(ns_key.clone())
-                    .or_insert_with(IndexEngine::new);
-                if let Err(e) = engine.on_insert(&doc) {
-                    warn!(
-                        namespace = %ns_key,
-                        error = %e,
-                        "index update failed after successful insert"
-                    );
-                }
+                // Update index engine (already initialized above).
+                if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
+                    if let Err(e) = engine.on_insert(&doc) {
+                        tracing::error!(
+                            namespace = %ns_key,
+                            error = %e,
+                            "index update failed after successful insert"
+                        );
+                    }
+                }
                 inserted_count += 1;
             }

@@ -1,7 +1,6 @@
 use std::collections::HashSet;
 
 use bson::{doc, oid::ObjectId, Bson, Document};
-use rustdb_index::IndexEngine;
 use rustdb_query::{QueryMatcher, UpdateEngine, sort_documents, apply_projection};
 use rustdb_storage::OpType;
 use tracing::debug;
@@ -47,6 +46,10 @@ async fn handle_update(
     ensure_collection_exists(db, coll, ctx).await?;
 
     let ns_key = format!("{}.{}", db, coll);
 
+    // Ensure the IndexEngine is loaded with persisted specs from indexes.json.
+    drop(ctx.get_or_init_index_engine(db, coll).await);
+
     let mut total_n: i32 = 0;
     let mut total_n_modified: i32 = 0;
     let mut upserted_list: Vec<Document> = Vec::new();
@@ -150,6 +153,22 @@ async fn handle_update(
                     updated.get("_id").unwrap().clone()
                 };
 
+                // Pre-check unique index constraints before upsert insert.
+                if let Some(engine) = ctx.indexes.get(&ns_key) {
+                    if let Err(e) = engine.check_unique_constraints(&updated) {
+                        write_errors.push(doc! {
+                            "index": idx as i32,
+                            "code": 11000_i32,
+                            "codeName": "DuplicateKey",
+                            "errmsg": e.to_string(),
+                        });
+                        if ordered {
+                            break;
+                        }
+                        continue;
+                    }
+                }
+
                 // Insert the new document.
                 match ctx.storage.insert_one(db, coll, updated.clone()).await {
                     Ok(id_str) => {
@@ -163,12 +182,12 @@ async fn handle_update(
                             None,
                         );
 
-                        // Update index.
-                        let mut engine = ctx
-                            .indexes
-                            .entry(ns_key.clone())
-                            .or_insert_with(IndexEngine::new);
-                        let _ = engine.on_insert(&updated);
+                        // Update index (engine already initialized above).
+                        if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
+                            if let Err(e) = engine.on_insert(&updated) {
+                                tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert");
+                            }
+                        }
 
                         total_n += 1;
                         upserted_list.push(doc! {
@@ -216,6 +235,22 @@ async fn handle_update(
                 array_filters.as_deref(),
             ) {
                 Ok(updated_doc) => {
+                    // Pre-check unique index constraints before storage write.
+                    if let Some(engine) = ctx.indexes.get(&ns_key) {
+                        if let Err(e) = engine.check_unique_constraints_for_update(matched_doc, &updated_doc) {
+                            write_errors.push(doc! {
+                                "index": idx as i32,
+                                "code": 11000_i32,
+                                "codeName": "DuplicateKey",
+                                "errmsg": e.to_string(),
+                            });
+                            if ordered {
+                                break;
+                            }
+                            continue;
+                        }
+                    }
+
                     let id_str = extract_id_string(matched_doc);
                     match ctx
                         .storage
@@ -235,7 +270,9 @@ async fn handle_update(
 
                             // Update index.
                             if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
-                                let _ = engine.on_update(matched_doc, &updated_doc);
+                                if let Err(e) = engine.on_update(matched_doc, &updated_doc) {
+                                    tracing::error!(namespace = %ns_key, error = %e, "index update failed after update");
+                                }
                             }
                             total_n += 1;
                             // Check if the document actually changed.
@@ -366,6 +403,9 @@ async fn handle_find_and_modify(
 
     let ns_key = format!("{}.{}", db, coll);
 
+    // Ensure the IndexEngine is loaded with persisted specs.
+    drop(ctx.get_or_init_index_engine(db, coll).await);
+
     // Load and filter documents.
     let mut matched = load_filtered_docs(db, coll, &query, &ns_key, ctx).await?;
 
@@ -444,6 +484,13 @@ async fn handle_find_and_modify(
             )
             .map_err(|e| CommandError::InternalError(e.to_string()))?;
 
+        // Pre-check unique index constraints before storage write.
+        if let Some(engine) = ctx.indexes.get(&ns_key) {
+            if let Err(e) = engine.check_unique_constraints_for_update(&original_doc, &updated_doc) {
+                return Err(CommandError::StorageError(e.to_string()));
+            }
+        }
+
         let id_str = extract_id_string(&original_doc);
         ctx.storage
             .update_by_id(db, coll, &id_str, updated_doc.clone())
@@ -461,7 +508,9 @@ async fn handle_find_and_modify(
 
         // Update index.
         if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
-            let _ = engine.on_update(&original_doc, &updated_doc);
+            if let Err(e) = engine.on_update(&original_doc, &updated_doc) {
+                tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify update");
+            }
         }
 
         let return_doc = if return_new {
@@ -505,6 +554,13 @@ async fn handle_find_and_modify(
             updated_doc.get("_id").unwrap().clone()
         };
 
+        // Pre-check unique index constraints before upsert insert.
+        if let Some(engine) = ctx.indexes.get(&ns_key) {
+            if let Err(e) = engine.check_unique_constraints(&updated_doc) {
+                return Err(CommandError::StorageError(e.to_string()));
+            }
+        }
+
         let inserted_id_str = ctx.storage
             .insert_one(db, coll, updated_doc.clone())
             .await?;
@@ -521,11 +577,11 @@ async fn handle_find_and_modify(
 
         // Update index.
         {
-            let mut engine = ctx
-                .indexes
-                .entry(ns_key.clone())
-                .or_insert_with(IndexEngine::new);
-            let _ = engine.on_insert(&updated_doc);
+            if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
+                if let Err(e) = engine.on_insert(&updated_doc) {
+                    tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert");
+                }
+            }
         }
 
         let value = if return_new {

@@ -153,6 +153,55 @@ impl IndexEngine {
         self.indexes.contains_key(name)
     }
 
+    /// Check unique constraints for a document without modifying the index.
+    /// Returns Ok(()) if no conflict, Err(DuplicateKey) if a unique constraint
+    /// would be violated. This is a read-only check (immutable &self).
+    pub fn check_unique_constraints(&self, doc: &Document) -> Result<(), IndexError> {
+        for idx in self.indexes.values() {
+            if idx.unique {
+                let key_bytes = Self::extract_key_bytes(doc, &idx.key, idx.sparse);
+                if let Some(ref kb) = key_bytes {
+                    if let Some(existing_ids) = idx.hash.get(kb) {
+                        if !existing_ids.is_empty() {
+                            return Err(IndexError::DuplicateKey {
+                                index: idx.name.clone(),
+                                key: format!("{:?}", kb),
+                            });
+                        }
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Check unique constraints for an update, excluding the document being updated.
+    /// Returns Ok(()) if no conflict. This is a read-only check (immutable &self).
+    pub fn check_unique_constraints_for_update(
+        &self,
+        old_doc: &Document,
+        new_doc: &Document,
+    ) -> Result<(), IndexError> {
+        let doc_id = Self::extract_id(old_doc);
+        for idx in self.indexes.values() {
+            if idx.unique {
+                let new_key_bytes = Self::extract_key_bytes(new_doc, &idx.key, idx.sparse);
+                if let Some(ref kb) = new_key_bytes {
+                    if let Some(existing_ids) = idx.hash.get(kb) {
+                        let has_conflict = existing_ids.iter().any(|id| *id != doc_id);
+                        if has_conflict {
+                            return Err(IndexError::DuplicateKey {
+                                index: idx.name.clone(),
+                                key: format!("{:?}", kb),
+                            });
+                        }
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Notify the engine that a document has been inserted.
     /// Checks unique constraints and updates all index structures.
     pub fn on_insert(&mut self, doc: &Document) -> Result<(), IndexError> {

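A usage sketch of the two read-only checks added above, assuming the workspace's `rustdb_index` crate is on the path (the index name and documents are illustrative):

```rust
use bson::doc;
use rustdb_index::{IndexEngine, IndexOptions};

fn main() {
    let mut engine = IndexEngine::new();
    engine
        .create_index(
            doc! { "email": 1 },
            IndexOptions {
                name: Some("email_1".into()),
                unique: true,
                sparse: false,
                expire_after_seconds: None,
            },
        )
        .unwrap();

    let alice = doc! { "_id": 1, "email": "alice@example.com" };
    engine.on_insert(&alice).unwrap();

    // A second document with the same key fails the pre-check without
    // mutating the index, so the caller can record a write error and move on.
    let dup = doc! { "_id": 2, "email": "alice@example.com" };
    assert!(engine.check_unique_constraints(&dup).is_err());

    // An update that keeps the document's own key is not a conflict: the
    // update variant excludes the document's current id from the check.
    let same = doc! { "_id": 1, "email": "alice@example.com" };
    assert!(engine.check_unique_constraints_for_update(&alice, &same).is_ok());
}
```
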
@@ -253,7 +253,7 @@ mod tests {
         assert!(b_entry.offset > a_entry.offset);
 
         // Verify the compacted file can be used to rebuild KeyDir
-        let (rebuilt, dead) = KeyDir::build_from_data_file(&data_path).unwrap();
+        let (rebuilt, dead, _stats) = KeyDir::build_from_data_file(&data_path).unwrap();
         assert_eq!(rebuilt.len(), 2);
         assert_eq!(dead, 0); // no dead records in compacted file
     }

@@ -21,7 +21,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use bson::{doc, oid::ObjectId, Document};
 use dashmap::DashMap;
-use tracing::debug;
+use tracing::{debug, info};
 
 use crate::adapter::StorageAdapter;
 use crate::binary_wal::{BinaryWal, WalOpType};
@@ -83,6 +83,20 @@ impl CollectionState {
             .map_err(|e| StorageError::SerializationError(format!("BSON decode: {e}")))
     }
 
+    /// Ensure a data file has the 64-byte SMARTDB header.
+    /// If the file was just created (empty), writes the header and updates
+    /// the data_file_size counter. Must be called under write_lock.
+    fn ensure_data_header(&self, file: &mut std::fs::File) -> StorageResult<()> {
+        let pos = file.seek(SeekFrom::End(0))?;
+        if pos == 0 {
+            let hdr = FileHeader::new(FileType::Data);
+            file.write_all(&hdr.encode())?;
+            self.data_file_size
+                .fetch_add(FILE_HEADER_SIZE as u64, Ordering::Relaxed);
+        }
+        Ok(())
+    }
+
     /// Append a data record and update the KeyDir. Must be called under write_lock.
     fn append_record(
         &self,
@@ -104,6 +118,7 @@ impl CollectionState {
             .append(true)
             .open(&data_path)?;
 
+        self.ensure_data_header(&mut file)?;
         let offset = file.seek(SeekFrom::End(0))?;
         file.write_all(&encoded)?;
         file.sync_all()?;
@@ -137,6 +152,7 @@ impl CollectionState {
             .append(true)
             .open(&data_path)?;
 
+        self.ensure_data_header(&mut file)?;
         file.write_all(&encoded)?;
         file.sync_all()?;
 
@@ -160,6 +176,11 @@ impl CollectionState {
                 &self.data_file_size,
             ) {
                 tracing::warn!("compaction failed for {:?}: {e}", self.coll_dir);
+            } else {
+                // Persist hint file after successful compaction to prevent stale hints
+                if let Err(e) = self.keydir.persist_to_hint_file(&self.hint_path()) {
+                    tracing::warn!("failed to persist hint after compaction for {:?}: {e}", self.coll_dir);
+                }
             }
         }
     }
@@ -234,33 +255,42 @@ impl FileStorageAdapter {
         let hint_path = coll_dir.join("keydir.hint");
 
         // Try loading from hint file first, fall back to data file scan
-        let (keydir, dead_bytes) = if hint_path.exists() && data_path.exists() {
+        let (keydir, dead_bytes, loaded_from_hint) = if hint_path.exists() && data_path.exists() {
             match KeyDir::load_from_hint_file(&hint_path) {
                 Ok(Some(kd)) => {
-                    debug!("loaded KeyDir from hint file: {:?}", hint_path);
-                    // We don't know dead_bytes from the hint file; estimate from file size
-                    let file_size = std::fs::metadata(&data_path)
-                        .map(|m| m.len())
-                        .unwrap_or(FILE_HEADER_SIZE as u64);
-                    let live_bytes: u64 = {
-                        let mut total = 0u64;
-                        kd.for_each(|_, e| total += e.record_len as u64);
-                        total
-                    };
-                    let dead = file_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
-                    (kd, dead)
+                    // Validate hint against actual data file
+                    let hint_valid = kd.validate_against_data_file(&data_path, 16)
+                        .unwrap_or(false);
+                    if hint_valid {
+                        debug!("loaded KeyDir from hint file: {:?}", hint_path);
+                        let file_size = std::fs::metadata(&data_path)
+                            .map(|m| m.len())
+                            .unwrap_or(FILE_HEADER_SIZE as u64);
+                        let live_bytes: u64 = {
+                            let mut total = 0u64;
+                            kd.for_each(|_, e| total += e.record_len as u64);
+                            total
+                        };
+                        let dead = file_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
+                        (kd, dead, true)
+                    } else {
+                        tracing::warn!("hint file {:?} is stale, rebuilding from data file", hint_path);
+                        let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
+                        (kd, dead, false)
+                    }
                 }
                 _ => {
                     debug!("hint file invalid, rebuilding KeyDir from data file");
-                    KeyDir::build_from_data_file(&data_path)?
+                    let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
+                    (kd, dead, false)
                 }
             }
         } else if data_path.exists() {
-            KeyDir::build_from_data_file(&data_path)?
+            let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
+            (kd, dead, false)
         } else {
-            (KeyDir::new(), 0)
+            (KeyDir::new(), 0, false)
         };
 
         let doc_count = keydir.len();
         let data_file_size = if data_path.exists() {
             std::fs::metadata(&data_path)?.len()
@@ -268,6 +298,15 @@ impl FileStorageAdapter {
             FILE_HEADER_SIZE as u64
         };
 
+        info!(
+            collection = %coll_dir.display(),
+            documents = doc_count,
+            data_bytes = data_file_size,
+            dead_bytes = dead_bytes,
+            source = if loaded_from_hint { "hint" } else { "scan" },
+            "loaded collection"
+        );
+
         // Initialize WAL and recover
         let wal = BinaryWal::new(wal_path);
         wal.initialize()?;
@@ -275,10 +314,10 @@ impl FileStorageAdapter {
         // Recover uncommitted WAL entries
         let uncommitted = wal.recover()?;
         if !uncommitted.is_empty() {
-            debug!(
-                "recovering {} uncommitted WAL entries for {:?}",
-                uncommitted.len(),
-                coll_dir
+            info!(
+                collection = %coll_dir.display(),
+                entries = uncommitted.len(),
+                "recovering uncommitted WAL entries"
             );
         }
 
@@ -415,15 +454,18 @@
 impl StorageAdapter for FileStorageAdapter {
     async fn initialize(&self) -> StorageResult<()> {
         std::fs::create_dir_all(&self.base_path)?;
         debug!("FileStorageAdapter initialized at {:?}", self.base_path);
 
         // Pre-load all existing collections
+        let mut db_count: usize = 0;
         if let Ok(entries) = std::fs::read_dir(&self.base_path) {
             for entry in entries.flatten() {
                 if entry.file_type().map(|ft| ft.is_dir()).unwrap_or(false) {
                     if let Some(db_name) = entry.file_name().to_str() {
                         let db_name = db_name.to_string();
                         if let Ok(colls) = self.list_collection_dirs(&db_name) {
+                            if !colls.is_empty() {
+                                db_count += 1;
+                            }
                             for coll_name in colls {
                                 let _ = self.get_or_init_collection(&db_name, &coll_name);
                             }
@@ -433,6 +475,13 @@ impl StorageAdapter for FileStorageAdapter {
             }
         }
 
+        info!(
+            databases = db_count,
+            collections = self.collections.len(),
+            path = %self.base_path.display(),
+            "FileStorageAdapter initialization complete"
+        );
+
         // Start periodic compaction task (runs every 24 hours)
         {
             let collections = self.collections.clone();

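The adapter above now trusts a hint file only after it survives a spot-check against `data.rdb`. A condensed sketch of that decision, against the `KeyDir` API shown in this diff (the directory path is illustrative):

```rust
use std::path::Path;
use rustdb_storage::{KeyDir, StorageResult};

// Load a collection's KeyDir: prefer the hint file, but fall back to a
// full data-file scan whenever the hint is missing, unreadable, or stale.
fn load_keydir(coll_dir: &Path) -> StorageResult<KeyDir> {
    let data_path = coll_dir.join("data.rdb");
    let hint_path = coll_dir.join("keydir.hint");

    if let Ok(Some(kd)) = KeyDir::load_from_hint_file(&hint_path) {
        // Spot-check up to 16 entries, as the adapter does above.
        if kd.validate_against_data_file(&data_path, 16).unwrap_or(false) {
            return Ok(kd);
        }
    }
    let (kd, _dead_bytes, _stats) = KeyDir::build_from_data_file(&data_path)?;
    Ok(kd)
}

fn main() {
    let kd = load_keydir(Path::new("./data/mydb/mycoll")).expect("load failed");
    println!("loaded {} live keys", kd.len());
}
```
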
@@ -6,7 +6,7 @@
 //! The KeyDir can be rebuilt from a data file scan, or loaded quickly from a
 //! persisted hint file for fast restart.
 
-use std::io::{self, BufReader, BufWriter, Read, Write};
+use std::io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write};
 use std::path::Path;
 use std::sync::atomic::{AtomicU64, Ordering};
 
@@ -14,7 +14,7 @@ use dashmap::DashMap;
 
 use crate::error::{StorageError, StorageResult};
 use crate::record::{
-    FileHeader, FileType, RecordScanner, FILE_HEADER_SIZE, FORMAT_VERSION,
+    DataRecord, FileHeader, FileType, RecordScanner, FILE_HEADER_SIZE, FORMAT_VERSION,
 };
 
 // ---------------------------------------------------------------------------
@@ -34,6 +34,23 @@ pub struct KeyDirEntry {
     pub timestamp: u64,
 }
 
+// ---------------------------------------------------------------------------
+// BuildStats — statistics from building KeyDir from a data file scan
+// ---------------------------------------------------------------------------
+
+/// Statistics collected while building a KeyDir from a data file scan.
+#[derive(Debug, Clone, Default)]
+pub struct BuildStats {
+    /// Total records scanned (live + tombstones + superseded).
+    pub total_records_scanned: u64,
+    /// Number of live documents in the final KeyDir.
+    pub live_documents: u64,
+    /// Number of tombstone records encountered.
+    pub tombstones: u64,
+    /// Number of records superseded by a later write for the same key.
+    pub superseded_records: u64,
+}
+
 // ---------------------------------------------------------------------------
 // KeyDir
 // ---------------------------------------------------------------------------
@@ -116,9 +133,9 @@ impl KeyDir {
 
     /// Rebuild the KeyDir by scanning an entire data file.
     /// The file must start with a valid `FileHeader`.
-    /// Returns `(keydir, dead_bytes)` where `dead_bytes` is the total size of
+    /// Returns `(keydir, dead_bytes, stats)` where `dead_bytes` is the total size of
     /// stale records (superseded by later writes or tombstoned).
-    pub fn build_from_data_file(path: &Path) -> StorageResult<(Self, u64)> {
+    pub fn build_from_data_file(path: &Path) -> StorageResult<(Self, u64, BuildStats)> {
         let file = std::fs::File::open(path)?;
         let mut reader = BufReader::new(file);
 
@@ -135,6 +152,7 @@ impl KeyDir {
 
         let keydir = KeyDir::new();
         let mut dead_bytes: u64 = 0;
+        let mut stats = BuildStats::default();
 
         let scanner = RecordScanner::new(reader, FILE_HEADER_SIZE as u64);
         for result in scanner {
@@ -146,7 +164,10 @@ impl KeyDir {
             let key = String::from_utf8(record.key)
                 .map_err(|e| StorageError::CorruptRecord(format!("invalid UTF-8 key: {e}")))?;
 
+            stats.total_records_scanned += 1;
+
             if is_tombstone {
+                stats.tombstones += 1;
                 // Remove from index; the tombstone itself is dead weight
                 if let Some(prev) = keydir.remove(&key) {
                     dead_bytes += prev.record_len as u64;
@@ -162,11 +183,13 @@ impl KeyDir {
                 if let Some(prev) = keydir.insert(key, entry) {
                     // Previous version of same key is now dead
                     dead_bytes += prev.record_len as u64;
+                    stats.superseded_records += 1;
                 }
             }
         }
 
-        Ok((keydir, dead_bytes))
+        stats.live_documents = keydir.len();
+        Ok((keydir, dead_bytes, stats))
     }
 
     // -----------------------------------------------------------------------
@@ -271,6 +294,86 @@ impl KeyDir {
 
         Ok(Some(keydir))
     }
 
+    // -----------------------------------------------------------------------
+    // Hint file validation
+    // -----------------------------------------------------------------------
+
+    /// Validate this KeyDir (loaded from a hint file) against the actual data file.
+    /// Returns `Ok(true)` if the hint appears consistent, `Ok(false)` if a rebuild
+    /// from the data file is recommended.
+    ///
+    /// Checks:
+    /// 1. All entry offsets + record_len fit within the data file size.
+    /// 2. All entry offsets are >= FILE_HEADER_SIZE.
+    /// 3. A random sample of entries is spot-checked by reading the record at
+    ///    the offset and verifying the key matches.
+    pub fn validate_against_data_file(&self, data_path: &Path, sample_size: usize) -> StorageResult<bool> {
+        let file_size = std::fs::metadata(data_path)
+            .map(|m| m.len())
+            .unwrap_or(0);
+
+        if file_size < FILE_HEADER_SIZE as u64 {
+            // Data file is too small to even contain a header
+            return Ok(self.is_empty());
+        }
+
+        // Pass 1: bounds check all entries
+        let mut all_keys: Vec<(String, KeyDirEntry)> = Vec::with_capacity(self.len() as usize);
+        let mut bounds_ok = true;
+        self.for_each(|key, entry| {
+            if entry.offset < FILE_HEADER_SIZE as u64
+                || entry.offset + entry.record_len as u64 > file_size
+            {
+                bounds_ok = false;
+            }
+            all_keys.push((key.to_string(), *entry));
+        });
+
+        if !bounds_ok {
+            return Ok(false);
+        }
+
+        // Pass 2: spot-check a sample of entries by reading records from data.rdb
+        if all_keys.is_empty() {
+            return Ok(true);
+        }
+
+        // Sort by offset for sequential I/O, take first `sample_size` entries
+        all_keys.sort_by_key(|(_, e)| e.offset);
+        let step = if all_keys.len() <= sample_size {
+            1
+        } else {
+            all_keys.len() / sample_size
+        };
+
+        let mut file = std::fs::File::open(data_path)?;
+        let mut checked = 0usize;
+        for (i, (expected_key, entry)) in all_keys.iter().enumerate() {
+            if checked >= sample_size {
+                break;
+            }
+            if i % step != 0 {
+                continue;
+            }
+            // Seek to the entry's offset and try to decode the record
+            file.seek(SeekFrom::Start(entry.offset))?;
+            match DataRecord::decode_from(&mut file) {
+                Ok(Some((record, _disk_size))) => {
+                    let record_key = String::from_utf8_lossy(&record.key);
+                    if record_key != *expected_key {
+                        return Ok(false);
+                    }
+                }
+                Ok(None) | Err(_) => {
+                    return Ok(false);
+                }
+            }
+            checked += 1;
+        }
+
+        Ok(true)
+    }
 }
 
 impl Default for KeyDir {
@@ -372,7 +475,7 @@ mod tests {
             f.write_all(&r3.encode()).unwrap();
         }
 
-        let (kd, dead_bytes) = KeyDir::build_from_data_file(&data_path).unwrap();
+        let (kd, dead_bytes, stats) = KeyDir::build_from_data_file(&data_path).unwrap();
 
         // Only B should be live
         assert_eq!(kd.len(), 1);
@@ -381,6 +484,12 @@ mod tests {
 
         // Dead bytes: r1 (aaa live, then superseded by tombstone) + r3 (tombstone itself)
         assert!(dead_bytes > 0);
+
+        // Stats
+        assert_eq!(stats.total_records_scanned, 3);
+        assert_eq!(stats.live_documents, 1);
+        assert_eq!(stats.tombstones, 1);
+        assert_eq!(stats.superseded_records, 0); // aaa was removed by tombstone, not superseded
     }
 
     #[test]

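A short sketch of consuming the new third tuple element returned by `build_from_data_file`, assuming the re-exported `rustdb_storage::KeyDir` and a data file at `./data.rdb` (both the path and the error handling are illustrative):

```rust
use std::path::Path;
use rustdb_storage::KeyDir;

fn main() {
    // The BuildStats element is new in this change; it makes startup
    // logging explicit instead of inferring counts from dead_bytes alone.
    let (keydir, dead_bytes, stats) =
        KeyDir::build_from_data_file(Path::new("./data.rdb")).expect("scan failed");
    println!(
        "{} scanned: {} live, {} tombstones, {} superseded, {} dead bytes, {} keys",
        stats.total_records_scanned,
        stats.live_documents,
        stats.tombstones,
        stats.superseded_records,
        dead_bytes,
        keydir.len()
    );
}
```
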
@@ -16,13 +16,14 @@
 pub mod keydir;
 pub mod memory;
 pub mod oplog;
 pub mod record;
+pub mod validate;
 
 pub use adapter::StorageAdapter;
 pub use binary_wal::{BinaryWal, WalEntry, WalOpType};
 pub use compaction::{compact_data_file, should_compact, CompactionResult};
 pub use error::{StorageError, StorageResult};
 pub use file::FileStorageAdapter;
-pub use keydir::{KeyDir, KeyDirEntry};
+pub use keydir::{BuildStats, KeyDir, KeyDirEntry};
 pub use memory::MemoryStorageAdapter;
 pub use oplog::{OpLog, OpLogEntry, OpLogStats, OpType};
 pub use record::{

@@ -0,0 +1,324 @@
+//! Data integrity validation for RustDb storage directories.
+//!
+//! Provides offline validation of data files without starting the server.
+//! Checks header magic, record CRC32 checksums, duplicate IDs, and
+//! keydir.hint consistency.
+
+use std::collections::HashMap;
+use std::io::{BufReader, Read};
+use std::path::Path;
+
+use crate::error::{StorageError, StorageResult};
+use crate::keydir::KeyDir;
+use crate::record::{FileHeader, FileType, RecordScanner, FILE_HEADER_SIZE};
+
+/// Result of validating an entire data directory.
+pub struct ValidationReport {
+    pub collections: Vec<CollectionReport>,
+}
+
+/// Result of validating a single collection.
+pub struct CollectionReport {
+    pub db: String,
+    pub collection: String,
+    pub header_valid: bool,
+    pub total_records: u64,
+    pub live_documents: u64,
+    pub tombstones: u64,
+    pub duplicate_ids: Vec<String>,
+    pub checksum_errors: u64,
+    pub decode_errors: u64,
+    pub data_file_size: u64,
+    pub hint_file_exists: bool,
+    pub orphaned_hint_entries: u64,
+    pub errors: Vec<String>,
+}
+
+impl ValidationReport {
+    /// Whether any errors were found across all collections.
+    pub fn has_errors(&self) -> bool {
+        self.collections.iter().any(|c| {
+            !c.header_valid
+                || !c.duplicate_ids.is_empty()
+                || c.checksum_errors > 0
+                || c.decode_errors > 0
+                || c.orphaned_hint_entries > 0
+                || !c.errors.is_empty()
+        })
+    }
+
+    /// Print a human-readable summary to stdout.
+    pub fn print_summary(&self) {
+        println!("=== SmartDB Data Integrity Report ===");
+        println!();
+
+        let mut total_errors = 0u64;
+
+        for report in &self.collections {
+            println!("Database: {}", report.db);
+            println!(" Collection: {}", report.collection);
+            println!(
+                " Header: {}",
+                if report.header_valid { "OK" } else { "INVALID" }
+            );
+            println!(
+                " Records: {} ({} live, {} tombstones)",
+                report.total_records, report.live_documents, report.tombstones
+            );
+            println!(" Data size: {} bytes", report.data_file_size);
+
+            if report.duplicate_ids.is_empty() {
+                println!(" Duplicates: 0");
+            } else {
+                let ids_preview: Vec<&str> = report.duplicate_ids.iter().take(5).map(|s| s.as_str()).collect();
+                let suffix = if report.duplicate_ids.len() > 5 {
+                    format!(", ... and {} more", report.duplicate_ids.len() - 5)
+                } else {
+                    String::new()
+                };
+                println!(
+                    " Duplicates: {} (ids: {}{})",
+                    report.duplicate_ids.len(),
+                    ids_preview.join(", "),
+                    suffix
+                );
+            }
+
+            if report.checksum_errors > 0 {
+                println!(" CRC errors: {}", report.checksum_errors);
+            } else {
+                println!(" CRC errors: 0");
+            }
+
+            if report.decode_errors > 0 {
+                println!(" Decode errors: {}", report.decode_errors);
+            }
+
+            if report.hint_file_exists {
+                if report.orphaned_hint_entries > 0 {
+                    println!(
+                        " Hint file: STALE ({} orphaned entries)",
+                        report.orphaned_hint_entries
+                    );
+                } else {
+                    println!(" Hint file: OK");
+                }
+            } else {
+                println!(" Hint file: absent");
+            }
+
+            for err in &report.errors {
+                println!(" ERROR: {}", err);
+            }
+
+            println!();
+
+            if !report.header_valid { total_errors += 1; }
+            total_errors += report.duplicate_ids.len() as u64;
+            total_errors += report.checksum_errors;
+            total_errors += report.decode_errors;
+            total_errors += report.orphaned_hint_entries;
+            total_errors += report.errors.len() as u64;
+        }
+
+        println!(
+            "Summary: {} collection(s) checked, {} error(s) found.",
+            self.collections.len(),
+            total_errors
+        );
+    }
+}
+
+/// Validate all collections in a data directory.
+///
+/// The directory structure is expected to be:
+/// ```text
+/// {base_path}/{db}/{collection}/data.rdb
+/// ```
+pub fn validate_data_directory(base_path: &str) -> StorageResult<ValidationReport> {
+    let base = Path::new(base_path);
+    if !base.exists() {
+        return Err(StorageError::IoError(std::io::Error::new(
+            std::io::ErrorKind::NotFound,
+            format!("data directory not found: {base_path}"),
+        )));
+    }
+
+    let mut collections = Vec::new();
+
+    // Iterate database directories
+    let entries = std::fs::read_dir(base)?;
+    for entry in entries {
+        let entry = entry?;
+        if !entry.file_type()?.is_dir() {
+            continue;
+        }
+        let db_name = match entry.file_name().to_str() {
+            Some(s) => s.to_string(),
+            None => continue,
+        };
+
+        // Iterate collection directories
+        let db_entries = std::fs::read_dir(entry.path())?;
+        for coll_entry in db_entries {
+            let coll_entry = coll_entry?;
+            if !coll_entry.file_type()?.is_dir() {
+                continue;
+            }
+            let coll_name = match coll_entry.file_name().to_str() {
+                Some(s) => s.to_string(),
+                None => continue,
+            };
+
+            let data_path = coll_entry.path().join("data.rdb");
+            if !data_path.exists() {
+                continue;
+            }
+
+            let report = validate_collection(&db_name, &coll_name, &coll_entry.path());
+            collections.push(report);
+        }
+    }
+
+    // Sort for deterministic output
+    collections.sort_by(|a, b| (&a.db, &a.collection).cmp(&(&b.db, &b.collection)));
+
+    Ok(ValidationReport { collections })
+}
+
+/// Validate a single collection directory.
+fn validate_collection(db: &str, coll: &str, coll_dir: &Path) -> CollectionReport {
+    let data_path = coll_dir.join("data.rdb");
+    let hint_path = coll_dir.join("keydir.hint");
+
+    let mut report = CollectionReport {
+        db: db.to_string(),
+        collection: coll.to_string(),
+        header_valid: false,
+        total_records: 0,
+        live_documents: 0,
+        tombstones: 0,
+        duplicate_ids: Vec::new(),
+        checksum_errors: 0,
+        decode_errors: 0,
+        data_file_size: 0,
+        hint_file_exists: hint_path.exists(),
+        orphaned_hint_entries: 0,
+        errors: Vec::new(),
+    };
+
+    // Get file size
+    match std::fs::metadata(&data_path) {
+        Ok(m) => report.data_file_size = m.len(),
+        Err(e) => {
+            report.errors.push(format!("cannot stat data.rdb: {e}"));
+            return report;
+        }
+    }
+
+    // Open and validate header
+    let file = match std::fs::File::open(&data_path) {
+        Ok(f) => f,
+        Err(e) => {
+            report.errors.push(format!("cannot open data.rdb: {e}"));
+            return report;
+        }
+    };
+    let mut reader = BufReader::new(file);
+
+    let mut hdr_buf = [0u8; FILE_HEADER_SIZE];
+    if let Err(e) = reader.read_exact(&mut hdr_buf) {
+        report.errors.push(format!("cannot read header: {e}"));
+        return report;
+    }
+
+    match FileHeader::decode(&hdr_buf) {
+        Ok(hdr) => {
+            if hdr.file_type != FileType::Data {
+                report.errors.push(format!(
+                    "wrong file type: expected Data, got {:?}",
+                    hdr.file_type
+                ));
+            } else {
+                report.header_valid = true;
+            }
+        }
+        Err(e) => {
+            report.errors.push(format!("invalid header: {e}"));
+            return report;
+        }
+    }
+
+    // Scan all records
+    let mut id_counts: HashMap<String, u64> = HashMap::new();
+    let mut live_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
+    let scanner = RecordScanner::new(reader, FILE_HEADER_SIZE as u64);
+
+    for result in scanner {
+        match result {
+            Ok((_offset, record)) => {
+                report.total_records += 1;
+                let key = String::from_utf8_lossy(&record.key).to_string();
+
+                if record.is_tombstone() {
+                    report.tombstones += 1;
+                    live_ids.remove(&key);
+                } else {
+                    *id_counts.entry(key.clone()).or_insert(0) += 1;
+                    live_ids.insert(key);
+                }
+            }
+            Err(e) => {
+                let err_str = e.to_string();
+                if err_str.contains("checksum") || err_str.contains("Checksum") {
+                    report.checksum_errors += 1;
+                } else {
+                    report.decode_errors += 1;
+                }
+                // Cannot continue scanning after a decode error — the stream position is lost
+                report.errors.push(format!("record decode error: {e}"));
+                break;
+            }
+        }
+    }
+
+    report.live_documents = live_ids.len() as u64;
+
+    // Find duplicates (keys that appeared more than once as live inserts)
+    for (id, count) in &id_counts {
+        if *count > 1 {
+            report.duplicate_ids.push(id.clone());
+        }
+    }
+    report.duplicate_ids.sort();
+
+    // Validate hint file if present
+    if hint_path.exists() {
+        match KeyDir::load_from_hint_file(&hint_path) {
+            Ok(Some(hint_kd)) => {
+                // Check for orphaned entries: keys in hint but not live in data
+                hint_kd.for_each(|key, _entry| {
+                    if !live_ids.contains(key) {
+                        report.orphaned_hint_entries += 1;
+                    }
+                });
+
+                // Also check if hint references offsets beyond file size
+                hint_kd.for_each(|_key, entry| {
+                    if entry.offset + entry.record_len as u64 > report.data_file_size {
+                        report.orphaned_hint_entries += 1;
+                    }
+                });
+            }
+            Ok(None) => {
+                // File existed but was empty or unreadable
+                report.errors.push("hint file exists but is empty".into());
+            }
+            Err(e) => {
+                report.errors.push(format!("hint file decode error: {e}"));
+            }
+        }
+    }
+
+    report
+}

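This module backs the `--validate-data` CLI mode added further down. A minimal sketch of calling it as a library, assuming a storage directory at `./data` (the path is illustrative):

```rust
use rustdb_storage::validate::validate_data_directory;

fn main() {
    // Mirrors what the CLI flag does: print the report and turn the
    // error flag into the process exit status.
    match validate_data_directory("./data") {
        Ok(report) => {
            report.print_summary();
            std::process::exit(if report.has_errors() { 1 } else { 0 });
        }
        Err(e) => {
            eprintln!("validation failed: {e}");
            std::process::exit(2);
        }
    }
}
```
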
@@ -1,6 +1,8 @@
 pub mod management;
 
+use std::path::PathBuf;
 use std::sync::Arc;
+use std::time::Duration;
 
 use anyhow::Result;
 use dashmap::DashMap;
@@ -14,7 +16,7 @@ use rustdb_config::{RustDbOptions, StorageType};
 use rustdb_wire::{WireCodec, OP_QUERY};
 use rustdb_wire::{encode_op_msg_response, encode_op_reply_response};
 use rustdb_storage::{StorageAdapter, MemoryStorageAdapter, FileStorageAdapter, OpLog};
-// IndexEngine is used indirectly via CommandContext
+use rustdb_index::{IndexEngine, IndexOptions};
 use rustdb_txn::{TransactionEngine, SessionEngine};
 use rustdb_commands::{CommandRouter, CommandContext};
 
@@ -33,7 +35,16 @@ impl RustDb {
         // Create storage adapter
         let storage: Arc<dyn StorageAdapter> = match options.storage {
             StorageType::Memory => {
-                let adapter = MemoryStorageAdapter::new();
+                let adapter = if let Some(ref pp) = options.persist_path {
+                    tracing::info!("MemoryStorageAdapter with periodic persistence to {}", pp);
+                    MemoryStorageAdapter::with_persist_path(PathBuf::from(pp))
+                } else {
+                    tracing::warn!(
+                        "SmartDB is using in-memory storage — data will NOT survive a restart. \
+                         Set storage to 'file' for durable persistence."
+                    );
+                    MemoryStorageAdapter::new()
+                };
                 Arc::new(adapter)
             }
             StorageType::File => {
@@ -49,9 +60,99 @@ impl RustDb {
         // Initialize storage
         storage.initialize().await?;
 
+        // Restore any previously persisted state (no-op for file storage and
+        // memory storage without a persist_path).
+        storage.restore().await?;
+
+        // Spawn periodic persistence task for memory storage with persist_path.
+        if options.storage == StorageType::Memory && options.persist_path.is_some() {
+            let persist_storage = storage.clone();
+            let interval_ms = options.persist_interval_ms;
+            tokio::spawn(async move {
+                let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
+                interval.tick().await; // skip the immediate first tick
+                loop {
+                    interval.tick().await;
+                    if let Err(e) = persist_storage.persist().await {
+                        tracing::error!("Periodic persist failed: {}", e);
+                    }
+                }
+            });
+        }
+
+        let indexes: Arc<DashMap<String, IndexEngine>> = Arc::new(DashMap::new());
+
+        // Restore persisted indexes from storage.
+        if let Ok(databases) = storage.list_databases().await {
+            for db_name in &databases {
+                if let Ok(collections) = storage.list_collections(db_name).await {
+                    for coll_name in &collections {
+                        if let Ok(specs) = storage.get_indexes(db_name, coll_name).await {
+                            let has_custom = specs.iter().any(|s| {
+                                s.get_str("name").unwrap_or("_id_") != "_id_"
+                            });
+                            if !has_custom {
+                                continue;
+                            }
+
+                            let ns_key = format!("{}.{}", db_name, coll_name);
+                            let mut engine = IndexEngine::new();
+
+                            for spec in &specs {
+                                let name = spec.get_str("name").unwrap_or("").to_string();
+                                if name == "_id_" {
+                                    continue; // already created by IndexEngine::new()
+                                }
+                                let key = match spec.get("key") {
+                                    Some(bson::Bson::Document(k)) => k.clone(),
+                                    _ => continue,
+                                };
+                                let unique = matches!(spec.get("unique"), Some(bson::Bson::Boolean(true)));
+                                let sparse = matches!(spec.get("sparse"), Some(bson::Bson::Boolean(true)));
+                                let expire_after_seconds = match spec.get("expireAfterSeconds") {
+                                    Some(bson::Bson::Int32(n)) => Some(*n as u64),
+                                    Some(bson::Bson::Int64(n)) => Some(*n as u64),
+                                    _ => None,
+                                };
+
+                                let options = IndexOptions {
+                                    name: Some(name.clone()),
+                                    unique,
+                                    sparse,
+                                    expire_after_seconds,
+                                };
+                                if let Err(e) = engine.create_index(key, options) {
+                                    tracing::warn!(
+                                        namespace = %ns_key,
+                                        index = %name,
+                                        error = %e,
+                                        "failed to restore index"
+                                    );
+                                }
+                            }
+
+                            // Rebuild index data from existing documents.
+                            if let Ok(docs) = storage.find_all(db_name, coll_name).await {
+                                if !docs.is_empty() {
+                                    engine.rebuild_from_documents(&docs);
+                                }
+                            }
+
+                            tracing::info!(
+                                namespace = %ns_key,
+                                indexes = engine.list_indexes().len(),
+                                "restored indexes"
+                            );
+                            indexes.insert(ns_key, engine);
+                        }
+                    }
+                }
+            }
+        }
+
         let ctx = Arc::new(CommandContext {
             storage,
-            indexes: Arc::new(DashMap::new()),
+            indexes,
             transactions: Arc::new(TransactionEngine::new()),
             sessions: Arc::new(SessionEngine::new(30 * 60 * 1000, 60 * 1000)),
             cursors: Arc::new(DashMap::new()),

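The periodic-persist task above relies on `tokio::time::interval`, whose first tick resolves immediately; the extra `tick().await` before the loop is what pushes the first real persist out by one full interval. A standalone sketch of just that timing pattern (the interval length and output are illustrative):

```rust
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut interval = tokio::time::interval(Duration::from_millis(200));
    interval.tick().await; // first tick resolves immediately; consume it
    for pass in 1..=3 {
        interval.tick().await; // each later tick is one full interval apart
        println!("persist pass {pass}");
    }
}
```
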
@@ -25,6 +25,10 @@ struct Cli {
     #[arg(long)]
     validate: bool,
 
+    /// Validate data integrity of a storage directory (offline check)
+    #[arg(long, value_name = "PATH")]
+    validate_data: Option<String>,
+
     /// Run in management mode (JSON-over-stdin IPC for TypeScript wrapper)
     #[arg(long)]
     management: bool,
@@ -55,7 +59,7 @@ async fn main() -> Result<()> {
     let options = RustDbOptions::from_file(&cli.config)
         .map_err(|e| anyhow::anyhow!("Failed to load config '{}': {}", cli.config, e))?;
 
-    // Validate-only mode
+    // Validate-only mode (config)
     if cli.validate {
         match options.validate() {
             Ok(()) => {
@@ -69,6 +73,18 @@ async fn main() -> Result<()> {
         }
     }
 
+    // Validate data integrity mode
+    if let Some(ref data_path) = cli.validate_data {
+        tracing::info!("Validating data integrity at {}", data_path);
+        let report = rustdb_storage::validate::validate_data_directory(data_path)
+            .map_err(|e| anyhow::anyhow!("Validation failed: {}", e))?;
+        report.print_summary();
+        if report.has_errors() {
+            std::process::exit(1);
+        }
+        return Ok(());
+    }
+
     // Create and start server
     let mut db = RustDb::new(options).await?;
     db.start().await?;

@@ -0,0 +1,256 @@
|
||||
import { expect, tap } from '@git.zone/tstest/tapbundle';
|
||||
import * as smartdb from '../ts/index.js';
|
||||
import { MongoClient, Db } from 'mongodb';
|
||||
import * as fs from 'fs';
|
||||
import * as path from 'path';
|
||||
import * as os from 'os';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
let tmpDir: string;
|
||||
let server: smartdb.SmartdbServer;
|
||||
let client: MongoClient;
|
||||
let db: Db;
|
||||
|
||||
function makeTmpDir(): string {
|
||||
return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-compact-test-'));
|
||||
}
|
||||
|
||||
function cleanTmpDir(dir: string): void {
|
||||
if (fs.existsSync(dir)) {
|
||||
fs.rmSync(dir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
|
||||
function getDataFileSize(storagePath: string, dbName: string, collName: string): number {
|
||||
const dataPath = path.join(storagePath, dbName, collName, 'data.rdb');
|
||||
if (!fs.existsSync(dataPath)) return 0;
|
||||
return fs.statSync(dataPath).size;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Compaction: Setup
|
||||
// ============================================================================
|
||||
|
||||
tap.test('compaction: start server with file storage', async () => {
|
||||
tmpDir = makeTmpDir();
|
||||
server = new smartdb.SmartdbServer({
|
||||
socketPath: path.join(os.tmpdir(), `smartdb-compact-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`),
|
||||
storage: 'file',
|
||||
storagePath: tmpDir,
|
||||
});
|
||||
await server.start();
|
||||
|
||||
client = new MongoClient(server.getConnectionUri(), {
|
||||
directConnection: true,
|
||||
serverSelectionTimeoutMS: 5000,
|
||||
});
|
||||
await client.connect();
|
||||
db = client.db('compactdb');
|
||||
});
|
||||
|
||||
// ============================================================================
|
||||
// Compaction: Updates grow the data file
|
||||
// ============================================================================
|
||||
|
||||
tap.test('compaction: repeated updates grow the data file', async () => {
|
||||
const coll = db.collection('growing');
|
||||
|
||||
// Insert a document
|
||||
await coll.insertOne({ key: 'target', counter: 0, payload: 'x'.repeat(200) });
|
||||
|
||||
const sizeAfterInsert = getDataFileSize(tmpDir, 'compactdb', 'growing');
|
||||
expect(sizeAfterInsert).toBeGreaterThan(0);
|
||||
|
||||
// Update the same document 50 times — each update appends a new record
|
||||
for (let i = 1; i <= 50; i++) {
|
||||
await coll.updateOne(
|
||||
{ key: 'target' },
|
||||
{ $set: { counter: i, payload: 'y'.repeat(200) } }
|
||||
);
|
||||
}
|
||||
|
||||
const sizeAfterUpdates = getDataFileSize(tmpDir, 'compactdb', 'growing');
|
||||
// Compaction may have run during updates, so we can't assert the file is
|
||||
// much larger. What matters is the data is correct.
|
||||

  // The collection still has just 1 document
  const count = await coll.countDocuments();
  expect(count).toEqual(1);

  const doc = await coll.findOne({ key: 'target' });
  expect(doc!.counter).toEqual(50);
});

// ============================================================================
// Compaction: Deletes create tombstones
// ============================================================================

tap.test('compaction: insert-then-delete creates dead space', async () => {
  const coll = db.collection('tombstones');

  // Insert 100 documents
  const docs = [];
  for (let i = 0; i < 100; i++) {
    docs.push({ idx: i, data: 'delete-me-' + 'z'.repeat(100) });
  }
  await coll.insertMany(docs);

  const sizeAfterInsert = getDataFileSize(tmpDir, 'compactdb', 'tombstones');

  // Delete all 100
  await coll.deleteMany({});

  const sizeAfterDelete = getDataFileSize(tmpDir, 'compactdb', 'tombstones');
  // The file may have been compacted during the deletes (dead > 50% threshold),
  // but the operation itself should succeed regardless of file size. After
  // deleting all docs, a compacted file may contain little more than the header.

  // But the count is 0
  const count = await coll.countDocuments();
  expect(count).toEqual(0);
});

// ============================================================================
// Compaction: Data integrity after compaction trigger
// ============================================================================

tap.test('compaction: data file shrinks after heavy updates trigger compaction', async () => {
  const coll = db.collection('shrinktest');

  // Insert 10 documents with large payloads
  const docs = [];
  for (let i = 0; i < 10; i++) {
    docs.push({ idx: i, data: 'a'.repeat(500) });
  }
  await coll.insertMany(docs);

  const sizeAfterInsert = getDataFileSize(tmpDir, 'compactdb', 'shrinktest');

  // Update each document 20 times (creates 200 dead records vs 10 live)
  // This should trigger compaction (dead > 50% threshold)
  for (let round = 0; round < 20; round++) {
    for (let i = 0; i < 10; i++) {
      await coll.updateOne(
        { idx: i },
        { $set: { data: `round-${round}-` + 'b'.repeat(500) } }
      );
    }
  }

  // After compaction, file should be smaller than the pre-compaction peak
  // (We can't measure the peak exactly, but the final size should be reasonable)
  const sizeAfterCompaction = getDataFileSize(tmpDir, 'compactdb', 'shrinktest');

  // The file should not be 20x the insert size since compaction should have run
  // With 10 live records of ~530 bytes each, the file should be roughly that
  // plus header overhead. Without compaction it would be 210 * ~530 bytes.
  const maxExpectedSize = sizeAfterInsert * 5; // generous upper bound
  expect(sizeAfterCompaction).toBeLessThanOrEqual(maxExpectedSize);

  // All documents should still be readable and correct
  const count = await coll.countDocuments();
  expect(count).toEqual(10);

  for (let i = 0; i < 10; i++) {
    const doc = await coll.findOne({ idx: i });
    expect(doc).toBeTruthy();
    expect(doc!.data.startsWith('round-19-')).toBeTrue();
  }
});

// ============================================================================
// Compaction: Persistence after compaction + restart
// ============================================================================

tap.test('compaction: data survives compaction + restart', async () => {
  await client.close();
  await server.stop();

  server = new smartdb.SmartdbServer({
    socketPath: path.join(os.tmpdir(), `smartdb-compact-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`),
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();

  client = new MongoClient(server.getConnectionUri(), {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  db = client.db('compactdb');

  // Verify shrinktest data
  const coll = db.collection('shrinktest');
  const count = await coll.countDocuments();
  expect(count).toEqual(10);

  for (let i = 0; i < 10; i++) {
    const doc = await coll.findOne({ idx: i });
    expect(doc).toBeTruthy();
    expect(doc!.data.startsWith('round-19-')).toBeTrue();
  }

  // Verify growing collection
  const growing = db.collection('growing');
  const growDoc = await growing.findOne({ key: 'target' });
  expect(growDoc).toBeTruthy();
  expect(growDoc!.counter).toEqual(50);

  // Verify tombstones collection is empty
  const tombCount = await db.collection('tombstones').countDocuments();
  expect(tombCount).toEqual(0);
});

// ============================================================================
// Compaction: Mixed operations stress test
// ============================================================================

tap.test('compaction: mixed insert-update-delete stress test', async () => {
  const coll = db.collection('stress');

  // Phase 1: Insert 200 documents
  const batch = [];
  for (let i = 0; i < 200; i++) {
    batch.push({ idx: i, value: `initial-${i}`, alive: true });
  }
  await coll.insertMany(batch);

  // Phase 2: Update every even-indexed document
  for (let i = 0; i < 200; i += 2) {
    await coll.updateOne({ idx: i }, { $set: { value: `updated-${i}` } });
  }

  // Phase 3: Delete every document where idx % 3 === 0
  await coll.deleteMany({ idx: { $in: Array.from({ length: 67 }, (_, k) => k * 3) } });
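  // k * 3 for k in [0, 66] enumerates exactly the 67 multiples of 3 below 200
  // (0, 3, ..., 198), matching the "200 - 67 = 133" count asserted below.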

  // Verify: documents where idx % 3 !== 0 should remain
  const remaining = await coll.find({}).toArray();
  for (const doc of remaining) {
    expect(doc.idx % 3).not.toEqual(0);
    if (doc.idx % 2 === 0) {
      expect(doc.value).toEqual(`updated-${doc.idx}`);
    } else {
      expect(doc.value).toEqual(`initial-${doc.idx}`);
    }
  }

  // Count should be 200 - 67 = 133
  const count = await coll.countDocuments();
  expect(count).toEqual(133);
});

// ============================================================================
// Cleanup
// ============================================================================

tap.test('compaction: cleanup', async () => {
  await client.close();
  await server.stop();
  cleanTmpDir(tmpDir);
});

export default tap.start();
@@ -0,0 +1,394 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

let tmpDir: string;
let server: smartdb.SmartdbServer;
let client: MongoClient;
let db: Db;

function makeTmpDir(): string {
  return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-test-'));
}

function cleanTmpDir(dir: string): void {
  if (fs.existsSync(dir)) {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}

// ============================================================================
// File Storage: Startup
// ============================================================================

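// NOTE: unlike the socket-based suites in this PR, this suite binds a fixed
// TCP port (27118), so two runs cannot overlap on the same host.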
tap.test('file-storage: should start server with file storage', async () => {
  tmpDir = makeTmpDir();
  server = new smartdb.SmartdbServer({
    port: 27118,
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();
  expect(server.running).toBeTrue();
});

tap.test('file-storage: should connect MongoClient', async () => {
  client = new MongoClient('mongodb://127.0.0.1:27118', {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  db = client.db('filetest');
  expect(db).toBeTruthy();
});

// ============================================================================
// File Storage: Data files are created on disk
// ============================================================================

tap.test('file-storage: inserting creates data files on disk', async () => {
  const coll = db.collection('diskcheck');
  await coll.insertOne({ name: 'disk-test', value: 42 });

  // The storage directory should now contain a database directory
  const dbDir = path.join(tmpDir, 'filetest');
  expect(fs.existsSync(dbDir)).toBeTrue();

  // Collection directory with data.rdb should exist
  const collDir = path.join(dbDir, 'diskcheck');
  expect(fs.existsSync(collDir)).toBeTrue();

  const dataFile = path.join(collDir, 'data.rdb');
  expect(fs.existsSync(dataFile)).toBeTrue();

  // data.rdb should have the SMARTDB magic header
  const header = Buffer.alloc(8);
  const fd = fs.openSync(dataFile, 'r');
  fs.readSync(fd, header, 0, 8, 0);
  fs.closeSync(fd);
  expect(header.toString('ascii')).toEqual('SMARTDB\0');
});

// ============================================================================
// File Storage: Full CRUD cycle
// ============================================================================

tap.test('file-storage: insertOne returns valid id', async () => {
  const coll = db.collection('crud');
  const result = await coll.insertOne({ name: 'Alice', age: 30 });
  expect(result.acknowledged).toBeTrue();
  expect(result.insertedId).toBeTruthy();
});

tap.test('file-storage: insertMany returns all ids', async () => {
  const coll = db.collection('crud');
  const result = await coll.insertMany([
    { name: 'Bob', age: 25 },
    { name: 'Charlie', age: 35 },
    { name: 'Diana', age: 28 },
    { name: 'Eve', age: 32 },
  ]);
  expect(result.insertedCount).toEqual(4);
});

tap.test('file-storage: findOne retrieves correct document', async () => {
  const coll = db.collection('crud');
  const doc = await coll.findOne({ name: 'Alice' });
  expect(doc).toBeTruthy();
  expect(doc!.name).toEqual('Alice');
  expect(doc!.age).toEqual(30);
});

tap.test('file-storage: find with filter returns correct subset', async () => {
  const coll = db.collection('crud');
  const docs = await coll.find({ age: { $gte: 30 } }).toArray();
  expect(docs.length).toEqual(3); // Alice(30), Charlie(35), Eve(32)
  expect(docs.every(d => d.age >= 30)).toBeTrue();
});

tap.test('file-storage: updateOne modifies document', async () => {
  const coll = db.collection('crud');
  const result = await coll.updateOne(
    { name: 'Alice' },
    { $set: { age: 31, updated: true } }
  );
  expect(result.modifiedCount).toEqual(1);

  const doc = await coll.findOne({ name: 'Alice' });
  expect(doc!.age).toEqual(31);
  expect(doc!.updated).toBeTrue();
});

tap.test('file-storage: deleteOne removes document', async () => {
  const coll = db.collection('crud');
  const result = await coll.deleteOne({ name: 'Eve' });
  expect(result.deletedCount).toEqual(1);

  const doc = await coll.findOne({ name: 'Eve' });
  expect(doc).toBeNull();
});

tap.test('file-storage: count reflects current state', async () => {
  const coll = db.collection('crud');
  const count = await coll.countDocuments();
  expect(count).toEqual(4); // 5 inserted - 1 deleted = 4
});

// ============================================================================
// File Storage: Persistence across server restart
// ============================================================================

tap.test('file-storage: stop server for restart test', async () => {
  await client.close();
  await server.stop();
  expect(server.running).toBeFalse();
});

tap.test('file-storage: restart server with same data path', async () => {
  server = new smartdb.SmartdbServer({
    port: 27118,
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();
  expect(server.running).toBeTrue();

  client = new MongoClient('mongodb://127.0.0.1:27118', {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  db = client.db('filetest');
});

tap.test('file-storage: data persists after restart', async () => {
  const coll = db.collection('crud');

  // Alice should still be there with updated age
  const alice = await coll.findOne({ name: 'Alice' });
  expect(alice).toBeTruthy();
  expect(alice!.age).toEqual(31);
  expect(alice!.updated).toBeTrue();

  // Bob, Charlie, Diana should be there
  const bob = await coll.findOne({ name: 'Bob' });
  expect(bob).toBeTruthy();
  expect(bob!.age).toEqual(25);

  const charlie = await coll.findOne({ name: 'Charlie' });
  expect(charlie).toBeTruthy();

  const diana = await coll.findOne({ name: 'Diana' });
  expect(diana).toBeTruthy();

  // Eve should still be deleted
  const eve = await coll.findOne({ name: 'Eve' });
  expect(eve).toBeNull();
});

tap.test('file-storage: count is correct after restart', async () => {
  const coll = db.collection('crud');
  const count = await coll.countDocuments();
  expect(count).toEqual(4);
});

tap.test('file-storage: can write new data after restart', async () => {
  const coll = db.collection('crud');
  const result = await coll.insertOne({ name: 'Frank', age: 45 });
  expect(result.acknowledged).toBeTrue();

  const doc = await coll.findOne({ name: 'Frank' });
  expect(doc).toBeTruthy();
  expect(doc!.age).toEqual(45);

  const count = await coll.countDocuments();
  expect(count).toEqual(5);
});

// ============================================================================
// File Storage: Multiple collections in same database
// ============================================================================

tap.test('file-storage: multiple collections are independent', async () => {
  const products = db.collection('products');
  const orders = db.collection('orders');

  await products.insertMany([
    { sku: 'A001', name: 'Widget', price: 9.99 },
    { sku: 'A002', name: 'Gadget', price: 19.99 },
  ]);

  await orders.insertMany([
    { orderId: 1, sku: 'A001', qty: 3 },
    { orderId: 2, sku: 'A002', qty: 1 },
    { orderId: 3, sku: 'A001', qty: 2 },
  ]);

  const productCount = await products.countDocuments();
  const orderCount = await orders.countDocuments();
  expect(productCount).toEqual(2);
  expect(orderCount).toEqual(3);

  // Deleting from one collection doesn't affect the other
  await products.deleteOne({ sku: 'A001' });
  expect(await products.countDocuments()).toEqual(1);
  expect(await orders.countDocuments()).toEqual(3);
});

// ============================================================================
// File Storage: Multiple databases
// ============================================================================

tap.test('file-storage: multiple databases are independent', async () => {
  const db2 = client.db('filetest2');
  const coll2 = db2.collection('items');

  await coll2.insertOne({ name: 'cross-db-test', source: 'db2' });

  // db2 has 1 doc
  const count2 = await coll2.countDocuments();
  expect(count2).toEqual(1);

  // original db is unaffected
  const crudCount = await db.collection('crud').countDocuments();
  expect(crudCount).toEqual(5);

  await db2.dropDatabase();
});

// ============================================================================
// File Storage: Large batch insert and retrieval
// ============================================================================

tap.test('file-storage: bulk insert 1000 documents', async () => {
  const coll = db.collection('bulk');
  const docs = [];
  for (let i = 0; i < 1000; i++) {
    docs.push({ index: i, data: `value-${i}`, timestamp: Date.now() });
  }
  const result = await coll.insertMany(docs);
  expect(result.insertedCount).toEqual(1000);
});

tap.test('file-storage: find all 1000 documents', async () => {
  const coll = db.collection('bulk');
  const docs = await coll.find({}).toArray();
  expect(docs.length).toEqual(1000);
});

tap.test('file-storage: range query on 1000 documents', async () => {
  const coll = db.collection('bulk');
  const docs = await coll.find({ index: { $gte: 500, $lt: 600 } }).toArray();
  expect(docs.length).toEqual(100);
  expect(docs.every(d => d.index >= 500 && d.index < 600)).toBeTrue();
});

tap.test('file-storage: sorted retrieval with limit', async () => {
  const coll = db.collection('bulk');
  const docs = await coll.find({}).sort({ index: -1 }).limit(10).toArray();
  expect(docs.length).toEqual(10);
  expect(docs[0].index).toEqual(999);
  expect(docs[9].index).toEqual(990);
});

// ============================================================================
// File Storage: Update many and verify persistence
// ============================================================================

tap.test('file-storage: updateMany on bulk collection', async () => {
  const coll = db.collection('bulk');
  const result = await coll.updateMany(
    { index: { $lt: 100 } },
    { $set: { batch: 'first-hundred' } }
  );
  expect(result.modifiedCount).toEqual(100);

  const updated = await coll.find({ batch: 'first-hundred' }).toArray();
  expect(updated.length).toEqual(100);
});

// ============================================================================
// File Storage: Delete many and verify
// ============================================================================

tap.test('file-storage: deleteMany removes correct documents', async () => {
  const coll = db.collection('bulk');
  const result = await coll.deleteMany({ index: { $gte: 900 } });
  expect(result.deletedCount).toEqual(100);

  const remaining = await coll.countDocuments();
  expect(remaining).toEqual(900);
});

// ============================================================================
// File Storage: Persistence of bulk data across restart
// ============================================================================

tap.test('file-storage: stop server for bulk restart test', async () => {
  await client.close();
  await server.stop();
  expect(server.running).toBeFalse();
});

tap.test('file-storage: restart and verify bulk data', async () => {
  server = new smartdb.SmartdbServer({
    port: 27118,
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();

  client = new MongoClient('mongodb://127.0.0.1:27118', {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  db = client.db('filetest');

  const coll = db.collection('bulk');
  const count = await coll.countDocuments();
  expect(count).toEqual(900);

  // Verify the updateMany persisted
  const firstHundred = await coll.find({ batch: 'first-hundred' }).toArray();
  expect(firstHundred.length).toEqual(100);

  // Verify deleted docs are gone
  const over900 = await coll.find({ index: { $gte: 900 } }).toArray();
  expect(over900.length).toEqual(0);
});

// ============================================================================
// File Storage: Index persistence
// ============================================================================

tap.test('file-storage: default indexes.json exists on disk', async () => {
  // The indexes.json is created when the collection is first created,
  // containing the default _id_ index spec.
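  // Shape observed in this PR's fixtures: a JSON array of index specs,
  // e.g. [{ "name": "_id_", "key": { "_id": 1 } }].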
  const indexFile = path.join(tmpDir, 'filetest', 'crud', 'indexes.json');
  expect(fs.existsSync(indexFile)).toBeTrue();

  const indexData = JSON.parse(fs.readFileSync(indexFile, 'utf-8'));
  const names = indexData.map((i: any) => i.name);
  expect(names).toContain('_id_');
});

// ============================================================================
// Cleanup
// ============================================================================

tap.test('file-storage: cleanup', async () => {
  await client.close();
  await server.stop();
  expect(server.running).toBeFalse();
  cleanTmpDir(tmpDir);
});

export default tap.start();
@@ -0,0 +1,235 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

let tmpDir: string;
let localDb: smartdb.LocalSmartDb;
let client: MongoClient;
let db: Db;

function makeTmpDir(): string {
  return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-local-test-'));
}

function cleanTmpDir(dir: string): void {
  if (fs.existsSync(dir)) {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}

// ============================================================================
// LocalSmartDb: Lifecycle
// ============================================================================

tap.test('localsmartdb: should start with just a folder path', async () => {
  tmpDir = makeTmpDir();
  localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
  const info = await localDb.start();

  expect(localDb.running).toBeTrue();
  expect(info.socketPath).toBeTruthy();
  expect(info.connectionUri).toBeTruthy();
  expect(info.connectionUri.startsWith('mongodb://')).toBeTrue();
});

tap.test('localsmartdb: should connect via returned connectionUri', async () => {
  const info = localDb.getConnectionInfo();
  client = new MongoClient(info.connectionUri, {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  db = client.db('localtest');
  expect(db).toBeTruthy();
});

tap.test('localsmartdb: should reject double start', async () => {
  let threw = false;
  try {
    await localDb.start();
  } catch {
    threw = true;
  }
  expect(threw).toBeTrue();
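  // The guard lives in LocalSmartDb.start(), which throws
  // 'LocalSmartDb is already running' (see the source hunk further down).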
});

// ============================================================================
// LocalSmartDb: CRUD via Unix socket
// ============================================================================

tap.test('localsmartdb: insert and find documents', async () => {
  const coll = db.collection('notes');
  await coll.insertMany([
    { title: 'Note 1', body: 'First note', priority: 1 },
    { title: 'Note 2', body: 'Second note', priority: 2 },
    { title: 'Note 3', body: 'Third note', priority: 3 },
  ]);

  const all = await coll.find({}).toArray();
  expect(all.length).toEqual(3);

  const high = await coll.findOne({ priority: 3 });
  expect(high).toBeTruthy();
  expect(high!.title).toEqual('Note 3');
});

tap.test('localsmartdb: update and verify', async () => {
  const coll = db.collection('notes');
  await coll.updateOne(
    { title: 'Note 2' },
    { $set: { body: 'Updated second note', edited: true } }
  );

  const doc = await coll.findOne({ title: 'Note 2' });
  expect(doc!.body).toEqual('Updated second note');
  expect(doc!.edited).toBeTrue();
});

tap.test('localsmartdb: delete and verify', async () => {
  const coll = db.collection('notes');
  await coll.deleteOne({ title: 'Note 1' });

  const count = await coll.countDocuments();
  expect(count).toEqual(2);

  const deleted = await coll.findOne({ title: 'Note 1' });
  expect(deleted).toBeNull();
});

// ============================================================================
// LocalSmartDb: Persistence across restart
// ============================================================================

tap.test('localsmartdb: stop for restart', async () => {
  await client.close();
  await localDb.stop();
  expect(localDb.running).toBeFalse();
});

tap.test('localsmartdb: restart with same folder', async () => {
  localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
  const info = await localDb.start();
  expect(localDb.running).toBeTrue();

  client = new MongoClient(info.connectionUri, {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  db = client.db('localtest');
});

tap.test('localsmartdb: data persists after restart', async () => {
  const coll = db.collection('notes');

  const count = await coll.countDocuments();
  expect(count).toEqual(2); // 3 inserted - 1 deleted

  const note2 = await coll.findOne({ title: 'Note 2' });
  expect(note2!.body).toEqual('Updated second note');
  expect(note2!.edited).toBeTrue();

  const note3 = await coll.findOne({ title: 'Note 3' });
  expect(note3!.priority).toEqual(3);
});

// ============================================================================
// LocalSmartDb: Custom socket path
// ============================================================================

tap.test('localsmartdb: works with custom socket path', async () => {
  await client.close();
  await localDb.stop();

  const customSocket = path.join(os.tmpdir(), `smartdb-custom-${Date.now()}.sock`);
  const tmpDir2 = makeTmpDir();
  const localDb2 = new smartdb.LocalSmartDb({
    folderPath: tmpDir2,
    socketPath: customSocket,
  });

  const info = await localDb2.start();
  expect(info.socketPath).toEqual(customSocket);

  const client2 = new MongoClient(info.connectionUri, {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client2.connect();
  const testDb = client2.db('customsock');
  await testDb.collection('test').insertOne({ x: 1 });
  const doc = await testDb.collection('test').findOne({ x: 1 });
  expect(doc).toBeTruthy();

  await client2.close();
  await localDb2.stop();
  cleanTmpDir(tmpDir2);

  // Reconnect original for remaining tests
  localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
  const origInfo = await localDb.start();
  client = new MongoClient(origInfo.connectionUri, {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  db = client.db('localtest');
});

// ============================================================================
// LocalSmartDb: getConnectionUri and getServer helpers
// ============================================================================

tap.test('localsmartdb: getConnectionUri returns valid uri', async () => {
  const uri = localDb.getConnectionUri();
  expect(uri.startsWith('mongodb://')).toBeTrue();
});

tap.test('localsmartdb: getServer returns the SmartdbServer', async () => {
  const srv = localDb.getServer();
  expect(srv).toBeTruthy();
  expect(srv.running).toBeTrue();
});

// ============================================================================
// LocalSmartDb: Data isolation between databases
// ============================================================================

tap.test('localsmartdb: databases are isolated', async () => {
  const dbA = client.db('isoA');
  const dbB = client.db('isoB');

  await dbA.collection('shared').insertOne({ source: 'A', val: 1 });
  await dbB.collection('shared').insertOne({ source: 'B', val: 2 });

  const docsA = await dbA.collection('shared').find({}).toArray();
  const docsB = await dbB.collection('shared').find({}).toArray();

  expect(docsA.length).toEqual(1);
  expect(docsA[0].source).toEqual('A');
  expect(docsB.length).toEqual(1);
  expect(docsB[0].source).toEqual('B');

  await dbA.dropDatabase();
  await dbB.dropDatabase();
});

// ============================================================================
// Cleanup
// ============================================================================

tap.test('localsmartdb: cleanup', async () => {
  await client.close();
  await localDb.stop();
  expect(localDb.running).toBeFalse();
  cleanTmpDir(tmpDir);
});

export default tap.start();
@@ -0,0 +1,269 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

let tmpDir: string;

function makeTmpDir(): string {
  return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-migration-test-'));
}

function cleanTmpDir(dir: string): void {
  if (fs.existsSync(dir)) {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}

/**
 * Create a v0 (legacy JSON) storage layout:
 *   {base}/{db}/{coll}.json
 *   {base}/{db}/{coll}.indexes.json
 */
function createV0Layout(basePath: string, dbName: string, collName: string, docs: any[]): void {
  const dbDir = path.join(basePath, dbName);
  fs.mkdirSync(dbDir, { recursive: true });

  // Convert docs to the extended JSON format that the old Rust engine wrote:
  // ObjectId is stored as { "$oid": "hex" }
  const jsonDocs = docs.map(doc => {
    const clone = { ...doc };
    if (!clone._id) {
      // Generate a fake ObjectId-like hex string
      const hex = [...Array(24)].map(() => Math.floor(Math.random() * 16).toString(16)).join('');
      clone._id = { '$oid': hex };
    }
    return clone;
  });

  const collPath = path.join(dbDir, `${collName}.json`);
  fs.writeFileSync(collPath, JSON.stringify(jsonDocs, null, 2));

  const indexPath = path.join(dbDir, `${collName}.indexes.json`);
  fs.writeFileSync(indexPath, JSON.stringify([
    { name: '_id_', key: { _id: 1 } },
  ], null, 2));
}
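
// For reference, a document produced by createV0Layout looks like:
//   { "name": "Alice", "age": 30, "_id": { "$oid": "<24 hex chars>" } }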

// ============================================================================
// Migration: v0 → v1 basic
// ============================================================================

tap.test('migration: detects v0 format and migrates on startup', async () => {
  tmpDir = makeTmpDir();

  // Create v0 layout with test data
  createV0Layout(tmpDir, 'mydb', 'users', [
    { name: 'Alice', age: 30, email: 'alice@test.com' },
    { name: 'Bob', age: 25, email: 'bob@test.com' },
    { name: 'Charlie', age: 35, email: 'charlie@test.com' },
  ]);

  createV0Layout(tmpDir, 'mydb', 'products', [
    { sku: 'W001', name: 'Widget', price: 9.99 },
    { sku: 'G001', name: 'Gadget', price: 19.99 },
  ]);

  // Verify v0 files exist
  expect(fs.existsSync(path.join(tmpDir, 'mydb', 'users.json'))).toBeTrue();
  expect(fs.existsSync(path.join(tmpDir, 'mydb', 'products.json'))).toBeTrue();

  // Start server — migration should run automatically
  const server = new smartdb.SmartdbServer({
    socketPath: path.join(os.tmpdir(), `smartdb-mig-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`),
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();

  // v1 directories should now exist
  expect(fs.existsSync(path.join(tmpDir, 'mydb', 'users', 'data.rdb'))).toBeTrue();
  expect(fs.existsSync(path.join(tmpDir, 'mydb', 'products', 'data.rdb'))).toBeTrue();

  // v0 files should still exist (not deleted)
  expect(fs.existsSync(path.join(tmpDir, 'mydb', 'users.json'))).toBeTrue();
  expect(fs.existsSync(path.join(tmpDir, 'mydb', 'products.json'))).toBeTrue();
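
  // The two assertions above pin down that migration copies rather than moves:
  // the legacy .json files are left in place.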

  // Connect and verify data is accessible
  const client = new MongoClient(server.getConnectionUri(), {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  const db = client.db('mydb');

  // Users collection
  const users = await db.collection('users').find({}).toArray();
  expect(users.length).toEqual(3);
  const alice = users.find(u => u.name === 'Alice');
  expect(alice).toBeTruthy();
  expect(alice!.age).toEqual(30);
  expect(alice!.email).toEqual('alice@test.com');

  // Products collection
  const products = await db.collection('products').find({}).toArray();
  expect(products.length).toEqual(2);
  const widget = products.find(p => p.sku === 'W001');
  expect(widget).toBeTruthy();
  expect(widget!.price).toEqual(9.99);

  await client.close();
  await server.stop();
});

// ============================================================================
// Migration: migrated data survives another restart
// ============================================================================

tap.test('migration: migrated data persists across restart', async () => {
  const server = new smartdb.SmartdbServer({
    socketPath: path.join(os.tmpdir(), `smartdb-mig-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`),
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();

  const client = new MongoClient(server.getConnectionUri(), {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  const db = client.db('mydb');

  const users = await db.collection('users').find({}).toArray();
  expect(users.length).toEqual(3);

  const products = await db.collection('products').find({}).toArray();
  expect(products.length).toEqual(2);

  await client.close();
  await server.stop();
});

// ============================================================================
// Migration: can write new data after migration
// ============================================================================

tap.test('migration: new writes work after migration', async () => {
  const server = new smartdb.SmartdbServer({
    socketPath: path.join(os.tmpdir(), `smartdb-mig-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`),
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();

  const client = new MongoClient(server.getConnectionUri(), {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  const db = client.db('mydb');

  // Insert new documents
  await db.collection('users').insertOne({ name: 'Diana', age: 28 });
  const count = await db.collection('users').countDocuments();
  expect(count).toEqual(4);

  // Update existing migrated document
  await db.collection('users').updateOne(
    { name: 'Alice' },
    { $set: { age: 31 } }
  );
  const alice = await db.collection('users').findOne({ name: 'Alice' });
  expect(alice!.age).toEqual(31);

  // Delete a migrated document
  await db.collection('products').deleteOne({ sku: 'G001' });
  const prodCount = await db.collection('products').countDocuments();
  expect(prodCount).toEqual(1);

  await client.close();
  await server.stop();
  cleanTmpDir(tmpDir);
});

// ============================================================================
// Migration: skips already-migrated data
// ============================================================================

tap.test('migration: no-op for v1 format', async () => {
  tmpDir = makeTmpDir();

  // Start fresh to create v1 layout
  const server = new smartdb.SmartdbServer({
    socketPath: path.join(os.tmpdir(), `smartdb-mig-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`),
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();

  const client = new MongoClient(server.getConnectionUri(), {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  const db = client.db('v1test');
  await db.collection('items').insertOne({ x: 1 });
  await client.close();
  await server.stop();

  // Restart — migration should detect v1 and skip
  const server2 = new smartdb.SmartdbServer({
    socketPath: path.join(os.tmpdir(), `smartdb-mig-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`),
    storage: 'file',
    storagePath: tmpDir,
  });
  await server2.start();

  const client2 = new MongoClient(server2.getConnectionUri(), {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client2.connect();
  const db2 = client2.db('v1test');
  const doc = await db2.collection('items').findOne({ x: 1 });
  expect(doc).toBeTruthy();

  await client2.close();
  await server2.stop();
  cleanTmpDir(tmpDir);
});

// ============================================================================
// Migration: empty storage is handled gracefully
// ============================================================================

tap.test('migration: empty storage directory works', async () => {
  tmpDir = makeTmpDir();

  const server = new smartdb.SmartdbServer({
    socketPath: path.join(os.tmpdir(), `smartdb-mig-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`),
    storage: 'file',
    storagePath: tmpDir,
  });
  await server.start();

  const client = new MongoClient(server.getConnectionUri(), {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();

  // Should work fine with empty storage
  const db = client.db('emptytest');
  await db.collection('first').insertOne({ hello: 'world' });
  const doc = await db.collection('first').findOne({ hello: 'world' });
  expect(doc).toBeTruthy();

  await client.close();
  await server.stop();
  cleanTmpDir(tmpDir);
});

export default tap.start();
@@ -3,6 +3,6 @@
 */
export const commitinfo = {
  name: '@push.rocks/smartdb',
-  version: '2.2.0',
+  version: '2.5.3',
  description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.'
}

@@ -1,4 +1,6 @@
import * as crypto from 'crypto';
import * as fs from 'fs/promises';
import * as net from 'net';
import * as path from 'path';
import * as os from 'os';
import { SmartdbServer } from '../ts_smartdb/index.js';

@@ -66,6 +68,55 @@ export class LocalSmartDb {
    return path.join(os.tmpdir(), `smartdb-${randomId}.sock`);
  }

  /**
   * Check if a Unix socket is alive by attempting to connect.
   */
  private static isSocketAlive(socketPath: string): Promise<boolean> {
    return new Promise((resolve) => {
      const client = net.createConnection({ path: socketPath }, () => {
        client.destroy();
        resolve(true);
      });
      client.on('error', () => {
        resolve(false);
      });
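      // 500 ms of inactivity is treated as dead; while the connection is
      // still pending this effectively doubles as a connect timeout.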
      client.setTimeout(500, () => {
        client.destroy();
        resolve(false);
      });
    });
  }

  /**
   * Remove stale smartdb-*.sock files from /tmp.
   * A socket is considered stale if connecting to it fails.
   */
  private static async cleanStaleSockets(): Promise<void> {
    const tmpDir = os.tmpdir();
    let entries: string[];
    try {
      entries = await fs.readdir(tmpDir);
    } catch {
      return;
    }
    const socketFiles = entries.filter(
      (f) => f.startsWith('smartdb-') && f.endsWith('.sock')
    );
    for (const name of socketFiles) {
      const fullPath = path.join(tmpDir, name);
      try {
        const stat = await fs.stat(fullPath);
        if (!stat.isSocket()) continue;
        const alive = await LocalSmartDb.isSocketAlive(fullPath);
        if (!alive) {
          await fs.unlink(fullPath);
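          // (Small unavoidable race: the socket could come back alive between
          // the probe and this unlink; acceptable for tmp-dir hygiene.)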
        }
      } catch {
        // File may have been removed already; ignore
      }
    }
  }

  /**
   * Start the local SmartDB server and return connection info
   */
@@ -74,6 +125,9 @@ export class LocalSmartDb {
      throw new Error('LocalSmartDb is already running');
    }

    // Clean up stale sockets from previous crashed instances
    await LocalSmartDb.cleanStaleSockets();

    // Run storage migration before starting the Rust engine
    const migrator = new StorageMigrator(this.options.folderPath);
    await migrator.run();

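Taken together, the hunks above add offline validation, durable file storage with compaction, v0 to v1 migration, and stale-socket cleanup. For orientation, here is a minimal consumer sketch built only from the `LocalSmartDb` surface exercised by the tests above; the import path assumes the package name from the commitinfo hunk, and the sketch is illustrative rather than documented API:

```typescript
import { MongoClient } from 'mongodb';
import * as smartdb from '@push.rocks/smartdb';

async function demo(): Promise<void> {
  // Point LocalSmartDb at a folder; it picks its own Unix socket and
  // cleans up stale ones from crashed runs before binding.
  const localDb = new smartdb.LocalSmartDb({ folderPath: '/tmp/smartdb-demo' });
  const info = await localDb.start();

  // Any MongoDB driver can talk to it via the returned connection URI.
  const client = new MongoClient(info.connectionUri, { directConnection: true });
  await client.connect();

  const notes = client.db('demo').collection('notes');
  await notes.insertOne({ title: 'hello', priority: 1 });
  console.log(await notes.findOne({ title: 'hello' }));

  await client.close();
  await localDb.stop();
}

demo().catch(console.error);
```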
File diff suppressed because one or more lines are too long