use anyhow::Result;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader};
use tracing::{info, error};

use crate::RustDb;
use rustdb_config::RustDbOptions;

/// A management request from the TypeScript wrapper.
///
/// Deserialized from a single line of JSON.
#[derive(Debug, Deserialize)]
pub struct ManagementRequest {
    /// Request identifier; copied into the matching [`ManagementResponse`].
    pub id: String,
    /// Name of the method to invoke (for example `"start"`).
    pub method: String,
    /// Method arguments. Falls back to `serde_json::Value`'s default (`Null`)
    /// when the field is absent from the request.
    #[serde(default)]
    pub params: serde_json::Value,
}

/// A management response back to the TypeScript wrapper.
///
/// `result` and `error` are omitted from the serialized JSON when they are `None`.
#[derive(Debug, Serialize)]
pub struct ManagementResponse {
    /// Identifier of the request this response answers.
    pub id: String,
    /// `true` when the request was handled successfully.
    pub success: bool,
    /// Payload of a successful response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<serde_json::Value>,
    /// Human-readable message of a failed response.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
}

/// An unsolicited event from the server to the TypeScript wrapper.
#[derive(Debug, Serialize)]
pub struct ManagementEvent {
    /// Event name (for example `"ready"`).
    pub event: String,
    /// Event payload.
    pub data: serde_json::Value,
}

impl ManagementResponse {
    /// Build a successful response carrying `result`.
    fn ok(id: String, result: serde_json::Value) -> Self {
        Self {
            id,
            success: true,
            result: Some(result),
            error: None,
        }
    }

    /// Build a failed response carrying the error `message`.
    fn err(id: String, message: String) -> Self {
        Self {
            id,
            success: false,
            result: None,
            error: Some(message),
        }
    }
}

/// Write `line` followed by a newline to stdout and flush.
///
/// The stdout lock is held across all three operations. I/O errors are
/// deliberately ignored: this is a best-effort channel and there is nowhere
/// else to report a broken stdout.
fn send_line(line: &str) {
    use std::io::Write;

    let stdout = std::io::stdout();
    let mut handle = stdout.lock();
    let _ = handle.write_all(line.as_bytes());
    let _ = handle.write_all(b"\n");
    let _ = handle.flush();
}

/// Serialize `value` to JSON and send it as one line.
///
/// `what` names the kind of message ("response" / "event") for the log entry
/// written when serialization fails; nothing is sent in that case.
fn send_json<T: Serialize>(value: &T, what: &str) {
    match serde_json::to_string(value) {
        Ok(json) => send_line(&json),
        Err(e) => error!("Failed to serialize management {}: {}", what, e),
    }
}

/// Send a response to the TypeScript wrapper as a single JSON line on stdout.
fn send_response(response: &ManagementResponse) {
    send_json(response, "response");
}

/// Send an unsolicited event named `event` with payload `data` as a single
/// JSON line on stdout.
fn send_event(event: &str, data: serde_json::Value) {
    let evt = ManagementEvent {
        event: event.to_string(),
        data,
    };
    send_json(&evt, "event");
}

/// Run the management loop, reading JSON commands from stdin and writing responses to stdout.
pub async fn management_loop() -> Result<()> { let stdin = BufReader::new(tokio::io::stdin()); let mut lines = stdin.lines(); let mut db: Option = None; send_event("ready", serde_json::json!({})); loop { let line = match lines.next_line().await { Ok(Some(line)) => line, Ok(None) => { // stdin closed - parent process exited info!("Management stdin closed, shutting down"); if let Some(ref mut d) = db { let _ = d.stop().await; } break; } Err(e) => { error!("Error reading management stdin: {}", e); break; } }; let line = line.trim().to_string(); if line.is_empty() { continue; } let request: ManagementRequest = match serde_json::from_str(&line) { Ok(r) => r, Err(e) => { error!("Failed to parse management request: {}", e); send_response(&ManagementResponse::err( "unknown".to_string(), format!("Failed to parse request: {}", e), )); continue; } }; let response = handle_request(&request, &mut db).await; send_response(&response); } Ok(()) } async fn handle_request( request: &ManagementRequest, db: &mut Option, ) -> ManagementResponse { let id = request.id.clone(); match request.method.as_str() { "start" => handle_start(&id, &request.params, db).await, "stop" => handle_stop(&id, db).await, "getStatus" => handle_get_status(&id, db), "getMetrics" => handle_get_metrics(&id, db).await, "getOpLog" => handle_get_oplog(&id, &request.params, db), "getOpLogStats" => handle_get_oplog_stats(&id, db), "revertToSeq" => handle_revert_to_seq(&id, &request.params, db).await, "getCollections" => handle_get_collections(&id, &request.params, db).await, "getDocuments" => handle_get_documents(&id, &request.params, db).await, _ => ManagementResponse::err(id, format!("Unknown method: {}", request.method)), } } async fn handle_start( id: &str, params: &serde_json::Value, db: &mut Option, ) -> ManagementResponse { if db.is_some() { return ManagementResponse::err(id.to_string(), "Server is already running".to_string()); } let config = match params.get("config") { Some(config) => config, None => return 
ManagementResponse::err(id.to_string(), "Missing 'config' parameter".to_string()), }; let options: RustDbOptions = match serde_json::from_value(config.clone()) { Ok(o) => o, Err(e) => return ManagementResponse::err(id.to_string(), format!("Invalid config: {}", e)), }; let connection_uri = options.connection_uri(); match RustDb::new(options).await { Ok(mut d) => { match d.start().await { Ok(()) => { send_event("started", serde_json::json!({})); *db = Some(d); ManagementResponse::ok( id.to_string(), serde_json::json!({ "connectionUri": connection_uri }), ) } Err(e) => { send_event("error", serde_json::json!({"message": format!("{}", e)})); ManagementResponse::err(id.to_string(), format!("Failed to start: {}", e)) } } } Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to create server: {}", e)), } } async fn handle_stop( id: &str, db: &mut Option, ) -> ManagementResponse { match db.as_mut() { Some(d) => { match d.stop().await { Ok(()) => { *db = None; send_event("stopped", serde_json::json!({})); ManagementResponse::ok(id.to_string(), serde_json::json!({})) } Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to stop: {}", e)), } } None => ManagementResponse::ok(id.to_string(), serde_json::json!({})), } } fn handle_get_status( id: &str, db: &Option, ) -> ManagementResponse { match db.as_ref() { Some(_d) => ManagementResponse::ok( id.to_string(), serde_json::json!({ "running": true, }), ), None => ManagementResponse::ok( id.to_string(), serde_json::json!({ "running": false }), ), } } async fn handle_get_metrics( id: &str, db: &Option, ) -> ManagementResponse { match db.as_ref() { Some(d) => { let ctx = d.ctx(); let db_list = ctx.storage.list_databases().await.unwrap_or_default(); let mut total_collections = 0u64; for db_name in &db_list { if let Ok(colls) = ctx.storage.list_collections(db_name).await { total_collections += colls.len() as u64; } } let oplog_stats = ctx.oplog.stats(); let uptime_secs = ctx.start_time.elapsed().as_secs(); 
ManagementResponse::ok( id.to_string(), serde_json::json!({ "databases": db_list.len(), "collections": total_collections, "oplogEntries": oplog_stats.total_entries, "oplogCurrentSeq": oplog_stats.current_seq, "uptimeSeconds": uptime_secs, }), ) } None => ManagementResponse::err(id.to_string(), "Server is not running".to_string()), } } fn handle_get_oplog( id: &str, params: &serde_json::Value, db: &Option, ) -> ManagementResponse { let d = match db.as_ref() { Some(d) => d, None => return ManagementResponse::err(id.to_string(), "Server is not running".to_string()), }; let ctx = d.ctx(); let since_seq = params.get("sinceSeq").and_then(|v| v.as_u64()).unwrap_or(1); let limit = params.get("limit").and_then(|v| v.as_u64()).unwrap_or(100) as usize; let filter_db = params.get("db").and_then(|v| v.as_str()); let filter_coll = params.get("collection").and_then(|v| v.as_str()); let mut entries = ctx.oplog.entries_since(since_seq); // Apply filters. if let Some(fdb) = filter_db { entries.retain(|e| e.db == fdb); } if let Some(fcoll) = filter_coll { entries.retain(|e| e.collection == fcoll); } let total = entries.len(); entries.truncate(limit); // Serialize entries to JSON. 
let entries_json: Vec = entries .iter() .map(|e| { let doc_json = e.document.as_ref().map(|d| bson_doc_to_json(d)); let prev_json = e.previous_document.as_ref().map(|d| bson_doc_to_json(d)); serde_json::json!({ "seq": e.seq, "timestampMs": e.timestamp_ms, "op": match e.op { rustdb_storage::OpType::Insert => "insert", rustdb_storage::OpType::Update => "update", rustdb_storage::OpType::Delete => "delete", }, "db": e.db, "collection": e.collection, "documentId": e.document_id, "document": doc_json, "previousDocument": prev_json, }) }) .collect(); ManagementResponse::ok( id.to_string(), serde_json::json!({ "entries": entries_json, "currentSeq": ctx.oplog.current_seq(), "totalEntries": total, }), ) } fn handle_get_oplog_stats( id: &str, db: &Option, ) -> ManagementResponse { let d = match db.as_ref() { Some(d) => d, None => return ManagementResponse::err(id.to_string(), "Server is not running".to_string()), }; let stats = d.ctx().oplog.stats(); ManagementResponse::ok( id.to_string(), serde_json::json!({ "currentSeq": stats.current_seq, "totalEntries": stats.total_entries, "oldestSeq": stats.oldest_seq, "entriesByOp": { "insert": stats.inserts, "update": stats.updates, "delete": stats.deletes, }, }), ) } async fn handle_revert_to_seq( id: &str, params: &serde_json::Value, db: &Option, ) -> ManagementResponse { let d = match db.as_ref() { Some(d) => d, None => return ManagementResponse::err(id.to_string(), "Server is not running".to_string()), }; let target_seq = match params.get("seq").and_then(|v| v.as_u64()) { Some(s) => s, None => return ManagementResponse::err(id.to_string(), "Missing 'seq' parameter".to_string()), }; let dry_run = params.get("dryRun").and_then(|v| v.as_bool()).unwrap_or(false); let ctx = d.ctx(); let current = ctx.oplog.current_seq(); if target_seq > current { return ManagementResponse::err( id.to_string(), format!("Target seq {} is beyond current seq {}", target_seq, current), ); } // Collect entries to revert (from target+1 to current), sorted 
descending for reverse processing. let mut entries_to_revert = ctx.oplog.entries_range(target_seq + 1, current); entries_to_revert.reverse(); if dry_run { let entries_json: Vec = entries_to_revert .iter() .map(|e| { serde_json::json!({ "seq": e.seq, "op": match e.op { rustdb_storage::OpType::Insert => "insert", rustdb_storage::OpType::Update => "update", rustdb_storage::OpType::Delete => "delete", }, "db": e.db, "collection": e.collection, "documentId": e.document_id, }) }) .collect(); return ManagementResponse::ok( id.to_string(), serde_json::json!({ "dryRun": true, "reverted": entries_to_revert.len(), "entries": entries_json, }), ); } // Execute revert: process each entry in reverse, using storage directly. let mut reverted = 0u64; let mut errors: Vec = Vec::new(); for entry in &entries_to_revert { let result = match entry.op { rustdb_storage::OpType::Insert => { // Undo insert -> delete the document. ctx.storage.delete_by_id(&entry.db, &entry.collection, &entry.document_id).await } rustdb_storage::OpType::Update => { // Undo update -> restore the previous document. if let Some(ref prev_doc) = entry.previous_document { ctx.storage .update_by_id(&entry.db, &entry.collection, &entry.document_id, prev_doc.clone()) .await } else { errors.push(format!("seq {}: update entry missing previous_document", entry.seq)); continue; } } rustdb_storage::OpType::Delete => { // Undo delete -> re-insert the previous document. if let Some(ref prev_doc) = entry.previous_document { ctx.storage .insert_one(&entry.db, &entry.collection, prev_doc.clone()) .await .map(|_| ()) } else { errors.push(format!("seq {}: delete entry missing previous_document", entry.seq)); continue; } } }; match result { Ok(()) => reverted += 1, Err(e) => errors.push(format!("seq {}: {}", entry.seq, e)), } } // Truncate the oplog to the target sequence. 
ctx.oplog.truncate_after(target_seq); let mut response = serde_json::json!({ "dryRun": false, "reverted": reverted, "targetSeq": target_seq, }); if !errors.is_empty() { response["errors"] = serde_json::json!(errors); } ManagementResponse::ok(id.to_string(), response) } async fn handle_get_collections( id: &str, params: &serde_json::Value, db: &Option, ) -> ManagementResponse { let d = match db.as_ref() { Some(d) => d, None => return ManagementResponse::err(id.to_string(), "Server is not running".to_string()), }; let ctx = d.ctx(); let filter_db = params.get("db").and_then(|v| v.as_str()); let databases = match ctx.storage.list_databases().await { Ok(dbs) => dbs, Err(e) => return ManagementResponse::err(id.to_string(), format!("Failed to list databases: {}", e)), }; let mut collections: Vec = Vec::new(); for db_name in &databases { if let Some(fdb) = filter_db { if db_name != fdb { continue; } } if let Ok(colls) = ctx.storage.list_collections(db_name).await { for coll_name in colls { let count = ctx.storage.count(db_name, &coll_name).await.unwrap_or(0); collections.push(serde_json::json!({ "db": db_name, "name": coll_name, "count": count, })); } } } ManagementResponse::ok( id.to_string(), serde_json::json!({ "collections": collections }), ) } async fn handle_get_documents( id: &str, params: &serde_json::Value, db: &Option, ) -> ManagementResponse { let d = match db.as_ref() { Some(d) => d, None => return ManagementResponse::err(id.to_string(), "Server is not running".to_string()), }; let db_name = match params.get("db").and_then(|v| v.as_str()) { Some(s) => s, None => return ManagementResponse::err(id.to_string(), "Missing 'db' parameter".to_string()), }; let coll_name = match params.get("collection").and_then(|v| v.as_str()) { Some(s) => s, None => return ManagementResponse::err(id.to_string(), "Missing 'collection' parameter".to_string()), }; let limit = params.get("limit").and_then(|v| v.as_u64()).unwrap_or(50) as usize; let skip = params.get("skip").and_then(|v| 
v.as_u64()).unwrap_or(0) as usize; let ctx = d.ctx(); let all_docs = match ctx.storage.find_all(db_name, coll_name).await { Ok(docs) => docs, Err(e) => return ManagementResponse::err(id.to_string(), format!("Failed to find documents: {}", e)), }; let total = all_docs.len(); let docs: Vec = all_docs .into_iter() .skip(skip) .take(limit) .map(|d| bson_doc_to_json(&d)) .collect(); ManagementResponse::ok( id.to_string(), serde_json::json!({ "documents": docs, "total": total, }), ) } /// Convert a BSON Document to a serde_json::Value. fn bson_doc_to_json(doc: &bson::Document) -> serde_json::Value { // Use bson's built-in relaxed extended JSON serialization. let bson_val = bson::Bson::Document(doc.clone()); bson_val.into_relaxed_extjson() }