208 lines
5.5 KiB
Rust
208 lines
5.5 KiB
Rust
use std::collections::HashSet;
|
|
|
|
use bson::{doc, Bson, Document};
|
|
use rustdb_query::QueryMatcher;
|
|
use rustdb_storage::OpType;
|
|
use tracing::debug;
|
|
|
|
use crate::context::CommandContext;
|
|
use crate::error::{CommandError, CommandResult};
|
|
|
|
/// Handle the `delete` command.
|
|
pub async fn handle(
|
|
cmd: &Document,
|
|
db: &str,
|
|
ctx: &CommandContext,
|
|
) -> CommandResult<Document> {
|
|
let coll = cmd
|
|
.get_str("delete")
|
|
.map_err(|_| CommandError::InvalidArgument("missing 'delete' field".into()))?;
|
|
|
|
let deletes = cmd
|
|
.get_array("deletes")
|
|
.map_err(|_| CommandError::InvalidArgument("missing 'deletes' array".into()))?;
|
|
|
|
// Ordered flag (default true).
|
|
let ordered = match cmd.get("ordered") {
|
|
Some(Bson::Boolean(b)) => *b,
|
|
_ => true,
|
|
};
|
|
|
|
debug!(
|
|
db = db,
|
|
collection = coll,
|
|
count = deletes.len(),
|
|
"delete command"
|
|
);
|
|
|
|
let ns_key = format!("{}.{}", db, coll);
|
|
let mut total_deleted: i32 = 0;
|
|
let mut write_errors: Vec<Document> = Vec::new();
|
|
|
|
for (idx, del_spec) in deletes.iter().enumerate() {
|
|
let del_doc = match del_spec {
|
|
Bson::Document(d) => d,
|
|
_ => {
|
|
write_errors.push(doc! {
|
|
"index": idx as i32,
|
|
"code": 14_i32,
|
|
"codeName": "TypeMismatch",
|
|
"errmsg": "delete spec must be a document",
|
|
});
|
|
if ordered {
|
|
break;
|
|
}
|
|
continue;
|
|
}
|
|
};
|
|
|
|
// Extract filter (q) and limit.
|
|
let filter = match del_doc.get_document("q") {
|
|
Ok(f) => f.clone(),
|
|
Err(_) => Document::new(), // empty filter matches everything
|
|
};
|
|
|
|
let limit = match del_doc.get("limit") {
|
|
Some(Bson::Int32(n)) => *n,
|
|
Some(Bson::Int64(n)) => *n as i32,
|
|
Some(Bson::Double(n)) => *n as i32,
|
|
_ => 0, // default: delete all matches
|
|
};
|
|
|
|
match delete_matching(db, coll, &ns_key, &filter, limit, ctx).await {
|
|
Ok(count) => {
|
|
total_deleted += count;
|
|
}
|
|
Err(e) => {
|
|
write_errors.push(doc! {
|
|
"index": idx as i32,
|
|
"code": 1_i32,
|
|
"codeName": "InternalError",
|
|
"errmsg": e.to_string(),
|
|
});
|
|
if ordered {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build response.
|
|
let mut response = doc! {
|
|
"n": total_deleted,
|
|
"ok": 1.0,
|
|
};
|
|
|
|
if !write_errors.is_empty() {
|
|
response.insert(
|
|
"writeErrors",
|
|
write_errors
|
|
.into_iter()
|
|
.map(Bson::Document)
|
|
.collect::<Vec<_>>(),
|
|
);
|
|
}
|
|
|
|
Ok(response)
|
|
}
|
|
|
|
/// Find and delete documents matching a filter, returning the number deleted.
|
|
async fn delete_matching(
|
|
db: &str,
|
|
coll: &str,
|
|
ns_key: &str,
|
|
filter: &Document,
|
|
limit: i32,
|
|
ctx: &CommandContext,
|
|
) -> Result<i32, CommandError> {
|
|
// Check if the collection exists; if not, nothing to delete.
|
|
match ctx.storage.collection_exists(db, coll).await {
|
|
Ok(false) => return Ok(0),
|
|
Err(_) => return Ok(0),
|
|
Ok(true) => {}
|
|
}
|
|
|
|
// Try to use index to narrow candidates.
|
|
let candidate_ids: Option<HashSet<String>> = {
|
|
if let Some(engine) = ctx.indexes.get(ns_key) {
|
|
engine.find_candidate_ids(filter)
|
|
} else {
|
|
None
|
|
}
|
|
};
|
|
|
|
// Load candidate documents.
|
|
let docs = if let Some(ids) = candidate_ids {
|
|
if ids.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
ctx.storage
|
|
.find_by_ids(db, coll, ids)
|
|
.await
|
|
.map_err(|e| CommandError::StorageError(e.to_string()))?
|
|
} else {
|
|
ctx.storage
|
|
.find_all(db, coll)
|
|
.await
|
|
.map_err(|e| CommandError::StorageError(e.to_string()))?
|
|
};
|
|
|
|
// Apply filter to get matched documents.
|
|
let matched = QueryMatcher::filter(&docs, filter);
|
|
|
|
// Apply limit: 0 means delete all, 1 means delete only the first match.
|
|
let to_delete: &[Document] = if limit == 1 && !matched.is_empty() {
|
|
&matched[..1]
|
|
} else {
|
|
&matched
|
|
};
|
|
|
|
if to_delete.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
let mut deleted_count: i32 = 0;
|
|
|
|
for doc in to_delete {
|
|
// Extract the _id as a hex string for storage deletion.
|
|
let id_str = extract_id_string(doc)?;
|
|
|
|
ctx.storage
|
|
.delete_by_id(db, coll, &id_str)
|
|
.await
|
|
.map_err(|e| CommandError::StorageError(e.to_string()))?;
|
|
|
|
// Record in oplog.
|
|
ctx.oplog.append(
|
|
OpType::Delete,
|
|
db,
|
|
coll,
|
|
&id_str,
|
|
None,
|
|
Some(doc.clone()),
|
|
);
|
|
|
|
// Update index engine.
|
|
if let Some(mut engine) = ctx.indexes.get_mut(ns_key) {
|
|
engine.on_delete(doc);
|
|
}
|
|
|
|
deleted_count += 1;
|
|
}
|
|
|
|
Ok(deleted_count)
|
|
}
|
|
|
|
/// Extract the `_id` field from a document as a hex string suitable for the
|
|
/// storage adapter.
|
|
fn extract_id_string(doc: &Document) -> Result<String, CommandError> {
|
|
match doc.get("_id") {
|
|
Some(Bson::ObjectId(oid)) => Ok(oid.to_hex()),
|
|
Some(Bson::String(s)) => Ok(s.clone()),
|
|
Some(other) => Ok(format!("{}", other)),
|
|
None => Err(CommandError::InvalidArgument(
|
|
"document missing _id field".into(),
|
|
)),
|
|
}
|
|
}
|