Files
smartdb/rust/crates/rustdb-commands/src/router.rs
T

110 lines
4.1 KiB
Rust

use std::sync::Arc;
use bson::Document;
use tracing::{debug, warn};
use rustdb_wire::ParsedCommand;
use crate::context::CommandContext;
use crate::error::CommandError;
use crate::handlers;
/// Dispatcher that forwards parsed wire-protocol commands to their handlers.
///
/// Holds a shared [`CommandContext`] that is handed to each handler call.
pub struct CommandRouter {
    /// Shared command context passed along with every dispatched command.
    ctx: Arc<CommandContext>,
}
impl CommandRouter {
    /// Build a router that dispatches commands using the supplied shared context.
    pub fn new(ctx: Arc<CommandContext>) -> Self {
        Self { ctx }
    }

    /// Dispatch `cmd` to the handler registered for its command name.
    ///
    /// Before dispatching, the session referenced by the command's `lsid`
    /// field (if any) is touched. The returned document is the handler's
    /// response on success; on failure it is whatever the error's
    /// `to_error_doc` produces. Command names that match no arm are logged
    /// at `warn` level and answered with a `CommandError::NotImplemented`
    /// error document.
    pub async fn route(&self, cmd: &ParsedCommand) -> Document {
        let database = &cmd.database;
        let name = cmd.command_name.as_str();
        let body = &cmd.command;
        let ctx = &self.ctx;

        debug!(command = %name, database = %database, "routing command");

        self.touch_session(cmd);

        let outcome = match name {
            // -- handshake / monitoring --
            "hello" | "ismaster" | "isMaster" => {
                handlers::hello_handler::handle(body, database, ctx).await
            }

            // -- query commands --
            "find" => handlers::find_handler::handle(body, database, ctx).await,
            "getMore" => handlers::find_handler::handle_get_more(body, database, ctx).await,
            "killCursors" => handlers::find_handler::handle_kill_cursors(body, database, ctx).await,
            "count" => handlers::find_handler::handle_count(body, database, ctx).await,
            "distinct" => handlers::find_handler::handle_distinct(body, database, ctx).await,

            // -- write commands --
            "insert" => {
                handlers::insert_handler::handle(
                    body,
                    database,
                    ctx,
                    cmd.document_sequences.as_ref(),
                )
                .await
            }
            // Both names share one handler; it receives the name to tell them apart.
            "update" | "findAndModify" => {
                handlers::update_handler::handle(body, database, ctx, name).await
            }
            "delete" => handlers::delete_handler::handle(body, database, ctx).await,

            // -- aggregation --
            "aggregate" => handlers::aggregate_handler::handle(body, database, ctx).await,

            // -- index management --
            "createIndexes" | "dropIndexes" | "listIndexes" => {
                handlers::index_handler::handle(body, database, ctx, name).await
            }

            // -- admin commands --
            "ping" | "buildInfo" | "buildinfo" | "serverStatus" | "hostInfo"
            | "whatsmyuri" | "getLog" | "replSetGetStatus" | "getCmdLineOpts"
            | "getParameter" | "getFreeMonitoringStatus" | "setFreeMonitoring"
            | "getShardMap" | "shardingState" | "atlasVersion"
            | "connectionStatus" | "listDatabases" | "listCollections"
            | "create" | "drop" | "dropDatabase" | "renameCollection"
            | "dbStats" | "collStats" | "validate" | "explain"
            | "startSession" | "endSessions" | "killSessions"
            | "commitTransaction" | "abortTransaction"
            | "saslStart" | "saslContinue" | "authenticate" | "logout"
            | "currentOp" | "killOp" | "top" | "profile"
            | "compact" | "reIndex" | "fsync" | "connPoolSync" => {
                handlers::admin_handler::handle(body, database, ctx, name).await
            }

            // -- unknown command --
            unrecognized => {
                warn!(command = %unrecognized, "unknown command");
                Err(CommandError::NotImplemented(unrecognized.to_string()))
            }
        };

        // Errors are never surfaced to the caller as `Err`; they become error documents.
        outcome.unwrap_or_else(|e| e.to_error_doc())
    }

    /// Touch the session named by the command's `lsid` field, if one is present
    /// and a session id can be extracted from it. The value returned by
    /// `get_or_create_session` is intentionally discarded.
    fn touch_session(&self, cmd: &ParsedCommand) {
        let extracted = cmd
            .command
            .get("lsid")
            .and_then(|lsid| rustdb_txn::SessionEngine::extract_session_id(lsid));

        if let Some(session_id) = extracted {
            self.ctx.sessions.get_or_create_session(&session_id);
        }
    }
}