feat(transactions): add single-node transaction support with session-aware reads, commits, aborts, and transaction metrics

This commit is contained in:
2026-04-29 22:14:46 +00:00
parent e79fe339aa
commit b72e8ed5e7
19 changed files with 913 additions and 77 deletions
@@ -7,6 +7,7 @@ use rustdb_query::{QueryMatcher, sort_documents, apply_projection, distinct_valu
use crate::context::{CommandContext, CursorState};
use crate::error::{CommandError, CommandResult};
use crate::transactions;
/// Atomic counter for generating unique cursor IDs.
static CURSOR_ID_COUNTER: AtomicI64 = AtomicI64::new(1);
@@ -80,9 +81,14 @@ pub async fn handle(
let limit = get_i64(cmd, "limit").unwrap_or(0).max(0) as usize;
let batch_size = get_i32(cmd, "batchSize").unwrap_or(101).max(0) as usize;
let single_batch = get_bool(cmd, "singleBatch").unwrap_or(false);
let txn_id = transactions::active_transaction_id(ctx, cmd);
// If the collection does not exist, return an empty cursor.
let exists = ctx.storage.collection_exists(db, coll).await?;
let exists = if txn_id.is_some() {
true
} else {
ctx.storage.collection_exists(db, coll).await?
};
if !exists {
return Ok(doc! {
"cursor": {
@@ -96,7 +102,9 @@ pub async fn handle(
// Try index-accelerated lookup.
let index_key = format!("{}.{}", db, coll);
let docs = if let Some(idx_ref) = ctx.indexes.get(&index_key) {
let docs = if let Some(ref txn_id) = txn_id {
transactions::load_transaction_docs(ctx, txn_id, db, coll).await?
} else if let Some(idx_ref) = ctx.indexes.get(&index_key) {
if let Some(candidate_ids) = idx_ref.find_candidate_ids(&filter) {
debug!(
ns = %ns,
@@ -298,9 +306,14 @@ pub async fn handle_count(
ctx: &CommandContext,
) -> CommandResult<Document> {
let coll = get_str(cmd, "count").unwrap_or("unknown");
let txn_id = transactions::active_transaction_id(ctx, cmd);
// Check collection existence.
let exists = ctx.storage.collection_exists(db, coll).await?;
let exists = if txn_id.is_some() {
true
} else {
ctx.storage.collection_exists(db, coll).await?
};
if !exists {
return Ok(doc! { "n": 0_i64, "ok": 1.0 });
}
@@ -309,6 +322,23 @@ pub async fn handle_count(
let skip = get_i64(cmd, "skip").unwrap_or(0).max(0) as usize;
let limit = get_i64(cmd, "limit").unwrap_or(0).max(0) as usize;
if let Some(ref txn_id) = txn_id {
let docs = transactions::load_transaction_docs(ctx, txn_id, db, coll).await?;
let filtered = if query.is_empty() {
docs
} else {
QueryMatcher::filter(&docs, &query)
};
let mut n = filtered.len().saturating_sub(skip);
if limit > 0 {
n = n.min(limit);
}
return Ok(doc! {
"n": n as i64,
"ok": 1.0,
});
}
let count: u64 = if query.is_empty() && skip == 0 && limit == 0 {
// Fast path: use storage-level count.
ctx.storage.count(db, coll).await?
@@ -352,15 +382,24 @@ pub async fn handle_distinct(
let key = get_str(cmd, "key").ok_or_else(|| {
CommandError::InvalidArgument("distinct requires a 'key' field".into())
})?;
let txn_id = transactions::active_transaction_id(ctx, cmd);
// Check collection existence.
let exists = ctx.storage.collection_exists(db, coll).await?;
let exists = if txn_id.is_some() {
true
} else {
ctx.storage.collection_exists(db, coll).await?
};
if !exists {
return Ok(doc! { "values": [], "ok": 1.0 });
}
let query = get_document(cmd, "query").cloned();
let docs = ctx.storage.find_all(db, coll).await?;
let docs = if let Some(txn_id) = txn_id {
transactions::load_transaction_docs(ctx, &txn_id, db, coll).await?
} else {
ctx.storage.find_all(db, coll).await?
};
let values = distinct_values(&docs, key, query.as_ref());
Ok(doc! {