feat(storage): add offline data validation and strengthen storage/index integrity checks
This commit is contained in:
@@ -3,7 +3,7 @@ use std::collections::HashMap;
|
||||
use bson::{doc, oid::ObjectId, Bson, Document};
|
||||
use rustdb_index::IndexEngine;
|
||||
use rustdb_storage::OpType;
|
||||
use tracing::{debug, warn};
|
||||
use tracing::debug;
|
||||
|
||||
use crate::context::CommandContext;
|
||||
use crate::error::{CommandError, CommandResult};
|
||||
@@ -62,6 +62,23 @@ pub async fn handle(
|
||||
doc.insert("_id", ObjectId::new());
|
||||
}
|
||||
|
||||
// Pre-check unique index constraints BEFORE storage write.
|
||||
if let Some(engine) = ctx.indexes.get(&ns_key) {
|
||||
if let Err(e) = engine.check_unique_constraints(&doc) {
|
||||
let err_msg = e.to_string();
|
||||
write_errors.push(doc! {
|
||||
"index": idx as i32,
|
||||
"code": 11000_i32,
|
||||
"codeName": "DuplicateKey",
|
||||
"errmsg": &err_msg,
|
||||
});
|
||||
if ordered {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Attempt storage insert.
|
||||
match ctx.storage.insert_one(db, coll, doc.clone()).await {
|
||||
Ok(id_str) => {
|
||||
@@ -81,10 +98,10 @@ pub async fn handle(
|
||||
.entry(ns_key.clone())
|
||||
.or_insert_with(IndexEngine::new);
|
||||
if let Err(e) = engine.on_insert(&doc) {
|
||||
warn!(
|
||||
tracing::error!(
|
||||
namespace = %ns_key,
|
||||
error = %e,
|
||||
"index update failed after successful insert"
|
||||
"index update failed after successful insert (pre-check passed but insert failed)"
|
||||
);
|
||||
}
|
||||
inserted_count += 1;
|
||||
|
||||
@@ -150,6 +150,22 @@ async fn handle_update(
|
||||
updated.get("_id").unwrap().clone()
|
||||
};
|
||||
|
||||
// Pre-check unique index constraints before upsert insert.
|
||||
if let Some(engine) = ctx.indexes.get(&ns_key) {
|
||||
if let Err(e) = engine.check_unique_constraints(&updated) {
|
||||
write_errors.push(doc! {
|
||||
"index": idx as i32,
|
||||
"code": 11000_i32,
|
||||
"codeName": "DuplicateKey",
|
||||
"errmsg": e.to_string(),
|
||||
});
|
||||
if ordered {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Insert the new document.
|
||||
match ctx.storage.insert_one(db, coll, updated.clone()).await {
|
||||
Ok(id_str) => {
|
||||
@@ -168,7 +184,9 @@ async fn handle_update(
|
||||
.indexes
|
||||
.entry(ns_key.clone())
|
||||
.or_insert_with(IndexEngine::new);
|
||||
let _ = engine.on_insert(&updated);
|
||||
if let Err(e) = engine.on_insert(&updated) {
|
||||
tracing::error!(namespace = %ns_key, error = %e, "index update failed after upsert insert");
|
||||
}
|
||||
|
||||
total_n += 1;
|
||||
upserted_list.push(doc! {
|
||||
@@ -216,6 +234,22 @@ async fn handle_update(
|
||||
array_filters.as_deref(),
|
||||
) {
|
||||
Ok(updated_doc) => {
|
||||
// Pre-check unique index constraints before storage write.
|
||||
if let Some(engine) = ctx.indexes.get(&ns_key) {
|
||||
if let Err(e) = engine.check_unique_constraints_for_update(matched_doc, &updated_doc) {
|
||||
write_errors.push(doc! {
|
||||
"index": idx as i32,
|
||||
"code": 11000_i32,
|
||||
"codeName": "DuplicateKey",
|
||||
"errmsg": e.to_string(),
|
||||
});
|
||||
if ordered {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let id_str = extract_id_string(matched_doc);
|
||||
match ctx
|
||||
.storage
|
||||
@@ -235,7 +269,9 @@ async fn handle_update(
|
||||
|
||||
// Update index.
|
||||
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
|
||||
let _ = engine.on_update(matched_doc, &updated_doc);
|
||||
if let Err(e) = engine.on_update(matched_doc, &updated_doc) {
|
||||
tracing::error!(namespace = %ns_key, error = %e, "index update failed after update");
|
||||
}
|
||||
}
|
||||
total_n += 1;
|
||||
// Check if the document actually changed.
|
||||
@@ -444,6 +480,13 @@ async fn handle_find_and_modify(
|
||||
)
|
||||
.map_err(|e| CommandError::InternalError(e.to_string()))?;
|
||||
|
||||
// Pre-check unique index constraints before storage write.
|
||||
if let Some(engine) = ctx.indexes.get(&ns_key) {
|
||||
if let Err(e) = engine.check_unique_constraints_for_update(&original_doc, &updated_doc) {
|
||||
return Err(CommandError::StorageError(e.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let id_str = extract_id_string(&original_doc);
|
||||
ctx.storage
|
||||
.update_by_id(db, coll, &id_str, updated_doc.clone())
|
||||
@@ -461,7 +504,9 @@ async fn handle_find_and_modify(
|
||||
|
||||
// Update index.
|
||||
if let Some(mut engine) = ctx.indexes.get_mut(&ns_key) {
|
||||
let _ = engine.on_update(&original_doc, &updated_doc);
|
||||
if let Err(e) = engine.on_update(&original_doc, &updated_doc) {
|
||||
tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify update");
|
||||
}
|
||||
}
|
||||
|
||||
let return_doc = if return_new {
|
||||
@@ -505,6 +550,13 @@ async fn handle_find_and_modify(
|
||||
updated_doc.get("_id").unwrap().clone()
|
||||
};
|
||||
|
||||
// Pre-check unique index constraints before upsert insert.
|
||||
if let Some(engine) = ctx.indexes.get(&ns_key) {
|
||||
if let Err(e) = engine.check_unique_constraints(&updated_doc) {
|
||||
return Err(CommandError::StorageError(e.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let inserted_id_str = ctx.storage
|
||||
.insert_one(db, coll, updated_doc.clone())
|
||||
.await?;
|
||||
@@ -525,7 +577,9 @@ async fn handle_find_and_modify(
|
||||
.indexes
|
||||
.entry(ns_key.clone())
|
||||
.or_insert_with(IndexEngine::new);
|
||||
let _ = engine.on_insert(&updated_doc);
|
||||
if let Err(e) = engine.on_insert(&updated_doc) {
|
||||
tracing::error!(namespace = %ns_key, error = %e, "index update failed after findAndModify upsert");
|
||||
}
|
||||
}
|
||||
|
||||
let value = if return_new {
|
||||
|
||||
Reference in New Issue
Block a user