feat(update): add aggregation pipeline updates and enforce immutable _id handling

This commit is contained in:
2026-04-14 10:44:45 +00:00
parent 19f18ef480
commit 459adc077a
8 changed files with 470 additions and 102 deletions
+7
View File
@@ -1,5 +1,12 @@
# Changelog # Changelog
## 2026-04-14 - 2.7.0 - feat(update)
add aggregation pipeline updates and enforce immutable _id handling
- support aggregation pipeline syntax in update and findOneAndUpdate operations, including upserts
- add $unset stage support for aggregation-based document transformations
- return an ImmutableField error when updates attempt to change _id and preserve _id when omitted from replacements
## 2026-04-05 - 2.6.2 - fix(readme) ## 2026-04-05 - 2.6.2 - fix(readme)
align architecture diagram formatting in the documentation align architecture diagram formatting in the documentation
+1 -1
View File
@@ -159,7 +159,7 @@ The debug dashboard gives you:
## 📝 Operation Log & Point-in-Time Revert ## 📝 Operation Log & Point-in-Time Revert
Every write operation (insert, update, delete) is automatically recorded in an in-memory **operation log (OpLog)** with full before/after document snapshots. This enables: Every write operation (insert, update, delete) is automatically recorded in an in-memory **operation log (OpLog)** with full before/after document snapshots. The OpLog lives in RAM and resets on restart — it covers the current session only. This enables:
- **Change tracking** — see exactly what changed, when, and in which collection - **Change tracking** — see exactly what changed, when, and in which collection
- **Field-level diffs** — compare previous and new document states - **Field-level diffs** — compare previous and new document states
+4
View File
@@ -27,6 +27,9 @@ pub enum CommandError {
#[error("duplicate key: {0}")] #[error("duplicate key: {0}")]
DuplicateKey(String), DuplicateKey(String),
#[error("immutable field: {0}")]
ImmutableField(String),
#[error("internal error: {0}")] #[error("internal error: {0}")]
InternalError(String), InternalError(String),
} }
@@ -43,6 +46,7 @@ impl CommandError {
CommandError::NamespaceNotFound(_) => (26, "NamespaceNotFound"), CommandError::NamespaceNotFound(_) => (26, "NamespaceNotFound"),
CommandError::NamespaceExists(_) => (48, "NamespaceExists"), CommandError::NamespaceExists(_) => (48, "NamespaceExists"),
CommandError::DuplicateKey(_) => (11000, "DuplicateKey"), CommandError::DuplicateKey(_) => (11000, "DuplicateKey"),
CommandError::ImmutableField(_) => (66, "ImmutableField"),
CommandError::InternalError(_) => (1, "InternalError"), CommandError::InternalError(_) => (1, "InternalError"),
}; };
@@ -21,6 +21,11 @@ pub async fn handle(
} }
} }
enum TUpdateSpec {
Document(Document),
Pipeline(Vec<Document>),
}
/// Handle the `update` command. /// Handle the `update` command.
async fn handle_update( async fn handle_update(
cmd: &Document, cmd: &Document,
@@ -78,21 +83,22 @@ async fn handle_update(
}; };
let update = match update_spec.get("u") { let update = match update_spec.get("u") {
Some(Bson::Document(d)) => d.clone(), Some(update_value) => match parse_update_spec(update_value) {
Some(Bson::Array(_pipeline)) => { Ok(parsed) => parsed,
// Aggregation pipeline updates are not yet supported; treat as error. Err(err) => {
write_errors.push(doc! { write_errors.push(doc! {
"index": idx as i32, "index": idx as i32,
"code": 14_i32, "code": 14_i32,
"codeName": "TypeMismatch", "codeName": "TypeMismatch",
"errmsg": "aggregation pipeline updates not yet supported", "errmsg": err,
}); });
if ordered { if ordered {
break; break;
}
continue;
} }
continue; },
} None => {
_ => {
write_errors.push(doc! { write_errors.push(doc! {
"index": idx as i32, "index": idx as i32,
"code": 14_i32, "code": 14_i32,
@@ -137,21 +143,12 @@ async fn handle_update(
let new_doc = build_upsert_doc(&filter); let new_doc = build_upsert_doc(&filter);
// Apply update operators or replacement. // Apply update operators or replacement.
match UpdateEngine::apply_update(&new_doc, &update, array_filters.as_deref()) { match apply_update_spec(&new_doc, &update, array_filters.as_deref()) {
Ok(mut updated) => { Ok(mut updated) => {
// Apply $setOnInsert if present. apply_set_on_insert_if_present(&update, &mut updated);
if let Some(Bson::Document(soi)) = update.get("$setOnInsert") {
UpdateEngine::apply_set_on_insert(&mut updated, soi);
}
// Ensure _id exists. // Ensure _id exists.
let new_id = if !updated.contains_key("_id") { let new_id = ensure_document_id(&mut updated);
let oid = ObjectId::new();
updated.insert("_id", oid);
Bson::ObjectId(oid)
} else {
updated.get("_id").unwrap().clone()
};
// Pre-check unique index constraints before upsert insert. // Pre-check unique index constraints before upsert insert.
if let Some(engine) = ctx.indexes.get(&ns_key) { if let Some(engine) = ctx.indexes.get(&ns_key) {
@@ -229,12 +226,21 @@ async fn handle_update(
}; };
for matched_doc in &docs_to_update { for matched_doc in &docs_to_update {
match UpdateEngine::apply_update( match apply_update_spec(matched_doc, &update, array_filters.as_deref()) {
matched_doc, Ok(mut updated_doc) => {
&update, if let Err(e) = ensure_immutable_id(matched_doc, &mut updated_doc) {
array_filters.as_deref(), write_errors.push(doc! {
) { "index": idx as i32,
Ok(updated_doc) => { "code": 66_i32,
"codeName": "ImmutableField",
"errmsg": e.to_string(),
});
if ordered {
break;
}
continue;
}
// Pre-check unique index constraints before storage write. // Pre-check unique index constraints before storage write.
if let Some(engine) = ctx.indexes.get(&ns_key) { if let Some(engine) = ctx.indexes.get(&ns_key) {
if let Err(e) = engine.check_unique_constraints_for_update(matched_doc, &updated_doc) { if let Err(e) = engine.check_unique_constraints_for_update(matched_doc, &updated_doc) {
@@ -361,8 +367,11 @@ async fn handle_find_and_modify(
}; };
let update_doc = match cmd.get("update") { let update_doc = match cmd.get("update") {
Some(Bson::Document(d)) => Some(d.clone()), Some(update_value) => Some(
_ => None, parse_update_spec(update_value)
.map_err(CommandError::InvalidArgument)?
),
None => None,
}; };
let remove = match cmd.get("remove") { let remove = match cmd.get("remove") {
@@ -477,12 +486,14 @@ async fn handle_find_and_modify(
if let Some(original_doc) = target { if let Some(original_doc) = target {
// Update the matched document. // Update the matched document.
let updated_doc = UpdateEngine::apply_update( let mut updated_doc = apply_update_spec(
&original_doc, &original_doc,
&update, &update,
array_filters.as_deref(), array_filters.as_deref(),
) )
.map_err(|e| CommandError::InternalError(e.to_string()))?; .map_err(CommandError::InternalError)?;
ensure_immutable_id(&original_doc, &mut updated_doc)?;
// Pre-check unique index constraints before storage write. // Pre-check unique index constraints before storage write.
if let Some(engine) = ctx.indexes.get(&ns_key) { if let Some(engine) = ctx.indexes.get(&ns_key) {
@@ -533,26 +544,17 @@ async fn handle_find_and_modify(
// Upsert: create a new document. // Upsert: create a new document.
let new_doc = build_upsert_doc(&query); let new_doc = build_upsert_doc(&query);
let mut updated_doc = UpdateEngine::apply_update( let mut updated_doc = apply_update_spec(
&new_doc, &new_doc,
&update, &update,
array_filters.as_deref(), array_filters.as_deref(),
) )
.map_err(|e| CommandError::InternalError(e.to_string()))?; .map_err(CommandError::InternalError)?;
// Apply $setOnInsert if present. apply_set_on_insert_if_present(&update, &mut updated_doc);
if let Some(Bson::Document(soi)) = update.get("$setOnInsert") {
UpdateEngine::apply_set_on_insert(&mut updated_doc, soi);
}
// Ensure _id. // Ensure _id.
let upserted_id = if !updated_doc.contains_key("_id") { let upserted_id = ensure_document_id(&mut updated_doc);
let oid = ObjectId::new();
updated_doc.insert("_id", oid);
Bson::ObjectId(oid)
} else {
updated_doc.get("_id").unwrap().clone()
};
// Pre-check unique index constraints before upsert insert. // Pre-check unique index constraints before upsert insert.
if let Some(engine) = ctx.indexes.get(&ns_key) { if let Some(engine) = ctx.indexes.get(&ns_key) {
@@ -667,6 +669,88 @@ fn build_upsert_doc(filter: &Document) -> Document {
doc doc
} }
fn parse_update_spec(update_value: &Bson) -> Result<TUpdateSpec, String> {
match update_value {
Bson::Document(d) => Ok(TUpdateSpec::Document(d.clone())),
Bson::Array(stages) => {
if stages.is_empty() {
return Err("aggregation pipeline update cannot be empty".into());
}
let mut pipeline = Vec::with_capacity(stages.len());
for stage in stages {
match stage {
Bson::Document(d) => pipeline.push(d.clone()),
_ => {
return Err(
"aggregation pipeline update stages must be documents".into(),
);
}
}
}
Ok(TUpdateSpec::Pipeline(pipeline))
}
_ => Err("missing or invalid 'u' field in update spec".into()),
}
}
fn apply_update_spec(
doc: &Document,
update: &TUpdateSpec,
array_filters: Option<&[Document]>,
) -> Result<Document, String> {
match update {
TUpdateSpec::Document(update_doc) => UpdateEngine::apply_update(doc, update_doc, array_filters)
.map_err(|e| e.to_string()),
TUpdateSpec::Pipeline(pipeline) => {
if array_filters.is_some_and(|filters| !filters.is_empty()) {
return Err(
"arrayFilters are not supported with aggregation pipeline updates"
.into(),
);
}
UpdateEngine::apply_pipeline_update(doc, pipeline).map_err(|e| e.to_string())
}
}
}
fn apply_set_on_insert_if_present(update: &TUpdateSpec, doc: &mut Document) {
if let TUpdateSpec::Document(update_doc) = update {
if let Some(Bson::Document(soi)) = update_doc.get("$setOnInsert") {
UpdateEngine::apply_set_on_insert(doc, soi);
}
}
}
fn ensure_document_id(doc: &mut Document) -> Bson {
if let Some(id) = doc.get("_id") {
id.clone()
} else {
let oid = ObjectId::new();
doc.insert("_id", oid);
Bson::ObjectId(oid)
}
}
fn ensure_immutable_id(original_doc: &Document, updated_doc: &mut Document) -> CommandResult<()> {
if let Some(original_id) = original_doc.get("_id") {
match updated_doc.get("_id") {
Some(updated_id) if updated_id == original_id => Ok(()),
Some(_) => Err(CommandError::ImmutableField(
"cannot modify immutable field '_id'".into(),
)),
None => {
updated_doc.insert("_id", original_id.clone());
Ok(())
}
}
} else {
Ok(())
}
}
/// Extract _id as a string for storage operations. /// Extract _id as a string for storage operations.
fn extract_id_string(doc: &Document) -> String { fn extract_id_string(doc: &Document) -> String {
match doc.get("_id") { match doc.get("_id") {
+134 -29
View File
@@ -2,10 +2,10 @@ use bson::{Bson, Document};
use std::collections::HashMap; use std::collections::HashMap;
use crate::error::QueryError; use crate::error::QueryError;
use crate::field_path::{get_nested_value, remove_nested_value};
use crate::matcher::QueryMatcher; use crate::matcher::QueryMatcher;
use crate::sort::sort_documents;
use crate::projection::apply_projection; use crate::projection::apply_projection;
use crate::field_path::get_nested_value; use crate::sort::sort_documents;
/// Aggregation pipeline engine. /// Aggregation pipeline engine.
pub struct AggregationEngine; pub struct AggregationEngine;
@@ -42,6 +42,7 @@ impl AggregationEngine {
"$count" => Self::stage_count(current, stage_spec)?, "$count" => Self::stage_count(current, stage_spec)?,
"$addFields" | "$set" => Self::stage_add_fields(current, stage_spec)?, "$addFields" | "$set" => Self::stage_add_fields(current, stage_spec)?,
"$replaceRoot" | "$replaceWith" => Self::stage_replace_root(current, stage_spec)?, "$replaceRoot" | "$replaceWith" => Self::stage_replace_root(current, stage_spec)?,
"$unset" => Self::stage_unset(current, stage_spec)?,
"$lookup" => Self::stage_lookup(current, stage_spec, resolver, db)?, "$lookup" => Self::stage_lookup(current, stage_spec, resolver, db)?,
"$facet" => Self::stage_facet(current, stage_spec, resolver, db)?, "$facet" => Self::stage_facet(current, stage_spec, resolver, db)?,
"$unionWith" => Self::stage_union_with(current, stage_spec, resolver, db)?, "$unionWith" => Self::stage_union_with(current, stage_spec, resolver, db)?,
@@ -60,7 +61,11 @@ impl AggregationEngine {
fn stage_match(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_match(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let filter = match spec { let filter = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$match requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$match requires a document".into(),
))
}
}; };
Ok(QueryMatcher::filter(&docs, filter)) Ok(QueryMatcher::filter(&docs, filter))
} }
@@ -68,15 +73,26 @@ impl AggregationEngine {
fn stage_project(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_project(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let projection = match spec { let projection = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$project requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$project requires a document".into(),
))
}
}; };
Ok(docs.into_iter().map(|doc| apply_projection(&doc, projection)).collect()) Ok(docs
.into_iter()
.map(|doc| apply_projection(&doc, projection))
.collect())
} }
fn stage_sort(mut docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_sort(mut docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let sort_spec = match spec { let sort_spec = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$sort requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$sort requires a document".into(),
))
}
}; };
sort_documents(&mut docs, sort_spec); sort_documents(&mut docs, sort_spec);
Ok(docs) Ok(docs)
@@ -97,7 +113,11 @@ impl AggregationEngine {
fn stage_group(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_group(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let group_spec = match spec { let group_spec = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$group requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$group requires a document".into(),
))
}
}; };
let id_expr = group_spec.get("_id").cloned().unwrap_or(Bson::Null); let id_expr = group_spec.get("_id").cloned().unwrap_or(Bson::Null);
@@ -158,13 +178,18 @@ impl AggregationEngine {
let (path, preserve_null) = match spec { let (path, preserve_null) = match spec {
Bson::String(s) => (s.trim_start_matches('$').to_string(), false), Bson::String(s) => (s.trim_start_matches('$').to_string(), false),
Bson::Document(d) => { Bson::Document(d) => {
let path = d.get_str("path") let path = d
.get_str("path")
.map(|s| s.trim_start_matches('$').to_string()) .map(|s| s.trim_start_matches('$').to_string())
.map_err(|_| QueryError::AggregationError("$unwind requires 'path'".into()))?; .map_err(|_| QueryError::AggregationError("$unwind requires 'path'".into()))?;
let preserve = d.get_bool("preserveNullAndEmptyArrays").unwrap_or(false); let preserve = d.get_bool("preserveNullAndEmptyArrays").unwrap_or(false);
(path, preserve) (path, preserve)
} }
_ => return Err(QueryError::AggregationError("$unwind requires a string or document".into())), _ => {
return Err(QueryError::AggregationError(
"$unwind requires a string or document".into(),
))
}
}; };
let mut result = Vec::new(); let mut result = Vec::new();
@@ -206,7 +231,11 @@ impl AggregationEngine {
fn stage_count(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_count(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let field = match spec { let field = match spec {
Bson::String(s) => s.clone(), Bson::String(s) => s.clone(),
_ => return Err(QueryError::AggregationError("$count requires a string".into())), _ => {
return Err(QueryError::AggregationError(
"$count requires a string".into(),
))
}
}; };
Ok(vec![bson::doc! { field: docs.len() as i64 }]) Ok(vec![bson::doc! { field: docs.len() as i64 }])
} }
@@ -214,7 +243,11 @@ impl AggregationEngine {
fn stage_add_fields(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_add_fields(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let fields = match spec { let fields = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$addFields requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$addFields requires a document".into(),
))
}
}; };
Ok(docs Ok(docs
@@ -231,9 +264,16 @@ impl AggregationEngine {
fn stage_replace_root(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> { fn stage_replace_root(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let new_root_expr = match spec { let new_root_expr = match spec {
Bson::Document(d) => d.get("newRoot").cloned().unwrap_or(Bson::Document(d.clone())), Bson::Document(d) => d
.get("newRoot")
.cloned()
.unwrap_or(Bson::Document(d.clone())),
Bson::String(s) => Bson::String(s.clone()), Bson::String(s) => Bson::String(s.clone()),
_ => return Err(QueryError::AggregationError("$replaceRoot requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$replaceRoot requires a document".into(),
))
}
}; };
let mut result = Vec::new(); let mut result = Vec::new();
@@ -246,6 +286,40 @@ impl AggregationEngine {
Ok(result) Ok(result)
} }
fn stage_unset(docs: Vec<Document>, spec: &Bson) -> Result<Vec<Document>, QueryError> {
let fields: Vec<String> = match spec {
Bson::String(s) => vec![s.clone()],
Bson::Array(arr) => arr
.iter()
.map(|value| match value {
Bson::String(field) => Ok(field.clone()),
_ => Err(QueryError::AggregationError(
"$unset array entries must be strings".into(),
)),
})
.collect::<Result<Vec<_>, _>>()?,
_ => {
return Err(QueryError::AggregationError(
"$unset requires a string or array of strings".into(),
));
}
};
Ok(docs
.into_iter()
.map(|mut doc| {
for field in &fields {
if field.contains('.') {
remove_nested_value(&mut doc, field);
} else {
doc.remove(field);
}
}
doc
})
.collect())
}
fn stage_lookup( fn stage_lookup(
docs: Vec<Document>, docs: Vec<Document>,
spec: &Bson, spec: &Bson,
@@ -254,20 +328,29 @@ impl AggregationEngine {
) -> Result<Vec<Document>, QueryError> { ) -> Result<Vec<Document>, QueryError> {
let lookup = match spec { let lookup = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$lookup requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$lookup requires a document".into(),
))
}
}; };
let from = lookup.get_str("from") let from = lookup
.get_str("from")
.map_err(|_| QueryError::AggregationError("$lookup requires 'from'".into()))?; .map_err(|_| QueryError::AggregationError("$lookup requires 'from'".into()))?;
let local_field = lookup.get_str("localField") let local_field = lookup
.get_str("localField")
.map_err(|_| QueryError::AggregationError("$lookup requires 'localField'".into()))?; .map_err(|_| QueryError::AggregationError("$lookup requires 'localField'".into()))?;
let foreign_field = lookup.get_str("foreignField") let foreign_field = lookup
.get_str("foreignField")
.map_err(|_| QueryError::AggregationError("$lookup requires 'foreignField'".into()))?; .map_err(|_| QueryError::AggregationError("$lookup requires 'foreignField'".into()))?;
let as_field = lookup.get_str("as") let as_field = lookup
.get_str("as")
.map_err(|_| QueryError::AggregationError("$lookup requires 'as'".into()))?; .map_err(|_| QueryError::AggregationError("$lookup requires 'as'".into()))?;
let resolver = resolver let resolver = resolver.ok_or_else(|| {
.ok_or_else(|| QueryError::AggregationError("$lookup requires a collection resolver".into()))?; QueryError::AggregationError("$lookup requires a collection resolver".into())
})?;
let foreign_docs = resolver.resolve(db, from)?; let foreign_docs = resolver.resolve(db, from)?;
Ok(docs Ok(docs
@@ -299,7 +382,11 @@ impl AggregationEngine {
) -> Result<Vec<Document>, QueryError> { ) -> Result<Vec<Document>, QueryError> {
let facets = match spec { let facets = match spec {
Bson::Document(d) => d, Bson::Document(d) => d,
_ => return Err(QueryError::AggregationError("$facet requires a document".into())), _ => {
return Err(QueryError::AggregationError(
"$facet requires a document".into(),
))
}
}; };
let mut result = Document::new(); let mut result = Document::new();
@@ -337,22 +424,32 @@ impl AggregationEngine {
let (coll, pipeline) = match spec { let (coll, pipeline) = match spec {
Bson::String(s) => (s.as_str(), None), Bson::String(s) => (s.as_str(), None),
Bson::Document(d) => { Bson::Document(d) => {
let coll = d.get_str("coll") let coll = d.get_str("coll").map_err(|_| {
.map_err(|_| QueryError::AggregationError("$unionWith requires 'coll'".into()))?; QueryError::AggregationError("$unionWith requires 'coll'".into())
})?;
let pipeline = d.get_array("pipeline").ok().map(|arr| { let pipeline = d.get_array("pipeline").ok().map(|arr| {
arr.iter() arr.iter()
.filter_map(|s| { .filter_map(|s| {
if let Bson::Document(d) = s { Some(d.clone()) } else { None } if let Bson::Document(d) = s {
Some(d.clone())
} else {
None
}
}) })
.collect::<Vec<Document>>() .collect::<Vec<Document>>()
}); });
(coll, pipeline) (coll, pipeline)
} }
_ => return Err(QueryError::AggregationError("$unionWith requires a string or document".into())), _ => {
return Err(QueryError::AggregationError(
"$unionWith requires a string or document".into(),
))
}
}; };
let resolver = resolver let resolver = resolver.ok_or_else(|| {
.ok_or_else(|| QueryError::AggregationError("$unionWith requires a collection resolver".into()))?; QueryError::AggregationError("$unionWith requires a collection resolver".into())
})?;
let mut other_docs = resolver.resolve(db, coll)?; let mut other_docs = resolver.resolve(db, coll)?;
if let Some(p) = pipeline { if let Some(p) = pipeline {
@@ -476,7 +573,11 @@ fn accumulate_min(docs: &[Document], expr: &Bson) -> Bson {
None => val, None => val,
Some(current) => { Some(current) => {
if let (Some(cv), Some(vv)) = (bson_to_f64(&current), bson_to_f64(&val)) { if let (Some(cv), Some(vv)) = (bson_to_f64(&current), bson_to_f64(&val)) {
if vv < cv { val } else { current } if vv < cv {
val
} else {
current
}
} else { } else {
current current
} }
@@ -499,7 +600,11 @@ fn accumulate_max(docs: &[Document], expr: &Bson) -> Bson {
None => val, None => val,
Some(current) => { Some(current) => {
if let (Some(cv), Some(vv)) = (bson_to_f64(&current), bson_to_f64(&val)) { if let (Some(cv), Some(vv)) = (bson_to_f64(&current), bson_to_f64(&val)) {
if vv > cv { val } else { current } if vv > cv {
val
} else {
current
}
} else { } else {
current current
} }
+107 -21
View File
@@ -1,7 +1,8 @@
use bson::{Bson, Document, doc}; use bson::{doc, Bson, Document};
use crate::aggregation::AggregationEngine;
use crate::error::QueryError; use crate::error::QueryError;
use crate::field_path::{get_nested_value, set_nested_value, remove_nested_value}; use crate::field_path::{get_nested_value, remove_nested_value, set_nested_value};
use crate::matcher::QueryMatcher; use crate::matcher::QueryMatcher;
/// Update engine — applies update operators to documents. /// Update engine — applies update operators to documents.
@@ -56,6 +57,46 @@ impl UpdateEngine {
Ok(result) Ok(result)
} }
/// Apply an aggregation pipeline update specification to a document.
pub fn apply_pipeline_update(
doc: &Document,
pipeline: &[Document],
) -> Result<Document, QueryError> {
if pipeline.is_empty() {
return Err(QueryError::InvalidUpdate(
"aggregation pipeline update cannot be empty".into(),
));
}
for stage in pipeline {
let (stage_name, _) = stage.iter().next().ok_or_else(|| {
QueryError::InvalidUpdate(
"aggregation pipeline update stages must not be empty".into(),
)
})?;
if !matches!(
stage_name.as_str(),
"$addFields" | "$set" | "$project" | "$unset" | "$replaceRoot" | "$replaceWith"
) {
return Err(QueryError::InvalidUpdate(format!(
"Unsupported aggregation pipeline update stage: {}",
stage_name
)));
}
}
let mut results = AggregationEngine::aggregate(vec![doc.clone()], pipeline, None, "")
.map_err(|e| QueryError::InvalidUpdate(e.to_string()))?;
match results.len() {
1 => Ok(results.remove(0)),
_ => Err(QueryError::InvalidUpdate(
"aggregation pipeline update must produce exactly one document".into(),
)),
}
}
/// Apply $setOnInsert fields (used during upsert only). /// Apply $setOnInsert fields (used during upsert only).
pub fn apply_set_on_insert(doc: &mut Document, fields: &Document) { pub fn apply_set_on_insert(doc: &mut Document, fields: &Document) {
for (key, value) in fields { for (key, value) in fields {
@@ -252,16 +293,14 @@ impl UpdateEngine {
for (key, spec) in fields { for (key, spec) in fields {
let value = match spec { let value = match spec {
Bson::Boolean(true) => Bson::DateTime(now), Bson::Boolean(true) => Bson::DateTime(now),
Bson::Document(d) => { Bson::Document(d) => match d.get_str("$type").unwrap_or("date") {
match d.get_str("$type").unwrap_or("date") { "date" => Bson::DateTime(now),
"date" => Bson::DateTime(now), "timestamp" => Bson::Timestamp(bson::Timestamp {
"timestamp" => Bson::Timestamp(bson::Timestamp { time: (now.timestamp_millis() / 1000) as u32,
time: (now.timestamp_millis() / 1000) as u32, increment: 0,
increment: 0, }),
}), _ => Bson::DateTime(now),
_ => Bson::DateTime(now), },
}
}
_ => continue, _ => continue,
}; };
@@ -282,7 +321,9 @@ impl UpdateEngine {
Bson::Document(d) if d.contains_key("$each") => { Bson::Document(d) if d.contains_key("$each") => {
let each = match d.get("$each") { let each = match d.get("$each") {
Some(Bson::Array(a)) => a.clone(), Some(Bson::Array(a)) => a.clone(),
_ => return Err(QueryError::InvalidUpdate("$each must be an array".into())), _ => {
return Err(QueryError::InvalidUpdate("$each must be an array".into()))
}
}; };
let position = d.get("$position").and_then(|v| match v { let position = d.get("$position").and_then(|v| match v {
@@ -325,11 +366,21 @@ impl UpdateEngine {
continue; continue;
} }
match direction { match direction {
Bson::Int32(-1) | Bson::Int64(-1) => { arr.remove(0); } Bson::Int32(-1) | Bson::Int64(-1) => {
Bson::Int32(1) | Bson::Int64(1) => { arr.pop(); } arr.remove(0);
Bson::Double(f) if *f == 1.0 => { arr.pop(); } }
Bson::Double(f) if *f == -1.0 => { arr.remove(0); } Bson::Int32(1) | Bson::Int64(1) => {
_ => { arr.pop(); } arr.pop();
}
Bson::Double(f) if *f == 1.0 => {
arr.pop();
}
Bson::Double(f) if *f == -1.0 => {
arr.remove(0);
}
_ => {
arr.pop();
}
} }
} }
} }
@@ -455,7 +506,11 @@ impl UpdateEngine {
let ascending = *dir > 0; let ascending = *dir > 0;
arr.sort_by(|a, b| { arr.sort_by(|a, b| {
let ord = partial_cmp_bson(a, b); let ord = partial_cmp_bson(a, b);
if ascending { ord } else { ord.reverse() } if ascending {
ord
} else {
ord.reverse()
}
}); });
} }
Bson::Document(spec) => { Bson::Document(spec) => {
@@ -465,8 +520,16 @@ impl UpdateEngine {
Bson::Int32(n) => *n > 0, Bson::Int32(n) => *n > 0,
_ => true, _ => true,
}; };
let a_val = if let Bson::Document(d) = a { d.get(field) } else { None }; let a_val = if let Bson::Document(d) = a {
let b_val = if let Bson::Document(d) = b { d.get(field) } else { None }; d.get(field)
} else {
None
};
let b_val = if let Bson::Document(d) = b {
d.get(field)
} else {
None
};
let ord = match (a_val, b_val) { let ord = match (a_val, b_val) {
(Some(av), Some(bv)) => partial_cmp_bson(av, bv), (Some(av), Some(bv)) => partial_cmp_bson(av, bv),
(Some(_), None) => std::cmp::Ordering::Greater, (Some(_), None) => std::cmp::Ordering::Greater,
@@ -572,4 +635,27 @@ mod tests {
let tags = result.get_array("tags").unwrap(); let tags = result.get_array("tags").unwrap();
assert_eq!(tags.len(), 2); // no duplicate assert_eq!(tags.len(), 2); // no duplicate
} }
#[test]
fn test_pipeline_update() {
let doc = doc! { "_id": 1, "name": "Alice", "age": 30, "legacy": true };
let pipeline = vec![
doc! { "$set": { "displayName": "$name", "status": "updated" } },
doc! { "$unset": ["legacy"] },
];
let result = UpdateEngine::apply_pipeline_update(&doc, &pipeline).unwrap();
assert_eq!(result.get_str("displayName").unwrap(), "Alice");
assert_eq!(result.get_str("status").unwrap(), "updated");
assert!(result.get("legacy").is_none());
}
#[test]
fn test_pipeline_update_rejects_unsupported_stage() {
let doc = doc! { "_id": 1, "name": "Alice" };
let pipeline = vec![doc! { "$match": { "name": "Alice" } }];
let result = UpdateEngine::apply_pipeline_update(&doc, &pipeline);
assert!(matches!(result, Err(QueryError::InvalidUpdate(_))));
}
} }
+83 -1
View File
@@ -1,6 +1,6 @@
import { expect, tap } from '@git.zone/tstest/tapbundle'; import { expect, tap } from '@git.zone/tstest/tapbundle';
import * as smartdb from '../ts/index.js'; import * as smartdb from '../ts/index.js';
import { MongoClient, Db, Collection } from 'mongodb'; import { MongoClient, Db, Collection, ObjectId } from 'mongodb';
let server: smartdb.SmartdbServer; let server: smartdb.SmartdbServer;
let client: MongoClient; let client: MongoClient;
@@ -252,6 +252,71 @@ tap.test('smartdb: update - upsert creates new document', async () => {
expect(inserted!.email).toEqual('new@example.com'); expect(inserted!.email).toEqual('new@example.com');
}); });
tap.test('smartdb: update - aggregation pipeline updateOne', async () => {
const collection = db.collection('users');
await collection.insertOne({ name: 'PipelineUser', source: 'alpha', legacy: true, visits: 2 });
const result = await collection.updateOne(
{ name: 'PipelineUser' },
[
{ $set: { sourceCopy: '$source', pipelineStatus: 'updated' } },
{ $unset: ['legacy'] },
]
);
expect(result.matchedCount).toEqual(1);
expect(result.modifiedCount).toEqual(1);
const updated = await collection.findOne({ name: 'PipelineUser' });
expect(updated).toBeTruthy();
expect(updated!.sourceCopy).toEqual('alpha');
expect(updated!.pipelineStatus).toEqual('updated');
expect(updated!.legacy).toBeUndefined();
});
tap.test('smartdb: update - aggregation pipeline upsert', async () => {
const collection = db.collection('users');
const result = await collection.updateOne(
{ name: 'PipelineUpsert' },
[
{ $set: { email: 'pipeline@example.com', status: 'new', mirroredName: '$name' } },
],
{ upsert: true }
);
expect(result.upsertedCount).toEqual(1);
const inserted = await collection.findOne({ name: 'PipelineUpsert' });
expect(inserted).toBeTruthy();
expect(inserted!.email).toEqual('pipeline@example.com');
expect(inserted!.status).toEqual('new');
expect(inserted!.mirroredName).toEqual('PipelineUpsert');
});
tap.test('smartdb: update - cannot modify immutable _id through pipeline', async () => {
const collection = db.collection('users');
const inserted = await collection.insertOne({ name: 'ImmutableIdUser' });
let threw = false;
try {
await collection.updateOne(
{ _id: inserted.insertedId },
[
{ $set: { _id: new ObjectId() } },
]
);
} catch (err: any) {
threw = true;
expect(err.code).toEqual(66);
}
expect(threw).toBeTrue();
const persisted = await collection.findOne({ _id: inserted.insertedId });
expect(persisted).toBeTruthy();
expect(persisted!.name).toEqual('ImmutableIdUser');
});
// ============================================================================ // ============================================================================
// Cursor Tests // Cursor Tests
// ============================================================================ // ============================================================================
@@ -306,6 +371,23 @@ tap.test('smartdb: findOneAndUpdate - returns updated document', async () => {
expect(result!.status).toEqual('active'); expect(result!.status).toEqual('active');
}); });
tap.test('smartdb: findOneAndUpdate - supports aggregation pipeline updates', async () => {
const collection = db.collection('users');
await collection.insertOne({ name: 'PipelineFindAndModify', sourceName: 'Finder' });
const result = await collection.findOneAndUpdate(
{ name: 'PipelineFindAndModify' },
[
{ $set: { displayName: '$sourceName', mode: 'pipeline' } },
],
{ returnDocument: 'after' }
);
expect(result).toBeTruthy();
expect(result!.displayName).toEqual('Finder');
expect(result!.mode).toEqual('pipeline');
});
tap.test('smartdb: findOneAndDelete - returns deleted document', async () => { tap.test('smartdb: findOneAndDelete - returns deleted document', async () => {
const collection = db.collection('users'); const collection = db.collection('users');
+1 -1
View File
@@ -3,6 +3,6 @@
*/ */
export const commitinfo = { export const commitinfo = {
name: '@push.rocks/smartdb', name: '@push.rocks/smartdb',
version: '2.6.2', version: '2.7.0',
description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.' description: 'A MongoDB-compatible embedded database server with wire protocol support, backed by a high-performance Rust engine.'
} }