12 Commits

17 changed files with 1234 additions and 158 deletions
+38
View File
@@ -1,5 +1,43 @@
# Changelog # Changelog
## 2026-04-14 - 2.7.0 - feat(update)
add aggregation pipeline updates and enforce immutable _id handling
- support aggregation pipeline syntax in update and findOneAndUpdate operations, including upserts
- add $unset stage support for aggregation-based document transformations
- return an ImmutableField error when updates attempt to change _id and preserve _id when omitted from replacements
## 2026-04-05 - 2.6.2 - fix(readme)
align architecture diagram formatting in the documentation
- Adjusts spacing and box alignment in the README architecture diagram for clearer presentation.
## 2026-04-05 - 2.6.1 - fix(readme)
correct ASCII diagram spacing in architecture overview
- Adjusts alignment in the README architecture diagram for clearer visual formatting.
## 2026-04-05 - 2.6.0 - feat(readme)
document index enforcement, storage reliability, and data integrity validation features
- Add documentation for engine-level unique index enforcement and duplicate key behavior
- Describe storage engine reliability features including WAL, CRC32 checks, compaction, hint file staleness detection, and stale socket cleanup
- Add usage documentation for the offline data integrity validation CLI
## 2026-04-05 - 2.5.9 - fix(rustdb-storage)
run collection compaction during file storage initialization after crashes
- Triggers compaction for all loaded collections before starting the periodic background compaction task.
- Helps clean up dead weight left from before a crash during startup.
## 2026-04-05 - 2.5.8 - fix(rustdb-storage)
detect stale hint files using data file size metadata and add restart persistence regression tests
- Store the current data.rdb size in hint file headers and validate it on load to rebuild KeyDir when hints are stale or written in the old format.
- Persist updated hint metadata after compaction and shutdown to avoid missing appended tombstones after restart.
- Add validation reporting for stale hint files based on recorded versus actual data file size.
- Add regression tests covering delete persistence across restarts, missing hint recovery, stale socket cleanup, and unique index enforcement persistence.
## 2026-04-05 - 2.5.7 - fix(repo) ## 2026-04-05 - 2.5.7 - fix(repo)
no changes to commit no changes to commit
+1 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "@push.rocks/smartdb", "name": "@push.rocks/smartdb",
"version": "2.5.7", "version": "2.7.0",
"private": false, "private": false,
"description": "A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.", "description": "A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.",
"exports": { "exports": {
+61 -26
View File
@@ -44,38 +44,38 @@ SmartDB uses a **sidecar binary** pattern — TypeScript handles lifecycle, Rust
``` ```
┌──────────────────────────────────────────────────────────────┐ ┌──────────────────────────────────────────────────────────────┐
│ Your Application │ Your Application │
│ (TypeScript / Node.js) │ (TypeScript / Node.js) │
│ ┌───────────────── ┌───────────────────────────┐ │ ┌──────────────────┐ ┌───────────────────────────┐ │
│ │ SmartdbServer │────▶│ RustDbBridge (IPC) │ │ │ SmartdbServer │────▶│ RustDbBridge (IPC) │ │
│ │ or LocalSmartDb │ │ @push.rocks/smartrust │ │ │ or LocalSmartDb │ │ @push.rocks/smartrust │ │
│ └───────────────── └───────────┬───────────────┘ │ └──────────────────┘ └───────────┬───────────────┘ │
└─────────────────────────────────────────────────────────────┘ └─────────────────────────────────────────────────────────────┘
│ spawn + JSON IPC │ spawn + JSON IPC
┌──────────────────────────────────────────────────────────────┐ ┌──────────────────────────────────────────────────────────────┐
│ rustdb binary 🦀 │ rustdb binary
│ │ │ │
│ ┌──────────────┐ ┌──────────────┐ ┌───────────────┐ │ │ ┌──────────────┐ ┌──────────────┐ ┌───────────────┐
│ │ Wire Protocol│→ │Command Router│→ │ Handlers │ │ │ │ Wire Protocol│→ │Command Router│→ │ Handlers │
│ │ (OP_MSG) │ │ (40+ cmds) │ │ Find,Insert.. │ │ │ │ (OP_MSG) │ │ (40+ cmds) │ │ Find,Insert.. │
│ └──────────────┘ └──────────────┘ └───────┬───────┘ │ │ └──────────────┘ └──────────────┘ └───────┬───────┘
│ │ │ │
│ ┌─────────┐ ┌────────┐ ┌───────────┐ ┌──────┴──────┐ │ │ ┌─────────┐ ┌────────┐ ┌───────────┐ ┌──────┴──────┐
│ │ Query │ │ Update │ │Aggregation│ │ Index │ │ │ │ Query │ │ Update │ │Aggregation│ │ Index │
│ │ Matcher │ │ Engine │ │ Engine │ │ Engine │ │ │ │ Matcher │ │ Engine │ │ Engine │ │ Engine │
│ └─────────┘ └────────┘ └───────────┘ └─────────────┘ │ │ └─────────┘ └────────┘ └───────────┘ └─────────────┘
│ │ │ │
│ ┌──────────────────┐ ┌──────────────────┐ ┌──────────┐ │ │ ┌──────────────────┐ ┌──────────────────┐ ┌──────────┐
│ │ MemoryStorage │ │ FileStorage │ │ OpLog │ │ │ │ MemoryStorage │ │ FileStorage │ │ OpLog │
│ └──────────────────┘ └──────────────────┘ └──────────┘ │ │ └──────────────────┘ └──────────────────┘ └──────────┘
└──────────────────────────────────────────────────────────────┘ └──────────────────────────────────────────────────────────────┘
│ TCP / Unix Socket (wire protocol) │ TCP / Unix Socket (wire protocol)
┌─────────────┴────────────────────────────────────────────────┐ ┌─────────────┴────────────────────────────────────────────────┐
│ MongoClient (mongodb npm driver) │ MongoClient (mongodb npm driver) │
│ Connects directly to Rust binary │ Connects directly to Rust binary │
└──────────────────────────────────────────────────────────────┘ └──────────────────────────────────────────────────────────────┘
``` ```
@@ -159,7 +159,7 @@ The debug dashboard gives you:
## 📝 Operation Log & Point-in-Time Revert ## 📝 Operation Log & Point-in-Time Revert
Every write operation (insert, update, delete) is automatically recorded in an in-memory **operation log (OpLog)** with full before/after document snapshots. This enables: Every write operation (insert, update, delete) is automatically recorded in an in-memory **operation log (OpLog)** with full before/after document snapshots. The OpLog lives in RAM and resets on restart — it covers the current session only. This enables:
- **Change tracking** — see exactly what changed, when, and in which collection - **Change tracking** — see exactly what changed, when, and in which collection
- **Field-level diffs** — compare previous and new document states - **Field-level diffs** — compare previous and new document states
@@ -429,6 +429,8 @@ await collection.dropIndex('email_1');
await collection.dropIndexes(); // drop all except _id await collection.dropIndexes(); // drop all except _id
``` ```
> 🛡️ **Unique indexes are enforced at the engine level.** Duplicate values are rejected with a `DuplicateKey` error (code 11000) *before* the document is written to disk — on `insertOne`, `updateOne`, `findAndModify`, and upserts. Index definitions are persisted to `indexes.json` and automatically restored on restart.
### Database & Admin ### Database & Admin
```typescript ```typescript
@@ -497,6 +499,39 @@ The Rust engine is organized as a Cargo workspace with 8 focused crates:
Cross-compiled for `linux_amd64` and `linux_arm64` via [@git.zone/tsrust](https://www.npmjs.com/package/@git.zone/tsrust). Cross-compiled for `linux_amd64` and `linux_arm64` via [@git.zone/tsrust](https://www.npmjs.com/package/@git.zone/tsrust).
### Storage Engine Reliability 🔒
The Bitcask-style file storage engine includes several reliability features:
- **Write-ahead log (WAL)** — every write is logged before being applied, with crash recovery on restart
- **CRC32 checksums** — every record is integrity-checked on read
- **Automatic compaction** — dead records are reclaimed when they exceed 50% of file size, runs on startup and after every write
- **Hint file staleness detection** — the hint file records the data file size at write time; if data.rdb changed since (e.g. crash after a delete), the engine falls back to a full scan to ensure tombstones are not lost
- **Stale socket cleanup** — orphaned `/tmp/smartdb-*.sock` files from crashed instances are automatically cleaned up on startup
### Data Integrity CLI 🔍
The Rust binary includes an offline integrity checker:
```bash
# Check all collections in a data directory
./dist_rust/rustdb_linux_amd64 --validate-data /path/to/data
# Output:
# === SmartDB Data Integrity Report ===
#
# Database: mydb
# Collection: users
# Header: OK
# Records: 1,234 (1,200 live, 34 tombstones)
# Data size: 2.1 MB
# Duplicates: 0
# CRC errors: 0
# Hint file: OK
```
Checks file headers, record CRC32 checksums, duplicate `_id` entries, and hint file consistency. Exit code 1 if any errors are found.
--- ---
## Testing Example ## Testing Example
@@ -541,7 +576,7 @@ export default tap.start();
## License and Legal Information ## License and Legal Information
This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [LICENSE](./LICENSE) file. This repository contains open-source code licensed under the MIT License. A copy of the license can be found in the [license](./license) file.
**Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file. **Please note:** The MIT License does not grant permission to use the trade names, trademarks, service marks, or product names of the project, except as required for reasonable and customary use in describing the origin of the work and reproducing the content of the NOTICE file.
+4
View File
@@ -27,6 +27,9 @@ pub enum CommandError {
#[error("duplicate key: {0}")] #[error("duplicate key: {0}")]
DuplicateKey(String), DuplicateKey(String),
#[error("immutable field: {0}")]
ImmutableField(String),
#[error("internal error: {0}")] #[error("internal error: {0}")]
InternalError(String), InternalError(String),
} }
@@ -43,6 +46,7 @@ impl CommandError {
CommandError::NamespaceNotFound(_) => (26, "NamespaceNotFound"), CommandError::NamespaceNotFound(_) => (26, "NamespaceNotFound"),
CommandError::NamespaceExists(_) => (48, "NamespaceExists"), CommandError::NamespaceExists(_) => (48, "NamespaceExists"),
CommandError::DuplicateKey(_) => (11000, "DuplicateKey"), CommandError::DuplicateKey(_) => (11000, "DuplicateKey"),
CommandError::ImmutableField(_) => (66, "ImmutableField"),
CommandError::InternalError(_) => (1, "InternalError"), CommandError::InternalError(_) => (1, "InternalError"),
}; };
@@ -21,6 +21,11 @@ pub async fn handle(
} }
} }
enum TUpdateSpec {
Document(Document),
Pipeline(Vec<Document>),
}
/// Handle the `update` command. /// Handle the `update` command.
async fn handle_update( async fn handle_update(
cmd: &Document, cmd: &Document,
@@ -78,21 +83,22 @@ async fn handle_update(
}; };
let update = match update_spec.get("u") { let update = match update_spec.get("u") {
Some(Bson::Document(d)) => d.clone(), Some(update_value) => match parse_update_spec(update_value) {
Some(Bson::Array(_pipeline)) => { Ok(parsed) => parsed,
// Aggregation pipeline updates are not yet supported; treat as error. Err(err) => {
write_errors.push(doc! { write_errors.push(doc! {
"index": idx as i32, "index": idx as i32,
"code": 14_i32, "code": 14_i32,
"codeName": "TypeMismatch", "codeName": "TypeMismatch",
"errmsg": "aggregation pipeline updates not yet supported", "errmsg": err,
}); });
if ordered { if ordered {
break; break;
}
continue;
} }
continue; },
} None => {
_ => {
write_errors.push(doc! { write_errors.push(doc! {
"index": idx as i32, "index": idx as i32,
"code": 14_i32, "code": 14_i32,
@@ -137,21 +143,12 @@ async fn handle_update(
let new_doc = build_upsert_doc(&filter); let new_doc = build_upsert_doc(&filter);
// Apply update operators or replacement. // Apply update operators or replacement.
match UpdateEngine::apply_update(&new_doc, &update, array_filters.as_deref()) { match apply_update_spec(&new_doc, &update, array_filters.as_deref()) {
Ok(mut updated) => { Ok(mut updated) => {
// Apply $setOnInsert if present. apply_set_on_insert_if_present(&update, &mut updated);
if let Some(Bson::Document(soi)) = update.get("$setOnInsert") {
UpdateEngine::apply_set_on_insert(&mut updated, soi);
}
// Ensure _id exists. // Ensure _id exists.
let new_id = if !updated.contains_key("_id") { let new_id = ensure_document_id(&mut updated);
let oid = ObjectId::new();
updated.insert("_id", oid);
Bson::ObjectId(oid)
} else {
updated.get("_id").unwrap().clone()
};
// Pre-check unique index constraints before upsert insert. // Pre-check unique index constraints before upsert insert.
if let Some(engine) = ctx.indexes.get(&ns_key) { if let Some(engine) = ctx.indexes.get(&ns_key) {
@@ -229,12 +226,21 @@ async fn handle_update(
}; };
for matched_doc in &docs_to_update { for matched_doc in &docs_to_update {
match UpdateEngine::apply_update( match apply_update_spec(matched_doc, &update, array_filters.as_deref()) {
matched_doc, Ok(mut updated_doc) => {
&update, if let Err(e) = ensure_immutable_id(matched_doc, &mut updated_doc) {
array_filters.as_deref(), write_errors.push(doc! {
) { "index": idx as i32,
Ok(updated_doc) => { "code": 66_i32,
"codeName": "ImmutableField",
"errmsg": e.to_string(),
});
if ordered {
break;
}
continue;
}
// Pre-check unique index constraints before storage write. // Pre-check unique index constraints before storage write.
if let Some(engine) = ctx.indexes.get(&ns_key) { if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints_for_update(matched_doc, &updated_doc) { if let Err(e) = engine.check_unique_constraints_for_update(matched_doc, &updated_doc) {
@@ -361,8 +367,11 @@ async fn handle_find_and_modify(
}; };
let update_doc = match cmd.get("update") { let update_doc = match cmd.get("update") {
Some(Bson::Document(d)) => Some(d.clone()), Some(update_value) => Some(
_ => None, parse_update_spec(update_value)
.map_err(CommandError::InvalidArgument)?
),
None => None,
}; };
let remove = match cmd.get("remove") { let remove = match cmd.get("remove") {
@@ -477,12 +486,14 @@ async fn handle_find_and_modify(
if let Some(original_doc) = target { if let Some(original_doc) = target {
// Update the matched document. // Update the matched document.
let updated_doc = UpdateEngine::apply_update( let mut updated_doc = apply_update_spec(
&original_doc, &original_doc,
&update, &update,
array_filters.as_deref(), array_filters.as_deref(),
) )
.map_err(|e| CommandError::InternalError(e.to_string()))?; .map_err(CommandError::InternalError)?;
ensure_immutable_id(&original_doc, &mut updated_doc)?;
// Pre-check unique index constraints before storage write. // Pre-check unique index constraints before storage write.
if let Some(engine) = ctx.indexes.get(&ns_key) { if let Some(engine) = ctx.indexes.get(&ns_key) {
@@ -533,26 +544,17 @@ async fn handle_find_and_modify(
// Upsert: create a new document. // Upsert: create a new document.
let new_doc = build_upsert_doc(&query); let new_doc = build_upsert_doc(&query);
let mut updated_doc = UpdateEngine::apply_update( let mut updated_doc = apply_update_spec(
&new_doc, &new_doc,
&update, &update,
array_filters.as_deref(), array_filters.as_deref(),
) )
.map_err(|e| CommandError::InternalError(e.to_string()))?; .map_err(CommandError::InternalError)?;
// Apply $setOnInsert if present. apply_set_on_insert_if_present(&update, &mut updated_doc);
if let Some(Bson::Document(soi)) = update.get("$setOnInsert") {
UpdateEngine::apply_set_on_insert(&mut updated_doc, soi);
}
// Ensure _id. // Ensure _id.
let upserted_id = if !updated_doc.contains_key("_id") { let upserted_id = ensure_document_id(&mut updated_doc);
let oid = ObjectId::new();
updated_doc.insert("_id", oid);
Bson::ObjectId(oid)
} else {
updated_doc.get("_id").unwrap().clone()
};
// Pre-check unique index constraints before upsert insert. // Pre-check unique index constraints before upsert insert.
if let Some(engine) = ctx.indexes.get(&ns_key) { if let Some(engine) = ctx.indexes.get(&ns_key) {
@@ -667,6 +669,88 @@ fn build_upsert_doc(filter: &Document) -> Document {
doc doc
} }
fn parse_update_spec(update_value: &Bson) -> Result<TUpdateSpec, String> {
match update_value {
Bson::Document(d) => Ok(TUpdateSpec::Document(d.clone())),
Bson::Array(stages) => {
if stages.is_empty() {
return Err("aggregation pipeline update cannot be empty".into());
}
let mut pipeline = Vec::with_capacity(stages.len());
for stage in stages {
match stage {
Bson::Document(d) => pipeline.push(d.clone()),
_ => {
return Err(
"aggregation pipeline update stages must be documents".into(),
);
}
}
}
Ok(TUpdateSpec::Pipeline(pipeline))
}
_ => Err("missing or invalid 'u' field in update spec".into()),
}
}
fn apply_update_spec(
doc: &Document,
update: &TUpdateSpec,
array_filters: Option<&[Document]>,
) -> Result<Document, String> {
match update {
TUpdateSpec::Document(update_doc) => UpdateEngine::apply_update(doc, update_doc, array_filters)
.map_err(|e| e.to_string()),
TUpdateSpec::Pipeline(pipeline) => {
if array_filters.is_some_and(|filters| !filters.is_empty()) {
return Err(
"arrayFilters are not supported with aggregation pipeline updates"
.into(),
);
}
UpdateEngine::apply_pipeline_update(doc, pipeline).map_err(|e| e.to_string())
}
}
}
fn apply_set_on_insert_if_present(update: &TUpdateSpec, doc: &mut Document) {
if let TUpdateSpec::Document(update_doc) = update {
if let Some(Bson::Document(soi)) = update_doc.get("$setOnInsert") {
UpdateEngine::apply_set_on_insert(doc, soi);
}
}
}
fn ensure_document_id(doc: &mut Document) -> Bson {
if let Some(id) = doc.get("_id") {
id.clone()
} else {
let oid = ObjectId::new();
doc.insert("_id", oid);
Bson::ObjectId(oid)
}
}
fn ensure_immutable_id(original_doc: &Document, updated_doc: &mut Document) -> CommandResult<()> {
if let Some(original_id) = original_doc.get("_id") {
match updated_doc.get("_id") {
Some(updated_id) if updated_id == original_id => Ok(()),
Some(_) => Err(CommandError::ImmutableField(
"cannot modify immutable field '_id'".into(),
)),
None => {
updated_doc.insert("_id", original_id.clone());
Ok(())
}
}
} else {
Ok(())
}
}
/// Extract _id as a string for storage operations. /// Extract _id as a string for storage operations.
fn extract_id_string(doc: &Document) -> String { fn extract_id_string(doc: &Document) -> String {
match doc.get("_id") { match doc.get("_id") {
+134 -29
View File
@@ -2,10 +2,10 @@ use bson::{Bson, Document};
use std::collections::HashMap; use std::collections::HashMap;
use crate::error::QueryError; use crate::error::QueryError;
use crate::field_path::{get_nested_value, remove_nested_value};
use crate::matcher::QueryMatcher; use crate::matcher::QueryMatcher;
use crate::sort::sort_documents;
use crate::projection::apply_projection; use crate::projection::apply_projection;
use crate::field_path::get_nested_value; use crate::sort::sort_documents;
/// Aggregation pipeline engine. /// Aggregation pipeline engine.
pub struct AggregationEngine; pub struct AggregationEngine;
@@ -42,6 +42,7 @@ impl AggregationEngine {
"$count" => Self::stage_count(current, stage_spec)?, "$count" => Self::stage_count(current, stage_spec)?,
"$addFields" | "$set" => Self::stage_add_fields(current, stage_spec)?, "$addFields" | "$set" => Self::stage_add_fields(current, stage_spec)?,
"$replaceRoot" | "$replaceWith" => Self::stage_replace_root(current, stage_spec)?, "$replaceRoot" | "$replaceWith" => Self::stage_replace_root(current, stage_spec)?,
"$unset" => Self::stage_unset(current, stage_spec)?,
"$lookup" => Self::stage_lookup(current, stage_spec, resolver, db)?, "$lookup" => Self::stage_lookup(current, stage_spec, resolver, db)?,
"$facet" => Self::stage_facet(current, stage_spec, resolver, db)?, "$facet" => Self::stage_facet(current, stage_spec, resolver, db)?,
"$unionWith" => Self::stage_union_with(current, stage_spec, resolver, db)?, "$unionWith" => Self::stage_union_with(current, stage_spec, resolver, db)?,
@@ -60,7 +61,11 @@ impl AggregationEngine {
fn stage_match(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_match(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let filter = match spec { let filter = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$match requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$match requires a document".into(),
))
}
}; };
Ok(QueryMatcher::filter(&docs, filter)) Ok(QueryMatcher::filter(&docs, filter))
} }
@@ -68,15 +73,26 @@ impl AggregationEngine {
fn stage_project(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_project(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let projection = match spec { let projection = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$project requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$project requires a document".into(),
))
}
}; };
Ok(docs.into_iter().map(|doc| apply_projection(&doc, projection)).collect()) Ok(docs
.into_iter()
.map(|doc| apply_projection(&doc, projection))
.collect())
} }
fn stage_sort(mut docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_sort(mut docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let sort_spec = match spec { let sort_spec = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$sort requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$sort requires a document".into(),
))
}
}; };
sort_documents(&mut docs, sort_spec); sort_documents(&mut docs, sort_spec);
Ok(docs) Ok(docs)
@@ -97,7 +113,11 @@ impl AggregationEngine {
fn stage_group(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_group(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let group_spec = match spec { let group_spec = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$group requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$group requires a document".into(),
))
}
}; };
let id_expr = group_spec.get("_id").cloned().unwrap_or(Bson::Null); let id_expr = group_spec.get("_id").cloned().unwrap_or(Bson::Null);
@@ -158,13 +178,18 @@ impl AggregationEngine {
let (path, preserve_null) = match spec { let (path, preserve_null) = match spec {
Bson::String(s) => (s.trim_start_matches('$').to_string(), false), Bson::String(s) => (s.trim_start_matches('$').to_string(), false),
Bson::Document(d) => { Bson::Document(d) => {
let path = d.get_str("path") let path = d
.get_str("path")
.map(|s| s.trim_start_matches('$').to_string()) .map(|s| s.trim_start_matches('$').to_string())
.map_err(|_| QueryError::AggregationError("$unwind requires 'path'".into()))?; .map_err(|_| QueryError::AggregationError("$unwind requires 'path'".into()))?;
let preserve = d.get_bool("preserveNullAndEmptyArrays").unwrap_or(false); let preserve = d.get_bool("preserveNullAndEmptyArrays").unwrap_or(false);
(path, preserve) (path, preserve)
} }
_ => return Err(QueryError::AggregationError("$unwind requires a string or document".into())), _ => {
return Err(QueryError::AggregationError(
"$unwind requires a string or document".into(),
))
}
}; };
let mut result = Vec::new(); let mut result = Vec::new();
@@ -206,7 +231,11 @@ impl AggregationEngine {
fn stage_count(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_count(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let field = match spec { let field = match spec {
Bson::String(s) => s.clone(), Bson::String(s) => s.clone(),
_ => return Err(QueryError::AggregationError("$count requires a string".into())), _ => {
return Err(QueryError::AggregationError(
"$count requires a string".into(),
))
}
}; };
Ok(vec![bson::doc! { field: docs.len() as i64 }]) Ok(vec![bson::doc! { field: docs.len() as i64 }])
} }
@@ -214,7 +243,11 @@ impl AggregationEngine {
fn stage_add_fields(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_add_fields(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let fields = match spec { let fields = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$addFields requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$addFields requires a document".into(),
))
}
}; };
Ok(docs Ok(docs
@@ -231,9 +264,16 @@ impl AggregationEngine {
fn stage_replace_root(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_replace_root(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let new_root_expr = match spec { let new_root_expr = match spec {
Bson::Document(d) => d.get("newRoot").cloned().unwrap_or(Bson::Document(d.clone())), Bson::Document(d) => d
.get("newRoot")
.cloned()
.unwrap_or(Bson::Document(d.clone())),
Bson::String(s) => Bson::String(s.clone()), Bson::String(s) => Bson::String(s.clone()),
_ => return Err(QueryError::AggregationError("$replaceRoot requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$replaceRoot requires a document".into(),
))
}
}; };
let mut result = Vec::new(); let mut result = Vec::new();
@@ -246,6 +286,40 @@ impl AggregationEngine {
Ok(result) Ok(result)
} }
fn stage_unset(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let fields: Vec<String> = match spec {
Bson::String(s) => vec![s.clone()],
Bson::Array(arr) => arr
.iter()
.map(|value| match value {
Bson::String(field) => Ok(field.clone()),
_ => Err(QueryError::AggregationError(
"$unset array entries must be strings".into(),
)),
})
.collect::<Result<Vec<_>, _>>()?,
_ => {
return Err(QueryError::AggregationError(
"$unset requires a string or array of strings".into(),
));
}
};
Ok(docs
.into_iter()
.map(|mut doc| {
for field in &fields {
if field.contains('.') {
remove_nested_value(&mut doc, field);
} else {
doc.remove(field);
}
}
doc
})
.collect())
}
fn stage_lookup( fn stage_lookup(
docs: Vec<Document>, docs: Vec<Document>,
spec: &Bson, spec: &Bson,
@@ -254,20 +328,29 @@ impl AggregationEngine {
) -> Result<Vec<Document>, QueryError> { ) -> Result<Vec<Document>, QueryError> {
let lookup = match spec { let lookup = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$lookup requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$lookup requires a document".into(),
))
}
}; };
let from = lookup.get_str("from") let from = lookup
.get_str("from")
.map_err(|_| QueryError::AggregationError("$lookup requires 'from'".into()))?; .map_err(|_| QueryError::AggregationError("$lookup requires 'from'".into()))?;
let local_field = lookup.get_str("localField") let local_field = lookup
.get_str("localField")
.map_err(|_| QueryError::AggregationError("$lookup requires 'localField'".into()))?; .map_err(|_| QueryError::AggregationError("$lookup requires 'localField'".into()))?;
let foreign_field = lookup.get_str("foreignField") let foreign_field = lookup
.get_str("foreignField")
.map_err(|_| QueryError::AggregationError("$lookup requires 'foreignField'".into()))?; .map_err(|_| QueryError::AggregationError("$lookup requires 'foreignField'".into()))?;
let as_field = lookup.get_str("as") let as_field = lookup
.get_str("as")
.map_err(|_| QueryError::AggregationError("$lookup requires 'as'".into()))?; .map_err(|_| QueryError::AggregationError("$lookup requires 'as'".into()))?;
let resolver = resolver let resolver = resolver.ok_or_else(|| {
.ok_or_else(|| QueryError::AggregationError("$lookup requires a collection resolver".into()))?; QueryError::AggregationError("$lookup requires a collection resolver".into())
})?;
let foreign_docs = resolver.resolve(db, from)?; let foreign_docs = resolver.resolve(db, from)?;
Ok(docs Ok(docs
@@ -299,7 +382,11 @@ impl AggregationEngine {
) -> Result<Vec<Document>, QueryError> { ) -> Result<Vec<Document>, QueryError> {
let facets = match spec { let facets = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$facet requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$facet requires a document".into(),
))
}
}; };
let mut result = Document::new(); let mut result = Document::new();
@@ -337,22 +424,32 @@ impl AggregationEngine {
let (coll, pipeline) = match spec { let (coll, pipeline) = match spec {
Bson::String(s) => (s.as_str(), None), Bson::String(s) => (s.as_str(), None),
Bson::Document(d) => { Bson::Document(d) => {
let coll = d.get_str("coll") let coll = d.get_str("coll").map_err(|_| {
.map_err(|_| QueryError::AggregationError("$unionWith requires 'coll'".into()))?; QueryError::AggregationError("$unionWith requires 'coll'".into())
})?;
let pipeline = d.get_array("pipeline").ok().map(|arr| { let pipeline = d.get_array("pipeline").ok().map(|arr| {
arr.iter() arr.iter()
.filter_map(|s| { .filter_map(|s| {
if let Bson::Document(d) = s { Some(d.clone()) } else { None } if let Bson::Document(d) = s {
Some(d.clone())
} else {
None
}
}) })
.collect::<Vec<Document>>() .collect::<Vec<Document>>()
}); });
(coll, pipeline) (coll, pipeline)
} }
_ => return Err(QueryError::AggregationError("$unionWith requires a string or document".into())), _ => {
return Err(QueryError::AggregationError(
"$unionWith requires a string or document".into(),
))
}
}; };
let resolver = resolver let resolver = resolver.ok_or_else(|| {
.ok_or_else(|| QueryError::AggregationError("$unionWith requires a collection resolver".into()))?; QueryError::AggregationError("$unionWith requires a collection resolver".into())
})?;
let mut other_docs = resolver.resolve(db, coll)?; let mut other_docs = resolver.resolve(db, coll)?;
if let Some(p) = pipeline { if let Some(p) = pipeline {
@@ -476,7 +573,11 @@ fn accumulate_min(docs: &[Document], expr: &Bson) -> Bson {
None => val, None => val,
Some(current) => { Some(current) => {
if let (Some(cv), Some(vv)) = (bson_to_f64(&current), bson_to_f64(&val)) { if let (Some(cv), Some(vv)) = (bson_to_f64(&current), bson_to_f64(&val)) {
if vv < cv { val } else { current } if vv < cv {
val
} else {
current
}
} else { } else {
current current
} }
@@ -499,7 +600,11 @@ fn accumulate_max(docs: &[Document], expr: &Bson) -> Bson {
None => val, None => val,
Some(current) => { Some(current) => {
if let (Some(cv), Some(vv)) = (bson_to_f64(&current), bson_to_f64(&val)) { if let (Some(cv), Some(vv)) = (bson_to_f64(&current), bson_to_f64(&val)) {
if vv > cv { val } else { current } if vv > cv {
val
} else {
current
}
} else { } else {
current current
} }
+107 -21
View File
@@ -1,7 +1,8 @@
use bson::{Bson, Document, doc}; use bson::{doc, Bson, Document};
use crate::aggregation::AggregationEngine;
use crate::error::QueryError; use crate::error::QueryError;
use crate::field_path::{get_nested_value, set_nested_value, remove_nested_value}; use crate::field_path::{get_nested_value, remove_nested_value, set_nested_value};
use crate::matcher::QueryMatcher; use crate::matcher::QueryMatcher;
/// Update engine — applies update operators to documents. /// Update engine — applies update operators to documents.
@@ -56,6 +57,46 @@ impl UpdateEngine {
Ok(result) Ok(result)
} }
/// Transform `doc` through an aggregation-pipeline update specification.
///
/// Only the whitelisted update stages (`$addFields`, `$set`, `$project`,
/// `$unset`, `$replaceRoot`, `$replaceWith`) are accepted; any other stage
/// is rejected with `QueryError::InvalidUpdate`. The pipeline must yield
/// exactly one output document for the single input document.
pub fn apply_pipeline_update(
    doc: &Document,
    pipeline: &[Document],
) -> Result<Document, QueryError> {
    // An empty pipeline is a caller error, not a no-op.
    if pipeline.is_empty() {
        return Err(QueryError::InvalidUpdate(
            "aggregation pipeline update cannot be empty".into(),
        ));
    }
    // Stage names permitted in update position (MongoDB-compatible subset).
    const ALLOWED_STAGES: [&str; 6] = [
        "$addFields",
        "$set",
        "$project",
        "$unset",
        "$replaceRoot",
        "$replaceWith",
    ];
    // Validate every stage name up front before executing anything.
    for stage in pipeline.iter() {
        let stage_name = match stage.iter().next() {
            Some((name, _)) => name,
            None => {
                return Err(QueryError::InvalidUpdate(
                    "aggregation pipeline update stages must not be empty".into(),
                ))
            }
        };
        if !ALLOWED_STAGES.contains(&stage_name.as_str()) {
            return Err(QueryError::InvalidUpdate(format!(
                "Unsupported aggregation pipeline update stage: {}",
                stage_name
            )));
        }
    }
    // Run the single document through the shared aggregation engine; any
    // aggregation failure surfaces as an invalid-update error.
    let mut results = AggregationEngine::aggregate(vec![doc.clone()], pipeline, None, "")
        .map_err(|e| QueryError::InvalidUpdate(e.to_string()))?;
    // A pipeline update must map one input document to exactly one output.
    if results.len() == 1 {
        Ok(results.remove(0))
    } else {
        Err(QueryError::InvalidUpdate(
            "aggregation pipeline update must produce exactly one document".into(),
        ))
    }
}
/// Apply $setOnInsert fields (used during upsert only). /// Apply $setOnInsert fields (used during upsert only).
pub fn apply_set_on_insert(doc: &mut Document, fields: &Document) { pub fn apply_set_on_insert(doc: &mut Document, fields: &Document) {
for (key, value) in fields { for (key, value) in fields {
@@ -252,16 +293,14 @@ impl UpdateEngine {
for (key, spec) in fields { for (key, spec) in fields {
let value = match spec { let value = match spec {
Bson::Boolean(true) => Bson::DateTime(now), Bson::Boolean(true) => Bson::DateTime(now),
Bson::Document(d) => { Bson::Document(d) => match d.get_str("$type").unwrap_or("date") {
match d.get_str("$type").unwrap_or("date") { "date" => Bson::DateTime(now),
"date" => Bson::DateTime(now), "timestamp" => Bson::Timestamp(bson::Timestamp {
"timestamp" => Bson::Timestamp(bson::Timestamp { time: (now.timestamp_millis() / 1000) as u32,
time: (now.timestamp_millis() / 1000) as u32, increment: 0,
increment: 0, }),
}), _ => Bson::DateTime(now),
_ => Bson::DateTime(now), },
}
}
_ => continue, _ => continue,
}; };
@@ -282,7 +321,9 @@ impl UpdateEngine {
Bson::Document(d) if d.contains_key("$each") => { Bson::Document(d) if d.contains_key("$each") => {
let each = match d.get("$each") { let each = match d.get("$each") {
Some(Bson::Array(a)) => a.clone(), Some(Bson::Array(a)) => a.clone(),
_ => return Err(QueryError::InvalidUpdate("$each must be an array".into())), _ => {
return Err(QueryError::InvalidUpdate("$each must be an array".into()))
}
}; };
let position = d.get("$position").and_then(|v| match v { let position = d.get("$position").and_then(|v| match v {
@@ -325,11 +366,21 @@ impl UpdateEngine {
continue; continue;
} }
match direction { match direction {
Bson::Int32(-1) | Bson::Int64(-1) => { arr.remove(0); } Bson::Int32(-1) | Bson::Int64(-1) => {
Bson::Int32(1) | Bson::Int64(1) => { arr.pop(); } arr.remove(0);
Bson::Double(f) if *f == 1.0 => { arr.pop(); } }
Bson::Double(f) if *f == -1.0 => { arr.remove(0); } Bson::Int32(1) | Bson::Int64(1) => {
_ => { arr.pop(); } arr.pop();
}
Bson::Double(f) if *f == 1.0 => {
arr.pop();
}
Bson::Double(f) if *f == -1.0 => {
arr.remove(0);
}
_ => {
arr.pop();
}
} }
} }
} }
@@ -455,7 +506,11 @@ impl UpdateEngine {
let ascending = *dir > 0; let ascending = *dir > 0;
arr.sort_by(|a, b| { arr.sort_by(|a, b| {
let ord = partial_cmp_bson(a, b); let ord = partial_cmp_bson(a, b);
if ascending { ord } else { ord.reverse() } if ascending {
ord
} else {
ord.reverse()
}
}); });
} }
Bson::Document(spec) => { Bson::Document(spec) => {
@@ -465,8 +520,16 @@ impl UpdateEngine {
Bson::Int32(n) => *n > 0, Bson::Int32(n) => *n > 0,
_ => true, _ => true,
}; };
let a_val = if let Bson::Document(d) = a { d.get(field) } else { None }; let a_val = if let Bson::Document(d) = a {
let b_val = if let Bson::Document(d) = b { d.get(field) } else { None }; d.get(field)
} else {
None
};
let b_val = if let Bson::Document(d) = b {
d.get(field)
} else {
None
};
let ord = match (a_val, b_val) { let ord = match (a_val, b_val) {
(Some(av), Some(bv)) => partial_cmp_bson(av, bv), (Some(av), Some(bv)) => partial_cmp_bson(av, bv),
(Some(_), None) => std::cmp::Ordering::Greater, (Some(_), None) => std::cmp::Ordering::Greater,
@@ -572,4 +635,27 @@ mod tests {
let tags = result.get_array("tags").unwrap(); let tags = result.get_array("tags").unwrap();
assert_eq!(tags.len(), 2); // no duplicate assert_eq!(tags.len(), 2); // no duplicate
} }
#[test]
// Happy path: a $set/$unset pipeline projects a field, adds a constant,
// and removes an existing field in a single update application.
fn test_pipeline_update() {
    let doc = doc! { "_id": 1, "name": "Alice", "age": 30, "legacy": true };
    let pipeline = vec![
        // $set copies "name" into "displayName" and adds a literal status.
        doc! { "$set": { "displayName": "$name", "status": "updated" } },
        // $unset (array form) drops the "legacy" field entirely.
        doc! { "$unset": ["legacy"] },
    ];
    let result = UpdateEngine::apply_pipeline_update(&doc, &pipeline).unwrap();
    assert_eq!(result.get_str("displayName").unwrap(), "Alice");
    assert_eq!(result.get_str("status").unwrap(), "updated");
    assert!(result.get("legacy").is_none());
}
#[test]
// A stage outside the update whitelist ($match here) must be rejected
// with InvalidUpdate before any aggregation runs.
fn test_pipeline_update_rejects_unsupported_stage() {
    let doc = doc! { "_id": 1, "name": "Alice" };
    let pipeline = vec![doc! { "$match": { "name": "Alice" } }];
    let result = UpdateEngine::apply_pipeline_update(&doc, &pipeline);
    assert!(matches!(result, Err(QueryError::InvalidUpdate(_))));
}
} }
+51 -21
View File
@@ -178,7 +178,8 @@ impl CollectionState {
tracing::warn!("compaction failed for {:?}: {e}", self.coll_dir); tracing::warn!("compaction failed for {:?}: {e}", self.coll_dir);
} else { } else {
// Persist hint file after successful compaction to prevent stale hints // Persist hint file after successful compaction to prevent stale hints
if let Err(e) = self.keydir.persist_to_hint_file(&self.hint_path()) { let current_size = self.data_file_size.load(Ordering::Relaxed);
if let Err(e) = self.keydir.persist_to_hint_file(&self.hint_path(), current_size) {
tracing::warn!("failed to persist hint after compaction for {:?}: {e}", self.coll_dir); tracing::warn!("failed to persist hint after compaction for {:?}: {e}", self.coll_dir);
} }
} }
@@ -257,26 +258,47 @@ impl FileStorageAdapter {
// Try loading from hint file first, fall back to data file scan // Try loading from hint file first, fall back to data file scan
let (keydir, dead_bytes, loaded_from_hint) = if hint_path.exists() && data_path.exists() { let (keydir, dead_bytes, loaded_from_hint) = if hint_path.exists() && data_path.exists() {
match KeyDir::load_from_hint_file(&hint_path) { match KeyDir::load_from_hint_file(&hint_path) {
Ok(Some(kd)) => { Ok(Some((kd, stored_size))) => {
// Validate hint against actual data file let actual_size = std::fs::metadata(&data_path)
let hint_valid = kd.validate_against_data_file(&data_path, 16) .map(|m| m.len())
.unwrap_or(false); .unwrap_or(0);
if hint_valid {
debug!("loaded KeyDir from hint file: {:?}", hint_path); // Check if data.rdb changed since the hint was written.
let file_size = std::fs::metadata(&data_path) // If stored_size is 0, this is an old-format hint without size tracking.
.map(|m| m.len()) let size_matches = stored_size > 0 && stored_size == actual_size;
.unwrap_or(FILE_HEADER_SIZE as u64);
let live_bytes: u64 = { if !size_matches {
let mut total = 0u64; // data.rdb size differs from hint snapshot — records were appended
kd.for_each(|_, e| total += e.record_len as u64); // (inserts, tombstones) after the hint was written. Full scan required
total // to pick up tombstones that would otherwise be invisible.
}; if stored_size == 0 {
let dead = file_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes); debug!("hint file {:?} has no size tracking, rebuilding from data file", hint_path);
(kd, dead, true) } else {
} else { tracing::warn!(
tracing::warn!("hint file {:?} is stale, rebuilding from data file", hint_path); "hint file {:?} is stale: data size changed ({} -> {}), rebuilding",
hint_path, stored_size, actual_size
);
}
let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?; let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
(kd, dead, false) (kd, dead, false)
} else {
// Size matches — validate entry integrity with spot-checks
let hint_valid = kd.validate_against_data_file(&data_path, 16)
.unwrap_or(false);
if hint_valid {
debug!("loaded KeyDir from hint file: {:?}", hint_path);
let live_bytes: u64 = {
let mut total = 0u64;
kd.for_each(|_, e| total += e.record_len as u64);
total
};
let dead = actual_size.saturating_sub(FILE_HEADER_SIZE as u64).saturating_sub(live_bytes);
(kd, dead, true)
} else {
tracing::warn!("hint file {:?} failed validation, rebuilding from data file", hint_path);
let (kd, dead, _stats) = KeyDir::build_from_data_file(&data_path)?;
(kd, dead, false)
}
} }
} }
_ => { _ => {
@@ -482,6 +504,13 @@ impl StorageAdapter for FileStorageAdapter {
"FileStorageAdapter initialization complete" "FileStorageAdapter initialization complete"
); );
// Run compaction on all collections that need it (dead weight from before crash)
for entry in self.collections.iter() {
let state = entry.value();
let _guard = state.write_lock.lock().unwrap();
state.try_compact();
}
// Start periodic compaction task (runs every 24 hours) // Start periodic compaction task (runs every 24 hours)
{ {
let collections = self.collections.clone(); let collections = self.collections.clone();
@@ -510,10 +539,11 @@ impl StorageAdapter for FileStorageAdapter {
handle.abort(); handle.abort();
} }
// Persist all KeyDir hint files // Persist all KeyDir hint files with current data file sizes
for entry in self.collections.iter() { for entry in self.collections.iter() {
let state = entry.value(); let state = entry.value();
let _ = state.keydir.persist_to_hint_file(&state.hint_path()); let current_size = state.data_file_size.load(Ordering::Relaxed);
let _ = state.keydir.persist_to_hint_file(&state.hint_path(), current_size);
} }
debug!("FileStorageAdapter closed"); debug!("FileStorageAdapter closed");
Ok(()) Ok(())
+14 -7
View File
@@ -198,14 +198,17 @@ impl KeyDir {
/// Persist the KeyDir to a hint file for fast restart. /// Persist the KeyDir to a hint file for fast restart.
/// ///
/// `data_file_size` is the current size of data.rdb — stored in the hint header
/// so that on next load we can detect if data.rdb changed (stale hint).
///
/// Hint file format (after the 64-byte file header): /// Hint file format (after the 64-byte file header):
/// For each entry: [key_len:u32 LE][key bytes][offset:u64 LE][record_len:u32 LE][value_len:u32 LE][timestamp:u64 LE] /// For each entry: [key_len:u32 LE][key bytes][offset:u64 LE][record_len:u32 LE][value_len:u32 LE][timestamp:u64 LE]
pub fn persist_to_hint_file(&self, path: &Path) -> StorageResult<()> { pub fn persist_to_hint_file(&self, path: &Path, data_file_size: u64) -> StorageResult<()> {
let file = std::fs::File::create(path)?; let file = std::fs::File::create(path)?;
let mut writer = BufWriter::new(file); let mut writer = BufWriter::new(file);
// Write file header // Write file header with data_file_size for staleness detection
let hdr = FileHeader::new(FileType::Hint); let hdr = FileHeader::new_hint(data_file_size);
writer.write_all(&hdr.encode())?; writer.write_all(&hdr.encode())?;
// Write entries // Write entries
@@ -225,7 +228,9 @@ impl KeyDir {
} }
/// Load a KeyDir from a hint file. Returns None if the file doesn't exist. /// Load a KeyDir from a hint file. Returns None if the file doesn't exist.
pub fn load_from_hint_file(path: &Path) -> StorageResult<Option<Self>> { /// Returns `(keydir, stored_data_file_size)` where `stored_data_file_size` is the
/// data.rdb size recorded when the hint was written (0 = old format, unknown).
pub fn load_from_hint_file(path: &Path) -> StorageResult<Option<(Self, u64)>> {
if !path.exists() { if !path.exists() {
return Ok(None); return Ok(None);
} }
@@ -254,6 +259,7 @@ impl KeyDir {
))); )));
} }
let stored_data_file_size = hdr.data_file_size;
let keydir = KeyDir::new(); let keydir = KeyDir::new();
loop { loop {
@@ -292,7 +298,7 @@ impl KeyDir {
); );
} }
Ok(Some(keydir)) Ok(Some((keydir, stored_data_file_size)))
} }
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
@@ -517,9 +523,10 @@ mod tests {
}, },
); );
kd.persist_to_hint_file(&hint_path).unwrap(); kd.persist_to_hint_file(&hint_path, 12345).unwrap();
let loaded = KeyDir::load_from_hint_file(&hint_path).unwrap().unwrap(); let (loaded, stored_size) = KeyDir::load_from_hint_file(&hint_path).unwrap().unwrap();
assert_eq!(stored_size, 12345);
assert_eq!(loaded.len(), 2); assert_eq!(loaded.len(), 2);
let e1 = loaded.get("doc1").unwrap(); let e1 = loaded.get("doc1").unwrap();
assert_eq!(e1.offset, 64); assert_eq!(e1.offset, 64);
+21 -1
View File
@@ -79,6 +79,9 @@ pub struct FileHeader {
pub file_type: FileType, pub file_type: FileType,
pub flags: u32, pub flags: u32,
pub created_ms: u64, pub created_ms: u64,
/// For hint files: the data.rdb file size at the time the hint was written.
/// Used to detect stale hints after ungraceful shutdown. 0 = unknown (old format).
pub data_file_size: u64,
} }
impl FileHeader { impl FileHeader {
@@ -89,6 +92,18 @@ impl FileHeader {
file_type, file_type,
flags: 0, flags: 0,
created_ms: now_ms(), created_ms: now_ms(),
data_file_size: 0,
}
}
/// Create a new hint header that records the data file size.
pub fn new_hint(data_file_size: u64) -> Self {
Self {
version: FORMAT_VERSION,
file_type: FileType::Hint,
flags: 0,
created_ms: now_ms(),
data_file_size,
} }
} }
@@ -100,7 +115,8 @@ impl FileHeader {
buf[10] = self.file_type as u8; buf[10] = self.file_type as u8;
buf[11..15].copy_from_slice(&self.flags.to_le_bytes()); buf[11..15].copy_from_slice(&self.flags.to_le_bytes());
buf[15..23].copy_from_slice(&self.created_ms.to_le_bytes()); buf[15..23].copy_from_slice(&self.created_ms.to_le_bytes());
// bytes 23..64 are reserved (zeros) buf[23..31].copy_from_slice(&self.data_file_size.to_le_bytes());
// bytes 31..64 are reserved (zeros)
buf buf
} }
@@ -127,11 +143,15 @@ impl FileHeader {
let created_ms = u64::from_le_bytes([ let created_ms = u64::from_le_bytes([
buf[15], buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22], buf[15], buf[16], buf[17], buf[18], buf[19], buf[20], buf[21], buf[22],
]); ]);
let data_file_size = u64::from_le_bytes([
buf[23], buf[24], buf[25], buf[26], buf[27], buf[28], buf[29], buf[30],
]);
Ok(Self { Ok(Self {
version, version,
file_type, file_type,
flags, flags,
created_ms, created_ms,
data_file_size,
}) })
} }
} }
+7 -1
View File
@@ -295,7 +295,13 @@ fn validate_collection(db: &str, coll: &str, coll_dir: &Path) -> CollectionRepor
// Validate hint file if present // Validate hint file if present
if hint_path.exists() { if hint_path.exists() {
match KeyDir::load_from_hint_file(&hint_path) { match KeyDir::load_from_hint_file(&hint_path) {
Ok(Some(hint_kd)) => { Ok(Some((hint_kd, stored_size))) => {
if stored_size > 0 && stored_size != report.data_file_size {
report.errors.push(format!(
"hint file is stale: recorded data size {} but actual is {}",
stored_size, report.data_file_size
));
}
// Check for orphaned entries: keys in hint but not live in data // Check for orphaned entries: keys in hint but not live in data
hint_kd.for_each(|key, _entry| { hint_kd.for_each(|key, _entry| {
if !live_ids.contains(key) { if !live_ids.contains(key) {
+191
View File
@@ -0,0 +1,191 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
// ---------------------------------------------------------------------------
// Test: Deletes persist across restart (tombstone + hint staleness detection)
// Covers: append_tombstone to data.rdb, hint file data_file_size tracking,
// stale hint detection on restart
// ---------------------------------------------------------------------------
let tmpDir: string;
let localDb: smartdb.LocalSmartDb;
let client: MongoClient;
let db: Db;
// Create a fresh, uniquely-named scratch directory for this test run.
function makeTmpDir(): string {
  const prefix = path.join(os.tmpdir(), 'smartdb-delete-test-');
  return fs.mkdtempSync(prefix);
}
// Recursively remove the scratch directory; a missing directory is a no-op.
function cleanTmpDir(dir: string): void {
  if (!fs.existsSync(dir)) {
    return;
  }
  fs.rmSync(dir, { recursive: true, force: true });
}
// ============================================================================
// Setup
// ============================================================================
tap.test('setup: start local db and insert documents', async () => {
tmpDir = makeTmpDir();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('deletetest');
const coll = db.collection('items');
await coll.insertMany([
{ name: 'keep-1', value: 100 },
{ name: 'keep-2', value: 200 },
{ name: 'delete-me', value: 999 },
{ name: 'keep-3', value: 300 },
]);
const count = await coll.countDocuments();
expect(count).toEqual(4);
});
// ============================================================================
// Delete and verify
// ============================================================================
// Remove one of the four seeded documents and confirm the delete is
// visible immediately through the wire protocol (count drops, lookup
// returns null) before any restart happens.
tap.test('delete-persistence: delete a document', async () => {
  const coll = db.collection('items');
  const result = await coll.deleteOne({ name: 'delete-me' });
  expect(result.deletedCount).toEqual(1);
  const remaining = await coll.countDocuments();
  expect(remaining).toEqual(3);
  const deleted = await coll.findOne({ name: 'delete-me' });
  expect(deleted).toBeNull();
});
// ============================================================================
// Graceful restart: delete survives
// ============================================================================
tap.test('delete-persistence: graceful stop and restart', async () => {
await client.close();
await localDb.stop(); // graceful — writes hint file
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('deletetest');
});
// After a graceful stop (which persists the hint file) and restart,
// the tombstone must still mask the deleted document while the
// surviving documents round-trip with their original values.
tap.test('delete-persistence: deleted doc stays deleted after graceful restart', async () => {
  const coll = db.collection('items');
  const count = await coll.countDocuments();
  expect(count).toEqual(3);
  const deleted = await coll.findOne({ name: 'delete-me' });
  expect(deleted).toBeNull();
  // The remaining docs are intact
  const keep1 = await coll.findOne({ name: 'keep-1' });
  expect(keep1).toBeTruthy();
  expect(keep1!.value).toEqual(100);
});
// ============================================================================
// Simulate ungraceful restart: delete after hint write, then restart
// The hint file data_file_size check should detect the stale hint
// ============================================================================
tap.test('delete-persistence: insert and delete more docs, then restart', async () => {
const coll = db.collection('items');
// Insert a new doc
await coll.insertOne({ name: 'temporary', value: 777 });
expect(await coll.countDocuments()).toEqual(4);
// Delete it
await coll.deleteOne({ name: 'temporary' });
expect(await coll.countDocuments()).toEqual(3);
const gone = await coll.findOne({ name: 'temporary' });
expect(gone).toBeNull();
});
tap.test('delete-persistence: stop and restart again', async () => {
await client.close();
await localDb.stop();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('deletetest');
});
tap.test('delete-persistence: all deletes survived second restart', async () => {
const coll = db.collection('items');
const count = await coll.countDocuments();
expect(count).toEqual(3);
// Both deletes are permanent
expect(await coll.findOne({ name: 'delete-me' })).toBeNull();
expect(await coll.findOne({ name: 'temporary' })).toBeNull();
// Survivors intact
const names = (await coll.find({}).toArray()).map(d => d.name).sort();
expect(names).toEqual(['keep-1', 'keep-2', 'keep-3']);
});
// ============================================================================
// Delete all docs and verify empty after restart
// ============================================================================
tap.test('delete-persistence: delete all remaining docs', async () => {
const coll = db.collection('items');
await coll.deleteMany({});
expect(await coll.countDocuments()).toEqual(0);
});
tap.test('delete-persistence: restart with empty collection', async () => {
await client.close();
await localDb.stop();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('deletetest');
});
tap.test('delete-persistence: collection is empty after restart', async () => {
const coll = db.collection('items');
const count = await coll.countDocuments();
expect(count).toEqual(0);
});
// ============================================================================
// Cleanup
// ============================================================================
tap.test('delete-persistence: cleanup', async () => {
await client.close();
await localDb.stop();
cleanTmpDir(tmpDir);
});
export default tap.start();
+126
View File
@@ -0,0 +1,126 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
// ---------------------------------------------------------------------------
// Test: Missing data.rdb header recovery + startup logging
// Covers: ensure_data_header, BuildStats, info-level startup logging
// ---------------------------------------------------------------------------
let tmpDir: string;
let localDb: smartdb.LocalSmartDb;
let client: MongoClient;
let db: Db;
// Create a fresh, uniquely-named scratch directory for this test file.
function makeTmpDir(): string {
  return fs.mkdtempSync(path.join(os.tmpdir(), 'smartdb-header-test-'));
}
// Recursively remove the scratch directory; a missing directory is a no-op.
function cleanTmpDir(dir: string): void {
  if (fs.existsSync(dir)) {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}
// ============================================================================
// Setup: create data, then corrupt it
// ============================================================================
tap.test('setup: start, insert data, stop', async () => {
tmpDir = makeTmpDir();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('headertest');
const coll = db.collection('docs');
await coll.insertMany([
{ key: 'a', val: 1 },
{ key: 'b', val: 2 },
{ key: 'c', val: 3 },
]);
await client.close();
await localDb.stop();
});
// ============================================================================
// Delete hint file and restart: should rebuild from data.rdb scan
// ============================================================================
// Simulate a lost hint file: remove keydir.hint from disk, then restart.
// The engine should fall back to a full data.rdb scan to rebuild the
// KeyDir (the subsequent test verifies no data was lost).
// NOTE(review): path layout assumed to be <tmp>/<db>/<collection>/keydir.hint
// — confirm against the storage adapter's directory scheme.
tap.test('header-recovery: delete hint file and restart', async () => {
  // Find and delete hint files
  const dbDir = path.join(tmpDir, 'headertest', 'docs');
  const hintPath = path.join(dbDir, 'keydir.hint');
  if (fs.existsSync(hintPath)) {
    fs.unlinkSync(hintPath);
  }

  localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
  const info = await localDb.start();
  client = new MongoClient(info.connectionUri, {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  });
  await client.connect();
  db = client.db('headertest');
});
tap.test('header-recovery: data intact after hint deletion', async () => {
const coll = db.collection('docs');
const count = await coll.countDocuments();
expect(count).toEqual(3);
const a = await coll.findOne({ key: 'a' });
expect(a!.val).toEqual(1);
});
// ============================================================================
// Write new data after restart, stop, restart again
// ============================================================================
tap.test('header-recovery: write after hint-less restart', async () => {
const coll = db.collection('docs');
await coll.insertOne({ key: 'd', val: 4 });
expect(await coll.countDocuments()).toEqual(4);
});
tap.test('header-recovery: restart and verify all data', async () => {
await client.close();
await localDb.stop();
localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
client = new MongoClient(info.connectionUri, {
directConnection: true,
serverSelectionTimeoutMS: 5000,
});
await client.connect();
db = client.db('headertest');
const coll = db.collection('docs');
const count = await coll.countDocuments();
expect(count).toEqual(4);
const keys = (await coll.find({}).toArray()).map(d => d.key).sort();
expect(keys).toEqual(['a', 'b', 'c', 'd']);
});
// ============================================================================
// Cleanup
// ============================================================================
tap.test('header-recovery: cleanup', async () => {
await client.close();
await localDb.stop();
cleanTmpDir(tmpDir);
});
export default tap.start();
+83 -1
View File
@@ -1,6 +1,6 @@
import { expect, tap } from '@git.zone/tstest/tapbundle'; import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js'; import * as smartdb from '../ts/index.js';
import { MongoClient, Db, Collection } from 'mongodb'; import { MongoClient, Db, Collection, ObjectId } from 'mongodb';
let server: smartdb.SmartdbServer; let server: smartdb.SmartdbServer;
let client: MongoClient; let client: MongoClient;
@@ -252,6 +252,71 @@ tap.test('smartdb: update - upsert creates new document', async () => {
expect(inserted!.email).toEqual('new@example.com'); expect(inserted!.email).toEqual('new@example.com');
}); });
tap.test('smartdb: update - aggregation pipeline updateOne', async () => {
const collection = db.collection('users');
await collection.insertOne({ name: 'PipelineUser', source: 'alpha', legacy: true, visits: 2 });
const result = await collection.updateOne(
{ name: 'PipelineUser' },
[
{ $set: { sourceCopy: '$source', pipelineStatus: 'updated' } },
{ $unset: ['legacy'] },
]
);
expect(result.matchedCount).toEqual(1);
expect(result.modifiedCount).toEqual(1);
const updated = await collection.findOne({ name: 'PipelineUser' });
expect(updated).toBeTruthy();
expect(updated!.sourceCopy).toEqual('alpha');
expect(updated!.pipelineStatus).toEqual('updated');
expect(updated!.legacy).toBeUndefined();
});
tap.test('smartdb: update - aggregation pipeline upsert', async () => {
const collection = db.collection('users');
const result = await collection.updateOne(
{ name: 'PipelineUpsert' },
[
{ $set: { email: 'pipeline@example.com', status: 'new', mirroredName: '$name' } },
],
{ upsert: true }
);
expect(result.upsertedCount).toEqual(1);
const inserted = await collection.findOne({ name: 'PipelineUpsert' });
expect(inserted).toBeTruthy();
expect(inserted!.email).toEqual('pipeline@example.com');
expect(inserted!.status).toEqual('new');
expect(inserted!.mirroredName).toEqual('PipelineUpsert');
});
tap.test('smartdb: update - cannot modify immutable _id through pipeline', async () => {
const collection = db.collection('users');
const inserted = await collection.insertOne({ name: 'ImmutableIdUser' });
let threw = false;
try {
await collection.updateOne(
{ _id: inserted.insertedId },
[
{ $set: { _id: new ObjectId() } },
]
);
} catch (err: any) {
threw = true;
expect(err.code).toEqual(66);
}
expect(threw).toBeTrue();
const persisted = await collection.findOne({ _id: inserted.insertedId });
expect(persisted).toBeTruthy();
expect(persisted!.name).toEqual('ImmutableIdUser');
});
// ============================================================================ // ============================================================================
// Cursor Tests // Cursor Tests
// ============================================================================ // ============================================================================
@@ -306,6 +371,23 @@ tap.test('smartdb: findOneAndUpdate - returns updated document', async () => {
expect(result!.status).toEqual('active'); expect(result!.status).toEqual('active');
}); });
tap.test('smartdb: findOneAndUpdate - supports aggregation pipeline updates', async () => {
const collection = db.collection('users');
await collection.insertOne({ name: 'PipelineFindAndModify', sourceName: 'Finder' });
const result = await collection.findOneAndUpdate(
{ name: 'PipelineFindAndModify' },
[
{ $set: { displayName: '$sourceName', mode: 'pipeline' } },
],
{ returnDocument: 'after' }
);
expect(result).toBeTruthy();
expect(result!.displayName).toEqual('Finder');
expect(result!.mode).toEqual('pipeline');
});
tap.test('smartdb: findOneAndDelete - returns deleted document', async () => { tap.test('smartdb: findOneAndDelete - returns deleted document', async () => {
const collection = db.collection('users'); const collection = db.collection('users');
+82
View File
@@ -0,0 +1,82 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import * as fs from 'fs';
import * as net from 'net';
import * as path from 'path';
import * as os from 'os';
// ---------------------------------------------------------------------------
// Test: Stale socket cleanup on startup
// Covers: LocalSmartDb.cleanStaleSockets(), isSocketAlive()
// ---------------------------------------------------------------------------
// Allocate a unique throwaway directory under the OS temp root.
function makeTmpDir(): string {
  const template = path.join(os.tmpdir(), 'smartdb-socket-test-');
  return fs.mkdtempSync(template);
}
// Tear down the throwaway directory; silently succeeds if already gone.
function cleanTmpDir(dir: string): void {
  if (!fs.existsSync(dir)) {
    return;
  }
  fs.rmSync(dir, { recursive: true, force: true });
}
// ============================================================================
// Stale socket cleanup: active sockets are preserved
// ============================================================================
// Stale-socket cleanup must not delete a socket file that another live
// server still owns: bind a real net.Server to a socket path, start
// LocalSmartDb (which runs its cleanup pass), and verify the file survives.
tap.test('stale-sockets: does not remove active sockets', async () => {
  const tmpDir = makeTmpDir();
  const activeSocketPath = path.join(os.tmpdir(), `smartdb-active-${Date.now()}.sock`);

  // Create an active socket (server still listening)
  const activeServer = net.createServer();
  await new Promise<void>((resolve) => activeServer.listen(activeSocketPath, resolve));
  expect(fs.existsSync(activeSocketPath)).toBeTrue();

  // Start LocalSmartDb — should NOT remove the active socket
  const localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
  await localDb.start();
  expect(fs.existsSync(activeSocketPath)).toBeTrue();

  // Cleanup
  await localDb.stop();
  await new Promise<void>((resolve) => activeServer.close(() => resolve()));
  try { fs.unlinkSync(activeSocketPath); } catch {}
  cleanTmpDir(tmpDir);
});
// ============================================================================
// Stale socket cleanup: startup works with no stale sockets
// ============================================================================
tap.test('stale-sockets: startup works cleanly with no stale sockets', async () => {
const tmpDir = makeTmpDir();
const localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
expect(localDb.running).toBeTrue();
expect(info.socketPath).toBeTruthy();
await localDb.stop();
cleanTmpDir(tmpDir);
});
// ============================================================================
// Stale socket cleanup: the socket file for the current instance is cleaned on stop
// ============================================================================
tap.test('stale-sockets: own socket file is removed on stop', async () => {
const tmpDir = makeTmpDir();
const localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
const info = await localDb.start();
expect(fs.existsSync(info.socketPath)).toBeTrue();
await localDb.stop();
// Socket file should be gone after graceful stop
expect(fs.existsSync(info.socketPath)).toBeFalse();
cleanTmpDir(tmpDir);
});
export default tap.start();
+180
View File
@@ -0,0 +1,180 @@
import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js';
import { MongoClient, Db } from 'mongodb';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
// ---------------------------------------------------------------------------
// Test: Unique index enforcement via wire protocol
// Covers: unique index pre-check, createIndexes persistence, index restoration
// ---------------------------------------------------------------------------
// Shared state threaded through the sequential tap tests below: the scratch
// data directory, the embedded LocalSmartDb instance under test, and a
// MongoDB driver client/db connected to it over the wire protocol.
let tmpDir: string;
let localDb: smartdb.LocalSmartDb;
let client: MongoClient;
let db: Db;
function makeTmpDir(): string {
  // Create a fresh, uniquely named scratch directory under the OS temp root.
  const prefix = path.join(os.tmpdir(), 'smartdb-unique-test-');
  return fs.mkdtempSync(prefix);
}
function cleanTmpDir(dir: string): void {
  // Best-effort removal of a scratch directory; a missing path is a no-op.
  if (!fs.existsSync(dir)) {
    return;
  }
  fs.rmSync(dir, { recursive: true, force: true });
}
// ============================================================================
// Setup
// ============================================================================
tap.test('setup: start local db', async () => {
  // Boot an embedded server over a scratch directory and connect the
  // official MongoDB driver to it through the wire protocol.
  tmpDir = makeTmpDir();
  localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
  const startInfo = await localDb.start();
  const driverOptions = {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  };
  client = new MongoClient(startInfo.connectionUri, driverOptions);
  await client.connect();
  db = client.db('uniquetest');
});
// ============================================================================
// Unique index enforcement on insert
// ============================================================================
tap.test('unique-index: createIndex with unique: true', async () => {
  // Seed one document, then declare a unique index over its email field.
  const users = db.collection('users');
  await users.insertOne({ email: 'alice@example.com', name: 'Alice' });
  const createdName = await users.createIndex({ email: 1 }, { unique: true });
  expect(createdName).toBeTruthy();
});
tap.test('unique-index: reject duplicate on insertOne', async () => {
  const users = db.collection('users');
  // Inserting a second document with the already-indexed email must fail
  // with the MongoDB duplicate-key error code.
  let duplicateRejected = false;
  try {
    await users.insertOne({ email: 'alice@example.com', name: 'Alice2' });
  } catch (err: any) {
    duplicateRejected = true;
    expect(err.code).toEqual(11000);
  }
  expect(duplicateRejected).toBeTrue();
  // The rejected insert must not have left a document behind.
  expect(await users.countDocuments()).toEqual(1);
});
tap.test('unique-index: allow insert with different unique value', async () => {
  // A distinct email does not collide with the unique index.
  const users = db.collection('users');
  await users.insertOne({ email: 'bob@example.com', name: 'Bob' });
  expect(await users.countDocuments()).toEqual(2);
});
// ============================================================================
// Unique index enforcement on update
// ============================================================================
tap.test('unique-index: reject duplicate on updateOne that changes unique field', async () => {
  const users = db.collection('users');
  // Rewriting Bob's email to Alice's must trip the unique constraint.
  let duplicateRejected = false;
  try {
    await users.updateOne(
      { email: 'bob@example.com' },
      { $set: { email: 'alice@example.com' } }
    );
  } catch (err: any) {
    duplicateRejected = true;
    expect(err.code).toEqual(11000);
  }
  expect(duplicateRejected).toBeTrue();
  // The failed update must not have modified the stored document.
  const bob = await users.findOne({ name: 'Bob' });
  expect(bob!.email).toEqual('bob@example.com');
});
tap.test('unique-index: allow update that keeps same unique value', async () => {
  // Touching a non-indexed field while the unique value stays put is legal.
  const users = db.collection('users');
  await users.updateOne({ email: 'bob@example.com' }, { $set: { name: 'Robert' } });
  const updated = await users.findOne({ email: 'bob@example.com' });
  expect(updated!.name).toEqual('Robert');
});
// ============================================================================
// Unique index enforcement on upsert
// ============================================================================
tap.test('unique-index: reject duplicate on upsert insert', async () => {
  const users = db.collection('users');
  // The filter matches nothing, so the upsert path would insert a document
  // whose email collides with an existing one — that must be rejected.
  // NOTE(review): unlike the other duplicate tests, this one does not assert
  // err.code — confirm whether upsert rejection also surfaces code 11000.
  let upsertRejected = false;
  try {
    await users.updateOne(
      { email: 'new@example.com' },
      { $set: { email: 'alice@example.com', name: 'Imposter' } },
      { upsert: true }
    );
  } catch {
    upsertRejected = true;
  }
  expect(upsertRejected).toBeTrue();
});
// ============================================================================
// Unique index survives restart (persistence + restoration)
// ============================================================================
tap.test('unique-index: stop and restart', async () => {
  // Tear everything down, then bring up a brand-new instance over the same
  // data directory to exercise index persistence and restoration.
  await client.close();
  await localDb.stop();
  localDb = new smartdb.LocalSmartDb({ folderPath: tmpDir });
  const restartInfo = await localDb.start();
  const driverOptions = {
    directConnection: true,
    serverSelectionTimeoutMS: 5000,
  };
  client = new MongoClient(restartInfo.connectionUri, driverOptions);
  await client.connect();
  db = client.db('uniquetest');
});
tap.test('unique-index: enforcement persists after restart', async () => {
  const users = db.collection('users');
  // Both documents survived the restart.
  expect(await users.countDocuments()).toEqual(2);
  // The unique index was restored from disk: the duplicate is rejected even
  // though createIndex was never called on this instance.
  let duplicateRejected = false;
  try {
    await users.insertOne({ email: 'alice@example.com', name: 'Alice3' });
  } catch (err: any) {
    duplicateRejected = true;
    expect(err.code).toEqual(11000);
  }
  expect(duplicateRejected).toBeTrue();
  // The rejected insert left the collection untouched.
  expect(await users.countDocuments()).toEqual(2);
});
// ============================================================================
// Cleanup
// ============================================================================
tap.test('unique-index: cleanup', async () => {
  // Disconnect the driver, shut the server down, and drop the scratch dir.
  await client.close();
  await localDb.stop();
  cleanTmpDir(tmpDir);
});
// Kick off the tap runner; tstest awaits the exported promise for completion.
export default tap.start();
+1 -1
View File
@@ -3,6 +3,6 @@
 */
 export const commitinfo = {
   name: '@push.rocks/smartdb',
-  version: '2.5.7',
+  version: '2.7.0',
   description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.'
 }