use std::collections::HashSet; use bson::{doc, oid::ObjectId, Bson, Document}; use rustdb_index::IndexEngine; use rustdb_query::{QueryMatcher, UpdateEngine, sort_documents, apply_projection}; use rustdb_storage::OpType; use tracing::debug; use crate::context::CommandContext; use crate::error::{CommandError, CommandResult}; /// Handle `update` and `findAndModify` commands. pub async fn handle( cmd: &Document, db: &str, ctx: &CommandContext, command_name: &str, ) -> CommandResult { match command_name { "findAndModify" | "findandmodify" => handle_find_and_modify(cmd, db, ctx).await, _ => handle_update(cmd, db, ctx).await, } } /// Handle the `update` command. async fn handle_update( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let coll = cmd .get_str("update") .map_err(|_| CommandError::InvalidArgument("missing 'update' field".into()))?; let updates = cmd .get_array("updates") .map_err(|_| CommandError::InvalidArgument("missing 'updates' array".into()))?; let ordered = match cmd.get("ordered") { Some(Bson::Boolean(b)) => *b, _ => true, }; debug!(db = db, collection = coll, count = updates.len(), "update command"); // Auto-create database and collection if needed. ensure_collection_exists(db, coll, ctx).await?; let ns_key = format!("{}.{}", db, coll); let mut total_n: i32 = 0; let mut total_n_modified: i32 = 0; let mut upserted_list: Vec = Vec::new(); let mut write_errors: Vec = Vec::new(); for (idx, update_bson) in updates.iter().enumerate() { let update_spec = match update_bson { Bson::Document(d) => d, _ => { write_errors.push(doc! { "index": idx as i32, "code": 14_i32, "codeName": "TypeMismatch", "errmsg": "update spec must be a document", }); if ordered { break; } continue; } }; let filter = match update_spec.get("q") { Some(Bson::Document(d)) => d.clone(), _ => Document::new(), }; let update = match update_spec.get("u") { Some(Bson::Document(d)) => d.clone(), Some(Bson::Array(_pipeline)) => { // Aggregation pipeline updates are not yet supported; treat as error. write_errors.push(doc! { "index": idx as i32, "code": 14_i32, "codeName": "TypeMismatch", "errmsg": "aggregation pipeline updates not yet supported", }); if ordered { break; } continue; } _ => { write_errors.push(doc! { "index": idx as i32, "code": 14_i32, "codeName": "TypeMismatch", "errmsg": "missing or invalid 'u' field in update spec", }); if ordered { break; } continue; } }; let multi = match update_spec.get("multi") { Some(Bson::Boolean(b)) => *b, _ => false, }; let upsert = match update_spec.get("upsert") { Some(Bson::Boolean(b)) => *b, _ => false, }; let array_filters: Option> = update_spec.get_array("arrayFilters").ok().map(|arr| { arr.iter() .filter_map(|v| { if let Bson::Document(d) = v { Some(d.clone()) } else { None } }) .collect() }); // Load all documents and filter. let all_docs = load_filtered_docs(db, coll, &filter, &ns_key, ctx).await?; if all_docs.is_empty() && upsert { // Upsert: create a new document. let new_doc = build_upsert_doc(&filter); // Apply update operators or replacement. match UpdateEngine::apply_update(&new_doc, &update, array_filters.as_deref()) { Ok(mut updated) => { // Apply $setOnInsert if present. if let Some(Bson::Document(soi)) = update.get("$setOnInsert") { UpdateEngine::apply_set_on_insert(&mut updated, soi); } // Ensure _id exists. let new_id = if !updated.contains_key("_id") { let oid = ObjectId::new(); updated.insert("_id", oid); Bson::ObjectId(oid) } else { updated.get("_id").unwrap().clone() }; // Insert the new document. match ctx.storage.insert_one(db, coll, updated.clone()).await { Ok(id_str) => { // Record upsert in oplog as an insert. ctx.oplog.append( OpType::Insert, db, coll, &id_str, Some(updated.clone()), None, ); // Update index. let mut engine = ctx .indexes .entry(ns_key.clone()) .or_insert_with(IndexEngine::new); let _ = engine.on_insert(&updated); total_n += 1; upserted_list.push(doc! { "index": idx as i32, "_id": new_id, }); } Err(e) => { write_errors.push(doc! { "index": idx as i32, "code": 1_i32, "codeName": "InternalError", "errmsg": e.to_string(), }); if ordered { break; } } } } Err(e) => { write_errors.push(doc! { "index": idx as i32, "code": 14_i32, "codeName": "TypeMismatch", "errmsg": e.to_string(), }); if ordered { break; } } } } else { // Update matched documents. let docs_to_update = if multi { all_docs } else { all_docs.into_iter().take(1).collect() }; for matched_doc in &docs_to_update { match UpdateEngine::apply_update( matched_doc, &update, array_filters.as_deref(), ) { Ok(updated_doc) => { let id_str = extract_id_string(matched_doc); match ctx .storage .update_by_id(db, coll, &id_str, updated_doc.clone()) .await { Ok(()) => { // Record in oplog. ctx.oplog.append( OpType::Update, db, coll, &id_str, Some(updated_doc.clone()), Some(matched_doc.clone()), ); // Update index. if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { let _ = engine.on_update(matched_doc, &updated_doc); } total_n += 1; // Check if the document actually changed. if matched_doc != &updated_doc { total_n_modified += 1; } } Err(e) => { write_errors.push(doc! { "index": idx as i32, "code": 1_i32, "codeName": "InternalError", "errmsg": e.to_string(), }); if ordered { break; } } } } Err(e) => { write_errors.push(doc! { "index": idx as i32, "code": 14_i32, "codeName": "TypeMismatch", "errmsg": e.to_string(), }); if ordered { break; } } } } } } // Build response. let mut response = doc! { "n": total_n, "nModified": total_n_modified, "ok": 1.0, }; if !upserted_list.is_empty() { response.insert( "upserted", upserted_list .into_iter() .map(Bson::Document) .collect::>(), ); } if !write_errors.is_empty() { response.insert( "writeErrors", write_errors .into_iter() .map(Bson::Document) .collect::>(), ); } Ok(response) } /// Handle the `findAndModify` command. async fn handle_find_and_modify( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let coll = cmd .get_str("findAndModify") .or_else(|_| cmd.get_str("findandmodify")) .map_err(|_| CommandError::InvalidArgument("missing 'findAndModify' field".into()))?; let query = match cmd.get("query") { Some(Bson::Document(d)) => d.clone(), _ => Document::new(), }; let sort = match cmd.get("sort") { Some(Bson::Document(d)) => Some(d.clone()), _ => None, }; let update_doc = match cmd.get("update") { Some(Bson::Document(d)) => Some(d.clone()), _ => None, }; let remove = match cmd.get("remove") { Some(Bson::Boolean(b)) => *b, _ => false, }; let return_new = match cmd.get("new") { Some(Bson::Boolean(b)) => *b, _ => false, }; let upsert = match cmd.get("upsert") { Some(Bson::Boolean(b)) => *b, _ => false, }; let fields = match cmd.get("fields") { Some(Bson::Document(d)) => Some(d.clone()), _ => None, }; let array_filters: Option> = cmd.get_array("arrayFilters").ok().map(|arr| { arr.iter() .filter_map(|v| { if let Bson::Document(d) = v { Some(d.clone()) } else { None } }) .collect() }); // Auto-create database and collection. ensure_collection_exists(db, coll, ctx).await?; let ns_key = format!("{}.{}", db, coll); // Load and filter documents. let mut matched = load_filtered_docs(db, coll, &query, &ns_key, ctx).await?; // Sort if specified. if let Some(ref sort_spec) = sort { sort_documents(&mut matched, sort_spec); } // Take the first match. let target = matched.into_iter().next(); if remove { // Remove operation. if let Some(ref doc) = target { let id_str = extract_id_string(doc); ctx.storage.delete_by_id(db, coll, &id_str).await?; // Record in oplog. ctx.oplog.append( OpType::Delete, db, coll, &id_str, None, Some(doc.clone()), ); // Update index. if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { engine.on_delete(doc); } let value = apply_fields_projection(doc, &fields); return Ok(doc! { "value": value, "lastErrorObject": { "n": 1_i32, "updatedExisting": false, }, "ok": 1.0, }); } else { return Ok(doc! { "value": Bson::Null, "lastErrorObject": { "n": 0_i32, "updatedExisting": false, }, "ok": 1.0, }); } } // Update operation. let update = match update_doc { Some(u) => u, None => { return Ok(doc! { "value": Bson::Null, "lastErrorObject": { "n": 0_i32, "updatedExisting": false, }, "ok": 1.0, }); } }; if let Some(original_doc) = target { // Update the matched document. let updated_doc = UpdateEngine::apply_update( &original_doc, &update, array_filters.as_deref(), ) .map_err(|e| CommandError::InternalError(e.to_string()))?; let id_str = extract_id_string(&original_doc); ctx.storage .update_by_id(db, coll, &id_str, updated_doc.clone()) .await?; // Record in oplog. ctx.oplog.append( OpType::Update, db, coll, &id_str, Some(updated_doc.clone()), Some(original_doc.clone()), ); // Update index. if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) { let _ = engine.on_update(&original_doc, &updated_doc); } let return_doc = if return_new { &updated_doc } else { &original_doc }; let value = apply_fields_projection(return_doc, &fields); Ok(doc! { "value": value, "lastErrorObject": { "n": 1_i32, "updatedExisting": true, }, "ok": 1.0, }) } else if upsert { // Upsert: create a new document. let new_doc = build_upsert_doc(&query); let mut updated_doc = UpdateEngine::apply_update( &new_doc, &update, array_filters.as_deref(), ) .map_err(|e| CommandError::InternalError(e.to_string()))?; // Apply $setOnInsert if present. if let Some(Bson::Document(soi)) = update.get("$setOnInsert") { UpdateEngine::apply_set_on_insert(&mut updated_doc, soi); } // Ensure _id. let upserted_id = if !updated_doc.contains_key("_id") { let oid = ObjectId::new(); updated_doc.insert("_id", oid); Bson::ObjectId(oid) } else { updated_doc.get("_id").unwrap().clone() }; let inserted_id_str = ctx.storage .insert_one(db, coll, updated_doc.clone()) .await?; // Record upsert in oplog as an insert. ctx.oplog.append( OpType::Insert, db, coll, &inserted_id_str, Some(updated_doc.clone()), None, ); // Update index. { let mut engine = ctx .indexes .entry(ns_key.clone()) .or_insert_with(IndexEngine::new); let _ = engine.on_insert(&updated_doc); } let value = if return_new { apply_fields_projection(&updated_doc, &fields) } else { Bson::Null }; Ok(doc! { "value": value, "lastErrorObject": { "n": 1_i32, "updatedExisting": false, "upserted": upserted_id, }, "ok": 1.0, }) } else { Ok(doc! { "value": Bson::Null, "lastErrorObject": { "n": 0_i32, "updatedExisting": false, }, "ok": 1.0, }) } } // ---- Helpers ---- /// Load documents from storage, optionally using index for candidate narrowing, then filter. async fn load_filtered_docs( db: &str, coll: &str, filter: &Document, ns_key: &str, ctx: &CommandContext, ) -> CommandResult> { // Try to use index to narrow candidates. let candidate_ids: Option> = ctx .indexes .get(ns_key) .and_then(|engine| engine.find_candidate_ids(filter)); let docs = if let Some(ids) = candidate_ids { if ids.is_empty() { return Ok(Vec::new()); } ctx.storage.find_by_ids(db, coll, ids).await? } else { ctx.storage.find_all(db, coll).await? }; // Apply filter. if filter.is_empty() { Ok(docs) } else { Ok(QueryMatcher::filter(&docs, filter)) } } /// Build a base document for an upsert from the filter's equality conditions. fn build_upsert_doc(filter: &Document) -> Document { let mut doc = Document::new(); for (key, value) in filter { if key.starts_with('$') { // Skip top-level operators like $and, $or. continue; } match value { Bson::Document(d) if d.keys().any(|k| k.starts_with('$')) => { // If the value has operators (e.g., $gt), extract $eq if present. if let Some(eq_val) = d.get("$eq") { doc.insert(key.clone(), eq_val.clone()); } } _ => { doc.insert(key.clone(), value.clone()); } } } doc } /// Extract _id as a string for storage operations. fn extract_id_string(doc: &Document) -> String { match doc.get("_id") { Some(Bson::ObjectId(oid)) => oid.to_hex(), Some(Bson::String(s)) => s.clone(), Some(other) => format!("{}", other), None => String::new(), } } /// Apply fields projection if specified, returning Bson. fn apply_fields_projection(doc: &Document, fields: &Option) -> Bson { match fields { Some(proj) if !proj.is_empty() => Bson::Document(apply_projection(doc, proj)), _ => Bson::Document(doc.clone()), } } /// Ensure the target database and collection exist, creating them if needed. async fn ensure_collection_exists( db: &str, coll: &str, ctx: &CommandContext, ) -> CommandResult<()> { if let Err(e) = ctx.storage.create_database(db).await { let msg = e.to_string(); if !msg.contains("AlreadyExists") && !msg.contains("already exists") { return Err(CommandError::StorageError(msg)); } } match ctx.storage.collection_exists(db, coll).await { Ok(true) => {} Ok(false) => { if let Err(e) = ctx.storage.create_collection(db, coll).await { let msg = e.to_string(); if !msg.contains("AlreadyExists") && !msg.contains("already exists") { return Err(CommandError::StorageError(msg)); } } } Err(e) => { if let Err(e2) = ctx.storage.create_collection(db, coll).await { let msg = e2.to_string(); if !msg.contains("AlreadyExists") && !msg.contains("already exists") { return Err(CommandError::StorageError(format!( "collection_exists failed: {e}; create_collection failed: {msg}" ))); } } } } Ok(()) }