From 459adc077ac4a9f0a2ee254649354dbc50dbd9e8 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Tue, 14 Apr 2026 10:44:45 +0000 Subject: [PATCH] feat(update): add aggregation pipeline updates and enforce immutable _id handling --- changelog.md | 7 + readme.md | 2 +- rust/crates/rustdb-commands/src/error.rs | 4 + .../src/handlers/update_handler.rs | 182 +++++++++++++----- rust/crates/rustdb-query/src/aggregation.rs | 163 +++++++++++++--- rust/crates/rustdb-query/src/update.rs | 128 ++++++++++-- test/test.smartdb.ts | 84 +++++++- ts/00_commitinfo_data.ts | 2 +- 8 files changed, 470 insertions(+), 102 deletions(-) diff --git a/changelog.md b/changelog.md index d6d5ad2..6db37b2 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-04-14 - 2.7.0 - feat(update) +add aggregation pipeline updates and enforce immutable _id handling + +- support aggregation pipeline syntax in update and findOneAndUpdate operations, including upserts +- add $unset stage support for aggregation-based document transformations +- return an ImmutableField error when updates attempt to change _id and preserve _id when omitted from replacements + ## 2026-04-05 - 2.6.2 - fix(readme) align architecture diagram formatting in the documentation diff --git a/readme.md b/readme.md index 06dcfee..cfc6de3 100644 --- a/readme.md +++ b/readme.md @@ -159,7 +159,7 @@ The debug dashboard gives you: ## 📝 Operation Log & Point-in-Time Revert -Every write operation (insert, update, delete) is automatically recorded in an in-memory **operation log (OpLog)** with full before/after document snapshots. This enables: +Every write operation (insert, update, delete) is automatically recorded in an in-memory **operation log (OpLog)** with full before/after document snapshots. The OpLog lives in RAM and resets on restart — it covers the current session only. This enables: - **Change tracking** — see exactly what changed, when, and in which collection - **Field-level diffs** — compare previous and new document states diff --git a/rust/crates/rustdb-commands/src/error.rs b/rust/crates/rustdb-commands/src/error.rs index 70cf16b..b652727 100644 --- a/rust/crates/rustdb-commands/src/error.rs +++ b/rust/crates/rustdb-commands/src/error.rs @@ -27,6 +27,9 @@ pub enum CommandError { #[error("duplicate key: {0}")] DuplicateKey(String), + #[error("immutable field: {0}")] + ImmutableField(String), + #[error("internal error: {0}")] InternalError(String), } @@ -43,6 +46,7 @@ impl CommandError { CommandError::NamespaceNotFound(_) => (26, "NamespaceNotFound"), CommandError::NamespaceExists(_) => (48, "NamespaceExists"), CommandError::DuplicateKey(_) => (11000, "DuplicateKey"), + CommandError::ImmutableField(_) => (66, "ImmutableField"), CommandError::InternalError(_) => (1, "InternalError"), }; diff --git a/rust/crates/rustdb-commands/src/handlers/update_handler.rs b/rust/crates/rustdb-commands/src/handlers/update_handler.rs index f371636..564c589 100644 --- a/rust/crates/rustdb-commands/src/handlers/update_handler.rs +++ b/rust/crates/rustdb-commands/src/handlers/update_handler.rs @@ -21,6 +21,11 @@ pub async fn handle( } } +enum TUpdateSpec { + Document(Document), + Pipeline(Vec), +} + /// Handle the `update` command. async fn handle_update( cmd: &Document, @@ -78,21 +83,22 @@ async fn handle_update( }; let update = match update_spec.get("u") { - Some(Bson::Document(d)) => d.clone(), - Some(Bson::Array(_pipeline)) => { - // Aggregation pipeline updates are not yet supported; treat as error. - write_errors.push(doc! { - "index": idx as i32, - "code": 14_i32, - "codeName": "TypeMismatch", - "errmsg": "aggregation pipeline updates not yet supported", - }); - if ordered { - break; + Some(update_value) => match parse_update_spec(update_value) { + Ok(parsed) => parsed, + Err(err) => { + write_errors.push(doc! { + "index": idx as i32, + "code": 14_i32, + "codeName": "TypeMismatch", + "errmsg": err, + }); + if ordered { + break; + } + continue; } - continue; - } - _ => { + }, + None => { write_errors.push(doc! { "index": idx as i32, "code": 14_i32, @@ -137,21 +143,12 @@ async fn handle_update( let new_doc = build_upsert_doc(&filter); // Apply update operators or replacement. - match UpdateEngine::apply_update(&new_doc, &update, array_filters.as_deref()) { + match apply_update_spec(&new_doc, &update, array_filters.as_deref()) { Ok(mut updated) => { - // Apply $setOnInsert if present. - if let Some(Bson::Document(soi)) = update.get("$setOnInsert") { - UpdateEngine::apply_set_on_insert(&mut updated, soi); - } + apply_set_on_insert_if_present(&update, &mut updated); // Ensure _id exists. - let new_id = if !updated.contains_key("_id") { - let oid = ObjectId::new(); - updated.insert("_id", oid); - Bson::ObjectId(oid) - } else { - updated.get("_id").unwrap().clone() - }; + let new_id = ensure_document_id(&mut updated); // Pre-check unique index constraints before upsert insert. if let Some(engine) = ctx.indexes.get(&ns_key) { @@ -229,12 +226,21 @@ async fn handle_update( }; for matched_doc in &docs_to_update { - match UpdateEngine::apply_update( - matched_doc, - &update, - array_filters.as_deref(), - ) { - Ok(updated_doc) => { + match apply_update_spec(matched_doc, &update, array_filters.as_deref()) { + Ok(mut updated_doc) => { + if let Err(e) = ensure_immutable_id(matched_doc, &mut updated_doc) { + write_errors.push(doc! { + "index": idx as i32, + "code": 66_i32, + "codeName": "ImmutableField", + "errmsg": e.to_string(), + }); + if ordered { + break; + } + continue; + } + // Pre-check unique index constraints before storage write. if let Some(engine) = ctx.indexes.get(&ns_key) { if let Err(e) = engine.check_unique_constraints_for_update(matched_doc, &updated_doc) { @@ -361,8 +367,11 @@ async fn handle_find_and_modify( }; let update_doc = match cmd.get("update") { - Some(Bson::Document(d)) => Some(d.clone()), - _ => None, + Some(update_value) => Some( + parse_update_spec(update_value) + .map_err(CommandError::InvalidArgument)? + ), + None => None, }; let remove = match cmd.get("remove") { @@ -477,12 +486,14 @@ async fn handle_find_and_modify( if let Some(original_doc) = target { // Update the matched document. - let updated_doc = UpdateEngine::apply_update( + let mut updated_doc = apply_update_spec( &original_doc, &update, array_filters.as_deref(), ) - .map_err(|e| CommandError::InternalError(e.to_string()))?; + .map_err(CommandError::InternalError)?; + + ensure_immutable_id(&original_doc, &mut updated_doc)?; // Pre-check unique index constraints before storage write. if let Some(engine) = ctx.indexes.get(&ns_key) { @@ -533,26 +544,17 @@ async fn handle_find_and_modify( // Upsert: create a new document. let new_doc = build_upsert_doc(&query); - let mut updated_doc = UpdateEngine::apply_update( + let mut updated_doc = apply_update_spec( &new_doc, &update, array_filters.as_deref(), ) - .map_err(|e| CommandError::InternalError(e.to_string()))?; + .map_err(CommandError::InternalError)?; - // Apply $setOnInsert if present. - if let Some(Bson::Document(soi)) = update.get("$setOnInsert") { - UpdateEngine::apply_set_on_insert(&mut updated_doc, soi); - } + apply_set_on_insert_if_present(&update, &mut updated_doc); // Ensure _id. - let upserted_id = if !updated_doc.contains_key("_id") { - let oid = ObjectId::new(); - updated_doc.insert("_id", oid); - Bson::ObjectId(oid) - } else { - updated_doc.get("_id").unwrap().clone() - }; + let upserted_id = ensure_document_id(&mut updated_doc); // Pre-check unique index constraints before upsert insert. if let Some(engine) = ctx.indexes.get(&ns_key) { @@ -667,6 +669,88 @@ fn build_upsert_doc(filter: &Document) -> Document { doc } +fn parse_update_spec(update_value: &Bson) -> Result { + match update_value { + Bson::Document(d) => Ok(TUpdateSpec::Document(d.clone())), + Bson::Array(stages) => { + if stages.is_empty() { + return Err("aggregation pipeline update cannot be empty".into()); + } + + let mut pipeline = Vec::with_capacity(stages.len()); + for stage in stages { + match stage { + Bson::Document(d) => pipeline.push(d.clone()), + _ => { + return Err( + "aggregation pipeline update stages must be documents".into(), + ); + } + } + } + + Ok(TUpdateSpec::Pipeline(pipeline)) + } + _ => Err("missing or invalid 'u' field in update spec".into()), + } +} + +fn apply_update_spec( + doc: &Document, + update: &TUpdateSpec, + array_filters: Option<&[Document]>, +) -> Result { + match update { + TUpdateSpec::Document(update_doc) => UpdateEngine::apply_update(doc, update_doc, array_filters) + .map_err(|e| e.to_string()), + TUpdateSpec::Pipeline(pipeline) => { + if array_filters.is_some_and(|filters| !filters.is_empty()) { + return Err( + "arrayFilters are not supported with aggregation pipeline updates" + .into(), + ); + } + + UpdateEngine::apply_pipeline_update(doc, pipeline).map_err(|e| e.to_string()) + } + } +} + +fn apply_set_on_insert_if_present(update: &TUpdateSpec, doc: &mut Document) { + if let TUpdateSpec::Document(update_doc) = update { + if let Some(Bson::Document(soi)) = update_doc.get("$setOnInsert") { + UpdateEngine::apply_set_on_insert(doc, soi); + } + } +} + +fn ensure_document_id(doc: &mut Document) -> Bson { + if let Some(id) = doc.get("_id") { + id.clone() + } else { + let oid = ObjectId::new(); + doc.insert("_id", oid); + Bson::ObjectId(oid) + } +} + +fn ensure_immutable_id(original_doc: &Document, updated_doc: &mut Document) -> CommandResult<()> { + if let Some(original_id) = original_doc.get("_id") { + match updated_doc.get("_id") { + Some(updated_id) if updated_id == original_id => Ok(()), + Some(_) => Err(CommandError::ImmutableField( + "cannot modify immutable field '_id'".into(), + )), + None => { + updated_doc.insert("_id", original_id.clone()); + Ok(()) + } + } + } else { + Ok(()) + } +} + /// Extract _id as a string for storage operations. fn extract_id_string(doc: &Document) -> String { match doc.get("_id") { diff --git a/rust/crates/rustdb-query/src/aggregation.rs b/rust/crates/rustdb-query/src/aggregation.rs index 8699ac2..740dc01 100644 --- a/rust/crates/rustdb-query/src/aggregation.rs +++ b/rust/crates/rustdb-query/src/aggregation.rs @@ -2,10 +2,10 @@ use bson::{Bson, Document}; use std::collections::HashMap; use crate::error::QueryError; +use crate::field_path::{get_nested_value, remove_nested_value}; use crate::matcher::QueryMatcher; -use crate::sort::sort_documents; use crate::projection::apply_projection; -use crate::field_path::get_nested_value; +use crate::sort::sort_documents; /// Aggregation pipeline engine. pub struct AggregationEngine; @@ -42,6 +42,7 @@ impl AggregationEngine { "$count" => Self::stage_count(current, stage_spec)?, "$addFields" | "$set" => Self::stage_add_fields(current, stage_spec)?, "$replaceRoot" | "$replaceWith" => Self::stage_replace_root(current, stage_spec)?, + "$unset" => Self::stage_unset(current, stage_spec)?, "$lookup" => Self::stage_lookup(current, stage_spec, resolver, db)?, "$facet" => Self::stage_facet(current, stage_spec, resolver, db)?, "$unionWith" => Self::stage_union_with(current, stage_spec, resolver, db)?, @@ -60,7 +61,11 @@ impl AggregationEngine { fn stage_match(docs: Vec, spec: &Bson) -> Result, QueryError> { let filter = match spec { Bson::Document(d) => d, - _ => return Err(QueryError::AggregationError("$match requires a document".into())), + _ => { + return Err(QueryError::AggregationError( + "$match requires a document".into(), + )) + } }; Ok(QueryMatcher::filter(&docs, filter)) } @@ -68,15 +73,26 @@ impl AggregationEngine { fn stage_project(docs: Vec, spec: &Bson) -> Result, QueryError> { let projection = match spec { Bson::Document(d) => d, - _ => return Err(QueryError::AggregationError("$project requires a document".into())), + _ => { + return Err(QueryError::AggregationError( + "$project requires a document".into(), + )) + } }; - Ok(docs.into_iter().map(|doc| apply_projection(&doc, projection)).collect()) + Ok(docs + .into_iter() + .map(|doc| apply_projection(&doc, projection)) + .collect()) } fn stage_sort(mut docs: Vec, spec: &Bson) -> Result, QueryError> { let sort_spec = match spec { Bson::Document(d) => d, - _ => return Err(QueryError::AggregationError("$sort requires a document".into())), + _ => { + return Err(QueryError::AggregationError( + "$sort requires a document".into(), + )) + } }; sort_documents(&mut docs, sort_spec); Ok(docs) @@ -97,7 +113,11 @@ impl AggregationEngine { fn stage_group(docs: Vec, spec: &Bson) -> Result, QueryError> { let group_spec = match spec { Bson::Document(d) => d, - _ => return Err(QueryError::AggregationError("$group requires a document".into())), + _ => { + return Err(QueryError::AggregationError( + "$group requires a document".into(), + )) + } }; let id_expr = group_spec.get("_id").cloned().unwrap_or(Bson::Null); @@ -158,13 +178,18 @@ impl AggregationEngine { let (path, preserve_null) = match spec { Bson::String(s) => (s.trim_start_matches('$').to_string(), false), Bson::Document(d) => { - let path = d.get_str("path") + let path = d + .get_str("path") .map(|s| s.trim_start_matches('$').to_string()) .map_err(|_| QueryError::AggregationError("$unwind requires 'path'".into()))?; let preserve = d.get_bool("preserveNullAndEmptyArrays").unwrap_or(false); (path, preserve) } - _ => return Err(QueryError::AggregationError("$unwind requires a string or document".into())), + _ => { + return Err(QueryError::AggregationError( + "$unwind requires a string or document".into(), + )) + } }; let mut result = Vec::new(); @@ -206,7 +231,11 @@ impl AggregationEngine { fn stage_count(docs: Vec, spec: &Bson) -> Result, QueryError> { let field = match spec { Bson::String(s) => s.clone(), - _ => return Err(QueryError::AggregationError("$count requires a string".into())), + _ => { + return Err(QueryError::AggregationError( + "$count requires a string".into(), + )) + } }; Ok(vec![bson::doc! { field: docs.len() as i64 }]) } @@ -214,7 +243,11 @@ impl AggregationEngine { fn stage_add_fields(docs: Vec, spec: &Bson) -> Result, QueryError> { let fields = match spec { Bson::Document(d) => d, - _ => return Err(QueryError::AggregationError("$addFields requires a document".into())), + _ => { + return Err(QueryError::AggregationError( + "$addFields requires a document".into(), + )) + } }; Ok(docs @@ -231,9 +264,16 @@ impl AggregationEngine { fn stage_replace_root(docs: Vec, spec: &Bson) -> Result, QueryError> { let new_root_expr = match spec { - Bson::Document(d) => d.get("newRoot").cloned().unwrap_or(Bson::Document(d.clone())), + Bson::Document(d) => d + .get("newRoot") + .cloned() + .unwrap_or(Bson::Document(d.clone())), Bson::String(s) => Bson::String(s.clone()), - _ => return Err(QueryError::AggregationError("$replaceRoot requires a document".into())), + _ => { + return Err(QueryError::AggregationError( + "$replaceRoot requires a document".into(), + )) + } }; let mut result = Vec::new(); @@ -246,6 +286,40 @@ impl AggregationEngine { Ok(result) } + fn stage_unset(docs: Vec, spec: &Bson) -> Result, QueryError> { + let fields: Vec = match spec { + Bson::String(s) => vec![s.clone()], + Bson::Array(arr) => arr + .iter() + .map(|value| match value { + Bson::String(field) => Ok(field.clone()), + _ => Err(QueryError::AggregationError( + "$unset array entries must be strings".into(), + )), + }) + .collect::, _>>()?, + _ => { + return Err(QueryError::AggregationError( + "$unset requires a string or array of strings".into(), + )); + } + }; + + Ok(docs + .into_iter() + .map(|mut doc| { + for field in &fields { + if field.contains('.') { + remove_nested_value(&mut doc, field); + } else { + doc.remove(field); + } + } + doc + }) + .collect()) + } + fn stage_lookup( docs: Vec, spec: &Bson, @@ -254,20 +328,29 @@ impl AggregationEngine { ) -> Result, QueryError> { let lookup = match spec { Bson::Document(d) => d, - _ => return Err(QueryError::AggregationError("$lookup requires a document".into())), + _ => { + return Err(QueryError::AggregationError( + "$lookup requires a document".into(), + )) + } }; - let from = lookup.get_str("from") + let from = lookup + .get_str("from") .map_err(|_| QueryError::AggregationError("$lookup requires 'from'".into()))?; - let local_field = lookup.get_str("localField") + let local_field = lookup + .get_str("localField") .map_err(|_| QueryError::AggregationError("$lookup requires 'localField'".into()))?; - let foreign_field = lookup.get_str("foreignField") + let foreign_field = lookup + .get_str("foreignField") .map_err(|_| QueryError::AggregationError("$lookup requires 'foreignField'".into()))?; - let as_field = lookup.get_str("as") + let as_field = lookup + .get_str("as") .map_err(|_| QueryError::AggregationError("$lookup requires 'as'".into()))?; - let resolver = resolver - .ok_or_else(|| QueryError::AggregationError("$lookup requires a collection resolver".into()))?; + let resolver = resolver.ok_or_else(|| { + QueryError::AggregationError("$lookup requires a collection resolver".into()) + })?; let foreign_docs = resolver.resolve(db, from)?; Ok(docs @@ -299,7 +382,11 @@ impl AggregationEngine { ) -> Result, QueryError> { let facets = match spec { Bson::Document(d) => d, - _ => return Err(QueryError::AggregationError("$facet requires a document".into())), + _ => { + return Err(QueryError::AggregationError( + "$facet requires a document".into(), + )) + } }; let mut result = Document::new(); @@ -337,22 +424,32 @@ impl AggregationEngine { let (coll, pipeline) = match spec { Bson::String(s) => (s.as_str(), None), Bson::Document(d) => { - let coll = d.get_str("coll") - .map_err(|_| QueryError::AggregationError("$unionWith requires 'coll'".into()))?; + let coll = d.get_str("coll").map_err(|_| { + QueryError::AggregationError("$unionWith requires 'coll'".into()) + })?; let pipeline = d.get_array("pipeline").ok().map(|arr| { arr.iter() .filter_map(|s| { - if let Bson::Document(d) = s { Some(d.clone()) } else { None } + if let Bson::Document(d) = s { + Some(d.clone()) + } else { + None + } }) .collect::>() }); (coll, pipeline) } - _ => return Err(QueryError::AggregationError("$unionWith requires a string or document".into())), + _ => { + return Err(QueryError::AggregationError( + "$unionWith requires a string or document".into(), + )) + } }; - let resolver = resolver - .ok_or_else(|| QueryError::AggregationError("$unionWith requires a collection resolver".into()))?; + let resolver = resolver.ok_or_else(|| { + QueryError::AggregationError("$unionWith requires a collection resolver".into()) + })?; let mut other_docs = resolver.resolve(db, coll)?; if let Some(p) = pipeline { @@ -476,7 +573,11 @@ fn accumulate_min(docs: &[Document], expr: &Bson) -> Bson { None => val, Some(current) => { if let (Some(cv), Some(vv)) = (bson_to_f64(¤t), bson_to_f64(&val)) { - if vv < cv { val } else { current } + if vv < cv { + val + } else { + current + } } else { current } @@ -499,7 +600,11 @@ fn accumulate_max(docs: &[Document], expr: &Bson) -> Bson { None => val, Some(current) => { if let (Some(cv), Some(vv)) = (bson_to_f64(¤t), bson_to_f64(&val)) { - if vv > cv { val } else { current } + if vv > cv { + val + } else { + current + } } else { current } diff --git a/rust/crates/rustdb-query/src/update.rs b/rust/crates/rustdb-query/src/update.rs index 40374b1..7c6ff46 100644 --- a/rust/crates/rustdb-query/src/update.rs +++ b/rust/crates/rustdb-query/src/update.rs @@ -1,7 +1,8 @@ -use bson::{Bson, Document, doc}; +use bson::{doc, Bson, Document}; +use crate::aggregation::AggregationEngine; use crate::error::QueryError; -use crate::field_path::{get_nested_value, set_nested_value, remove_nested_value}; +use crate::field_path::{get_nested_value, remove_nested_value, set_nested_value}; use crate::matcher::QueryMatcher; /// Update engine — applies update operators to documents. @@ -56,6 +57,46 @@ impl UpdateEngine { Ok(result) } + /// Apply an aggregation pipeline update specification to a document. + pub fn apply_pipeline_update( + doc: &Document, + pipeline: &[Document], + ) -> Result { + if pipeline.is_empty() { + return Err(QueryError::InvalidUpdate( + "aggregation pipeline update cannot be empty".into(), + )); + } + + for stage in pipeline { + let (stage_name, _) = stage.iter().next().ok_or_else(|| { + QueryError::InvalidUpdate( + "aggregation pipeline update stages must not be empty".into(), + ) + })?; + + if !matches!( + stage_name.as_str(), + "$addFields" | "$set" | "$project" | "$unset" | "$replaceRoot" | "$replaceWith" + ) { + return Err(QueryError::InvalidUpdate(format!( + "Unsupported aggregation pipeline update stage: {}", + stage_name + ))); + } + } + + let mut results = AggregationEngine::aggregate(vec![doc.clone()], pipeline, None, "") + .map_err(|e| QueryError::InvalidUpdate(e.to_string()))?; + + match results.len() { + 1 => Ok(results.remove(0)), + _ => Err(QueryError::InvalidUpdate( + "aggregation pipeline update must produce exactly one document".into(), + )), + } + } + /// Apply $setOnInsert fields (used during upsert only). pub fn apply_set_on_insert(doc: &mut Document, fields: &Document) { for (key, value) in fields { @@ -252,16 +293,14 @@ impl UpdateEngine { for (key, spec) in fields { let value = match spec { Bson::Boolean(true) => Bson::DateTime(now), - Bson::Document(d) => { - match d.get_str("$type").unwrap_or("date") { - "date" => Bson::DateTime(now), - "timestamp" => Bson::Timestamp(bson::Timestamp { - time: (now.timestamp_millis() / 1000) as u32, - increment: 0, - }), - _ => Bson::DateTime(now), - } - } + Bson::Document(d) => match d.get_str("$type").unwrap_or("date") { + "date" => Bson::DateTime(now), + "timestamp" => Bson::Timestamp(bson::Timestamp { + time: (now.timestamp_millis() / 1000) as u32, + increment: 0, + }), + _ => Bson::DateTime(now), + }, _ => continue, }; @@ -282,7 +321,9 @@ impl UpdateEngine { Bson::Document(d) if d.contains_key("$each") => { let each = match d.get("$each") { Some(Bson::Array(a)) => a.clone(), - _ => return Err(QueryError::InvalidUpdate("$each must be an array".into())), + _ => { + return Err(QueryError::InvalidUpdate("$each must be an array".into())) + } }; let position = d.get("$position").and_then(|v| match v { @@ -325,11 +366,21 @@ impl UpdateEngine { continue; } match direction { - Bson::Int32(-1) | Bson::Int64(-1) => { arr.remove(0); } - Bson::Int32(1) | Bson::Int64(1) => { arr.pop(); } - Bson::Double(f) if *f == 1.0 => { arr.pop(); } - Bson::Double(f) if *f == -1.0 => { arr.remove(0); } - _ => { arr.pop(); } + Bson::Int32(-1) | Bson::Int64(-1) => { + arr.remove(0); + } + Bson::Int32(1) | Bson::Int64(1) => { + arr.pop(); + } + Bson::Double(f) if *f == 1.0 => { + arr.pop(); + } + Bson::Double(f) if *f == -1.0 => { + arr.remove(0); + } + _ => { + arr.pop(); + } } } } @@ -455,7 +506,11 @@ impl UpdateEngine { let ascending = *dir > 0; arr.sort_by(|a, b| { let ord = partial_cmp_bson(a, b); - if ascending { ord } else { ord.reverse() } + if ascending { + ord + } else { + ord.reverse() + } }); } Bson::Document(spec) => { @@ -465,8 +520,16 @@ impl UpdateEngine { Bson::Int32(n) => *n > 0, _ => true, }; - let a_val = if let Bson::Document(d) = a { d.get(field) } else { None }; - let b_val = if let Bson::Document(d) = b { d.get(field) } else { None }; + let a_val = if let Bson::Document(d) = a { + d.get(field) + } else { + None + }; + let b_val = if let Bson::Document(d) = b { + d.get(field) + } else { + None + }; let ord = match (a_val, b_val) { (Some(av), Some(bv)) => partial_cmp_bson(av, bv), (Some(_), None) => std::cmp::Ordering::Greater, @@ -572,4 +635,27 @@ mod tests { let tags = result.get_array("tags").unwrap(); assert_eq!(tags.len(), 2); // no duplicate } + + #[test] + fn test_pipeline_update() { + let doc = doc! { "_id": 1, "name": "Alice", "age": 30, "legacy": true }; + let pipeline = vec![ + doc! { "$set": { "displayName": "$name", "status": "updated" } }, + doc! { "$unset": ["legacy"] }, + ]; + + let result = UpdateEngine::apply_pipeline_update(&doc, &pipeline).unwrap(); + assert_eq!(result.get_str("displayName").unwrap(), "Alice"); + assert_eq!(result.get_str("status").unwrap(), "updated"); + assert!(result.get("legacy").is_none()); + } + + #[test] + fn test_pipeline_update_rejects_unsupported_stage() { + let doc = doc! { "_id": 1, "name": "Alice" }; + let pipeline = vec![doc! { "$match": { "name": "Alice" } }]; + + let result = UpdateEngine::apply_pipeline_update(&doc, &pipeline); + assert!(matches!(result, Err(QueryError::InvalidUpdate(_)))); + } } diff --git a/test/test.smartdb.ts b/test/test.smartdb.ts index 52347e5..a41dcc7 100644 --- a/test/test.smartdb.ts +++ b/test/test.smartdb.ts @@ -1,6 +1,6 @@ import { expect, tap } from '@git.zone/tstest/tapbundle'; import * as smartdb from '../ts/index.js'; -import { MongoClient, Db, Collection } from 'mongodb'; +import { MongoClient, Db, Collection, ObjectId } from 'mongodb'; let server: smartdb.SmartdbServer; let client: MongoClient; @@ -252,6 +252,71 @@ tap.test('smartdb: update - upsert creates new document', async () => { expect(inserted!.email).toEqual('new@example.com'); }); +tap.test('smartdb: update - aggregation pipeline updateOne', async () => { + const collection = db.collection('users'); + await collection.insertOne({ name: 'PipelineUser', source: 'alpha', legacy: true, visits: 2 }); + + const result = await collection.updateOne( + { name: 'PipelineUser' }, + [ + { $set: { sourceCopy: '$source', pipelineStatus: 'updated' } }, + { $unset: ['legacy'] }, + ] + ); + + expect(result.matchedCount).toEqual(1); + expect(result.modifiedCount).toEqual(1); + + const updated = await collection.findOne({ name: 'PipelineUser' }); + expect(updated).toBeTruthy(); + expect(updated!.sourceCopy).toEqual('alpha'); + expect(updated!.pipelineStatus).toEqual('updated'); + expect(updated!.legacy).toBeUndefined(); +}); + +tap.test('smartdb: update - aggregation pipeline upsert', async () => { + const collection = db.collection('users'); + const result = await collection.updateOne( + { name: 'PipelineUpsert' }, + [ + { $set: { email: 'pipeline@example.com', status: 'new', mirroredName: '$name' } }, + ], + { upsert: true } + ); + + expect(result.upsertedCount).toEqual(1); + + const inserted = await collection.findOne({ name: 'PipelineUpsert' }); + expect(inserted).toBeTruthy(); + expect(inserted!.email).toEqual('pipeline@example.com'); + expect(inserted!.status).toEqual('new'); + expect(inserted!.mirroredName).toEqual('PipelineUpsert'); +}); + +tap.test('smartdb: update - cannot modify immutable _id through pipeline', async () => { + const collection = db.collection('users'); + const inserted = await collection.insertOne({ name: 'ImmutableIdUser' }); + + let threw = false; + try { + await collection.updateOne( + { _id: inserted.insertedId }, + [ + { $set: { _id: new ObjectId() } }, + ] + ); + } catch (err: any) { + threw = true; + expect(err.code).toEqual(66); + } + + expect(threw).toBeTrue(); + + const persisted = await collection.findOne({ _id: inserted.insertedId }); + expect(persisted).toBeTruthy(); + expect(persisted!.name).toEqual('ImmutableIdUser'); +}); + // ============================================================================ // Cursor Tests // ============================================================================ @@ -306,6 +371,23 @@ tap.test('smartdb: findOneAndUpdate - returns updated document', async () => { expect(result!.status).toEqual('active'); }); +tap.test('smartdb: findOneAndUpdate - supports aggregation pipeline updates', async () => { + const collection = db.collection('users'); + await collection.insertOne({ name: 'PipelineFindAndModify', sourceName: 'Finder' }); + + const result = await collection.findOneAndUpdate( + { name: 'PipelineFindAndModify' }, + [ + { $set: { displayName: '$sourceName', mode: 'pipeline' } }, + ], + { returnDocument: 'after' } + ); + + expect(result).toBeTruthy(); + expect(result!.displayName).toEqual('Finder'); + expect(result!.mode).toEqual('pipeline'); +}); + tap.test('smartdb: findOneAndDelete - returns deleted document', async () => { const collection = db.collection('users'); diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index ed8b3b9..dee7351 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: '@push.rocks/smartdb', - version: '2.6.2', + version: '2.7.0', description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.' }