use bson::{doc, Bson, Document}; use rustdb_index::IndexEngine; use tracing::debug; use crate::context::{CommandContext, ConnectionState, CursorState}; use crate::error::{CommandError, CommandResult}; use crate::transactions; /// Handle various admin / diagnostic / session / auth commands. pub async fn handle( cmd: &Document, db: &str, ctx: &CommandContext, command_name: &str, connection: &ConnectionState, ) -> CommandResult { match command_name { "ping" => Ok(doc! { "ok": 1.0 }), "buildInfo" | "buildinfo" => Ok(doc! { "version": "7.0.0", "gitVersion": "unknown", "modules": [], "sysInfo": "rustdb", "versionArray": [7_i32, 0_i32, 0_i32, 0_i32], "ok": 1.0, }), "serverStatus" => handle_server_status(ctx), "hostInfo" => Ok(doc! { "system": { "hostname": "localhost", }, "ok": 1.0, }), "whatsmyuri" => Ok(doc! { "you": "127.0.0.1:0", "ok": 1.0, }), "getLog" => { let _log_type = cmd.get_str("getLog").unwrap_or("global"); Ok(doc! { "totalLinesWritten": 0_i32, "log": [], "ok": 1.0, }) } "replSetGetStatus" => { // Not a replica set. Ok(doc! { "ok": 0.0, "errmsg": "not running with --replSet", "code": 76_i32, "codeName": "NoReplicationEnabled", }) } "getCmdLineOpts" => Ok(doc! { "argv": ["rustdb"], "parsed": {}, "ok": 1.0, }), "getParameter" => Ok(doc! { "ok": 1.0, }), "getFreeMonitoringStatus" | "setFreeMonitoring" => Ok(doc! { "state": "disabled", "ok": 1.0, }), "getShardMap" | "shardingState" => Ok(doc! { "enabled": false, "ok": 1.0, }), "atlasVersion" => Ok(doc! { "ok": 0.0, "errmsg": "not supported", "code": 59_i32, "codeName": "CommandNotFound", }), "connectionStatus" => Ok(handle_connection_status(connection)), "createUser" => handle_create_user(cmd, db, ctx).await, "updateUser" => handle_update_user(cmd, db, ctx).await, "dropUser" => handle_drop_user(cmd, db, ctx).await, "usersInfo" => handle_users_info(cmd, db, ctx).await, "grantRolesToUser" => handle_grant_roles_to_user(cmd, db, ctx).await, "revokeRolesFromUser" => handle_revoke_roles_from_user(cmd, db, ctx).await, "listDatabases" => handle_list_databases(cmd, ctx).await, "listCollections" => handle_list_collections(cmd, db, ctx).await, "create" => handle_create(cmd, db, ctx).await, "drop" => handle_drop(cmd, db, ctx).await, "dropDatabase" => handle_drop_database(db, ctx).await, "renameCollection" => handle_rename_collection(cmd, ctx).await, "collStats" | "validate" => handle_coll_stats(cmd, db, ctx, command_name).await, "dbStats" => handle_db_stats(db, ctx).await, "explain" => Ok(doc! { "queryPlanner": {}, "ok": 1.0, }), "startSession" => { let session_id = uuid::Uuid::new_v4().to_string(); ctx.sessions.get_or_create_session(&session_id); Ok(doc! { "id": { "id": &session_id }, "timeoutMinutes": 30_i32, "ok": 1.0, }) } "endSessions" | "killSessions" => { // Attempt to end listed sessions. if let Ok(sessions) = cmd .get_array("endSessions") .or_else(|_| cmd.get_array("killSessions")) { for s in sessions { if let Some(sid) = rustdb_txn::SessionEngine::extract_session_id(s) { ctx.sessions.end_session(&sid); } } } Ok(doc! { "ok": 1.0 }) } "commitTransaction" => transactions::commit_transaction_command(cmd, ctx).await, "abortTransaction" => transactions::abort_transaction_command(cmd, ctx), // Auth stubs - accept silently. "saslStart" => Ok(doc! { "conversationId": 1_i32, "done": true, "payload": bson::Binary { subtype: bson::spec::BinarySubtype::Generic, bytes: vec![] }, "ok": 1.0, }), "saslContinue" => Ok(doc! { "conversationId": 1_i32, "done": true, "payload": bson::Binary { subtype: bson::spec::BinarySubtype::Generic, bytes: vec![] }, "ok": 1.0, }), "authenticate" | "logout" => Ok(doc! { "ok": 1.0 }), "currentOp" => Ok(doc! { "inprog": [], "ok": 1.0, }), "killOp" | "top" | "profile" | "compact" | "reIndex" | "fsync" | "connPoolSync" => Ok(doc! { "ok": 1.0 }), other => { // Catch-all for any admin command we missed. Ok(doc! { "ok": 1.0, "note": format!("stub response for command: {}", other), }) } } } fn handle_server_status(ctx: &CommandContext) -> CommandResult { let oplog_stats = ctx.oplog.stats(); Ok(doc! { "host": "localhost", "version": "7.0.0", "process": "rustdb", "uptime": ctx.start_time.elapsed().as_secs() as i64, "connections": { "current": 0_i32, "available": i32::MAX, }, "logicalSessionRecordCache": { "activeSessionsCount": ctx.sessions.len() as i64, }, "transactions": { "currentActive": ctx.transactions.len() as i64, }, "oplog": { "currentSeq": oplog_stats.current_seq as i64, "totalEntries": oplog_stats.total_entries as i64, "oldestSeq": oplog_stats.oldest_seq as i64, "entriesByOp": { "insert": oplog_stats.inserts as i64, "update": oplog_stats.updates as i64, "delete": oplog_stats.deletes as i64, }, }, "security": { "authentication": ctx.auth.enabled(), "users": ctx.auth.user_count() as i64, }, "ok": 1.0, }) } fn handle_connection_status(connection: &ConnectionState) -> Document { let authenticated_users: Vec = connection .authenticated_users .iter() .map(|user| { Bson::Document(doc! { "user": user.username.clone(), "db": user.database.clone(), }) }) .collect(); let authenticated_roles: Vec = connection .authenticated_users .iter() .flat_map(|user| { user.roles .iter() .map(|role| Bson::Document(role_to_document(&user.database, role))) }) .collect(); doc! { "authInfo": { "authenticatedUsers": authenticated_users, "authenticatedUserRoles": authenticated_roles, }, "ok": 1.0, } } async fn handle_create_user( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let username = cmd .get_str("createUser") .map_err(|_| CommandError::InvalidArgument("missing 'createUser' field".into()))?; let password = cmd .get_str("pwd") .map_err(|_| CommandError::InvalidArgument("missing 'pwd' field".into()))?; let roles = parse_roles(cmd, db, "roles")?; ctx.auth .create_user(db, username, password, roles) .map_err(auth_error_to_command_error)?; Ok(doc! { "ok": 1.0 }) } async fn handle_update_user( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let username = cmd .get_str("updateUser") .map_err(|_| CommandError::InvalidArgument("missing 'updateUser' field".into()))?; let password = cmd.get_str("pwd").ok(); let roles = if cmd.contains_key("roles") { Some(parse_roles(cmd, db, "roles")?) } else { None }; ctx.auth .update_user(db, username, password, roles) .map_err(auth_error_to_command_error)?; Ok(doc! { "ok": 1.0 }) } async fn handle_drop_user( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let username = cmd .get_str("dropUser") .map_err(|_| CommandError::InvalidArgument("missing 'dropUser' field".into()))?; ctx.auth .drop_user(db, username) .map_err(auth_error_to_command_error)?; Ok(doc! { "ok": 1.0 }) } async fn handle_users_info( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let username = match cmd.get("usersInfo") { Some(Bson::String(name)) => Some(name.as_str()), Some(Bson::Document(user_doc)) => user_doc.get_str("user").ok(), _ => None, }; let users = ctx.auth.users_info(db, username); let user_docs: Vec = users .into_iter() .map(|user| { let roles: Vec = user .roles .iter() .map(|role| Bson::Document(role_to_document(&user.database, role))) .collect(); Bson::Document(doc! { "user": user.username, "db": user.database, "roles": roles, "mechanisms": ["SCRAM-SHA-256"], }) }) .collect(); Ok(doc! { "users": user_docs, "ok": 1.0 }) } async fn handle_grant_roles_to_user( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let username = cmd .get_str("grantRolesToUser") .map_err(|_| CommandError::InvalidArgument("missing 'grantRolesToUser' field".into()))?; let roles = parse_roles(cmd, db, "roles")?; ctx.auth .grant_roles(db, username, roles) .map_err(auth_error_to_command_error)?; Ok(doc! { "ok": 1.0 }) } async fn handle_revoke_roles_from_user( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let username = cmd .get_str("revokeRolesFromUser") .map_err(|_| CommandError::InvalidArgument("missing 'revokeRolesFromUser' field".into()))?; let roles = parse_roles(cmd, db, "roles")?; ctx.auth .revoke_roles(db, username, roles) .map_err(auth_error_to_command_error)?; Ok(doc! { "ok": 1.0 }) } fn parse_roles(cmd: &Document, db: &str, key: &str) -> CommandResult> { let role_values = cmd .get_array(key) .map_err(|_| CommandError::InvalidArgument(format!("missing '{key}' array")))?; let mut roles = Vec::with_capacity(role_values.len()); for role_value in role_values { match role_value { Bson::String(role) => roles.push(role.clone()), Bson::Document(role_doc) => { let role = role_doc .get_str("role") .map_err(|_| CommandError::InvalidArgument("role document missing 'role'".into()))?; let role_db = role_doc.get_str("db").unwrap_or(db); if role_db == db { roles.push(role.to_string()); } else { roles.push(format!("{role_db}.{role}")); } } _ => return Err(CommandError::InvalidArgument("roles must be strings or documents".into())), } } Ok(roles) } fn role_to_document(default_db: &str, role: &str) -> Document { if let Some((role_db, role_name)) = role.split_once('.') { doc! { "role": role_name, "db": role_db } } else { doc! { "role": role, "db": default_db } } } fn auth_error_to_command_error(error: rustdb_auth::AuthError) -> CommandError { match error { rustdb_auth::AuthError::UserAlreadyExists(message) => CommandError::DuplicateKey(message), rustdb_auth::AuthError::UserNotFound(message) => CommandError::NamespaceNotFound(message), rustdb_auth::AuthError::Persistence(message) => CommandError::InternalError(message), rustdb_auth::AuthError::AuthenticationFailed => CommandError::AuthenticationFailed, rustdb_auth::AuthError::InvalidPayload(message) => CommandError::InvalidArgument(message), rustdb_auth::AuthError::UnsupportedMechanism(message) => CommandError::InvalidArgument(message), rustdb_auth::AuthError::Disabled => CommandError::Unauthorized("authentication is disabled".into()), rustdb_auth::AuthError::UnknownConversation => { CommandError::InvalidArgument("unknown SASL conversation".into()) } } } /// Handle `listDatabases` command. async fn handle_list_databases( cmd: &Document, ctx: &CommandContext, ) -> CommandResult { let dbs = ctx.storage.list_databases().await?; let name_only = match cmd.get("nameOnly") { Some(Bson::Boolean(true)) => true, _ => false, }; let filter = match cmd.get("filter") { Some(Bson::Document(d)) => Some(d.clone()), _ => None, }; let mut db_docs: Vec = Vec::new(); let mut total_size: i64 = 0; for db_name in &dbs { let mut db_info = doc! { "name": db_name.as_str() }; if !name_only { // Estimate size by counting documents across collections. let mut db_size: i64 = 0; if let Ok(collections) = ctx.storage.list_collections(db_name).await { for coll in &collections { if let Ok(count) = ctx.storage.count(db_name, coll).await { // Rough estimate: 200 bytes per document. db_size += count as i64 * 200; } } } db_info.insert("sizeOnDisk", db_size); db_info.insert("empty", db_size == 0); total_size += db_size; } // Apply filter if specified. if let Some(ref f) = filter { if !rustdb_query::QueryMatcher::matches(&db_info, f) { continue; } } db_docs.push(Bson::Document(db_info)); } let mut response = doc! { "databases": db_docs, "ok": 1.0, }; if !name_only { response.insert("totalSize", total_size); } Ok(response) } /// Handle `listCollections` command. async fn handle_list_collections( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let collections = ctx.storage.list_collections(db).await?; let filter = match cmd.get("filter") { Some(Bson::Document(d)) => Some(d.clone()), _ => None, }; let name_only = match cmd.get("nameOnly") { Some(Bson::Boolean(true)) => true, _ => false, }; let batch_size = cmd .get_document("cursor") .ok() .and_then(|c| { c.get_i32("batchSize") .ok() .map(|v| v as usize) .or_else(|| c.get_i64("batchSize").ok().map(|v| v as usize)) }) .unwrap_or(usize::MAX); let ns = format!("{}.$cmd.listCollections", db); let mut coll_docs: Vec = Vec::new(); for coll_name in &collections { let info_doc = if name_only { doc! { "name": coll_name.as_str(), "type": "collection", } } else { doc! { "name": coll_name.as_str(), "type": "collection", "options": {}, "info": { "readOnly": false, }, "idIndex": { "v": 2_i32, "key": { "_id": 1_i32 }, "name": "_id_", }, } }; // Apply filter if specified. if let Some(ref f) = filter { if !rustdb_query::QueryMatcher::matches(&info_doc, f) { continue; } } coll_docs.push(info_doc); } if coll_docs.len() <= batch_size { let first_batch: Vec = coll_docs.into_iter().map(Bson::Document).collect(); Ok(doc! { "cursor": { "id": 0_i64, "ns": &ns, "firstBatch": first_batch, }, "ok": 1.0, }) } else { let first_batch: Vec = coll_docs[..batch_size] .iter() .cloned() .map(Bson::Document) .collect(); let remaining: Vec = coll_docs[batch_size..].to_vec(); let cursor_id = generate_cursor_id(); ctx.cursors.insert( cursor_id, CursorState { documents: remaining, position: 0, database: db.to_string(), collection: String::new(), }, ); Ok(doc! { "cursor": { "id": cursor_id, "ns": &ns, "firstBatch": first_batch, }, "ok": 1.0, }) } } /// Handle `create` command. async fn handle_create( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let coll = cmd .get_str("create") .map_err(|_| CommandError::InvalidArgument("missing 'create' field".into()))?; debug!(db = db, collection = coll, "create command"); // Create database (ignore AlreadyExists). if let Err(e) = ctx.storage.create_database(db).await { let msg = e.to_string(); if !msg.contains("AlreadyExists") && !msg.contains("already exists") { return Err(CommandError::StorageError(msg)); } } // Create collection. if let Err(e) = ctx.storage.create_collection(db, coll).await { let msg = e.to_string(); if msg.contains("AlreadyExists") || msg.contains("already exists") { return Err(CommandError::NamespaceExists(format!("{}.{}", db, coll))); } return Err(CommandError::StorageError(msg)); } // Initialize index engine for the new collection. let ns_key = format!("{}.{}", db, coll); ctx.indexes .entry(ns_key) .or_insert_with(IndexEngine::new); Ok(doc! { "ok": 1.0 }) } /// Handle `drop` command. async fn handle_drop( cmd: &Document, db: &str, ctx: &CommandContext, ) -> CommandResult { let coll = cmd .get_str("drop") .map_err(|_| CommandError::InvalidArgument("missing 'drop' field".into()))?; let ns_key = format!("{}.{}", db, coll); debug!(db = db, collection = coll, "drop command"); // Check if collection exists. match ctx.storage.collection_exists(db, coll).await { Ok(false) => { return Err(CommandError::NamespaceNotFound(format!( "ns not found: {}", ns_key ))); } Err(_) => {} _ => {} } // Drop from storage. ctx.storage.drop_collection(db, coll).await?; // Remove from indexes. ctx.indexes.remove(&ns_key); // Count of indexes that were on this collection (at least _id_). Ok(doc! { "ns": &ns_key, "nIndexesWas": 1_i32, "ok": 1.0, }) } /// Handle `dropDatabase` command. async fn handle_drop_database( db: &str, ctx: &CommandContext, ) -> CommandResult { debug!(db = db, "dropDatabase command"); // Remove all index entries for this database. let prefix = format!("{}.", db); let keys_to_remove: Vec = ctx .indexes .iter() .filter(|entry| entry.key().starts_with(&prefix)) .map(|entry| entry.key().clone()) .collect(); for key in keys_to_remove { ctx.indexes.remove(&key); } // Drop from storage. ctx.storage.drop_database(db).await?; Ok(doc! { "dropped": db, "ok": 1.0, }) } /// Handle `renameCollection` command. async fn handle_rename_collection( cmd: &Document, ctx: &CommandContext, ) -> CommandResult { let source_ns = cmd .get_str("renameCollection") .map_err(|_| CommandError::InvalidArgument("missing 'renameCollection' field".into()))?; let target_ns = cmd .get_str("to") .map_err(|_| CommandError::InvalidArgument("missing 'to' field".into()))?; let drop_target = match cmd.get("dropTarget") { Some(Bson::Boolean(b)) => *b, _ => false, }; // Parse "db.collection" format. let (source_db, source_coll) = parse_namespace(source_ns)?; let (target_db, target_coll) = parse_namespace(target_ns)?; debug!( source = source_ns, target = target_ns, drop_target = drop_target, "renameCollection command" ); // If cross-database rename, that's more complex. For now, support same-db rename. if source_db != target_db { return Err(CommandError::InvalidArgument( "cross-database renameCollection not yet supported".into(), )); } // If dropTarget, drop the target collection first. if drop_target { let _ = ctx.storage.drop_collection(target_db, target_coll).await; let target_ns_key = format!("{}.{}", target_db, target_coll); ctx.indexes.remove(&target_ns_key); } else { // Check if target already exists. if let Ok(true) = ctx.storage.collection_exists(target_db, target_coll).await { return Err(CommandError::NamespaceExists(target_ns.to_string())); } } // Rename in storage. ctx.storage .rename_collection(source_db, source_coll, target_coll) .await?; // Update index engine: move from old namespace to new. let source_ns_key = format!("{}.{}", source_db, source_coll); let target_ns_key = format!("{}.{}", target_db, target_coll); if let Some((_, engine)) = ctx.indexes.remove(&source_ns_key) { ctx.indexes.insert(target_ns_key, engine); } Ok(doc! { "ok": 1.0 }) } /// Handle `collStats` command. async fn handle_coll_stats( cmd: &Document, db: &str, ctx: &CommandContext, command_name: &str, ) -> CommandResult { let coll = cmd .get_str(command_name) .unwrap_or("unknown"); let ns = format!("{}.{}", db, coll); let count = ctx .storage .count(db, coll) .await .unwrap_or(0); let n_indexes = match ctx.indexes.get(&ns) { Some(engine) => engine.list_indexes().len() as i32, None => 1_i32, }; // Rough size estimate. let data_size = count as i64 * 200; Ok(doc! { "ns": &ns, "count": count as i64, "size": data_size, "avgObjSize": if count > 0 { 200_i64 } else { 0_i64 }, "storageSize": data_size, "nindexes": n_indexes, "totalIndexSize": 0_i64, "ok": 1.0, }) } /// Handle `dbStats` command. async fn handle_db_stats( db: &str, ctx: &CommandContext, ) -> CommandResult { let collections = ctx .storage .list_collections(db) .await .unwrap_or_default(); let num_collections = collections.len() as i32; let mut total_objects: i64 = 0; let mut total_indexes: i32 = 0; for coll in &collections { if let Ok(count) = ctx.storage.count(db, coll).await { total_objects += count as i64; } let ns_key = format!("{}.{}", db, coll); if let Some(engine) = ctx.indexes.get(&ns_key) { total_indexes += engine.list_indexes().len() as i32; } else { total_indexes += 1; // At least _id_. } } let data_size = total_objects * 200; Ok(doc! { "db": db, "collections": num_collections, "objects": total_objects, "avgObjSize": if total_objects > 0 { 200_i64 } else { 0_i64 }, "dataSize": data_size, "storageSize": data_size, "indexes": total_indexes, "indexSize": 0_i64, "ok": 1.0, }) } /// Parse a namespace string "db.collection" into (db, collection). fn parse_namespace(ns: &str) -> CommandResult<(&str, &str)> { let dot_pos = ns.find('.').ok_or_else(|| { CommandError::InvalidArgument(format!( "invalid namespace '{}': expected 'db.collection' format", ns )) })?; let db = &ns[..dot_pos]; let coll = &ns[dot_pos + 1..]; if db.is_empty() || coll.is_empty() { return Err(CommandError::InvalidArgument(format!( "invalid namespace '{}': db and collection must not be empty", ns ))); } Ok((db, coll)) } /// Generate a pseudo-random cursor ID. fn generate_cursor_id() -> i64 { use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hasher}; let s = RandomState::new(); let mut hasher = s.build_hasher(); hasher.write_u64( std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_nanos() as u64, ); let id = hasher.finish() as i64; if id == 0 { 1 } else { id.abs() } }