use anyhow::Result; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, BufReader}; use tracing::{info, error}; use crate::RustProxy; use rustproxy_config::RustProxyOptions; /// A management request from the TypeScript wrapper. #[derive(Debug, Deserialize)] pub struct ManagementRequest { pub id: String, pub method: String, #[serde(default)] pub params: serde_json::Value, } /// A management response back to the TypeScript wrapper. #[derive(Debug, Serialize)] pub struct ManagementResponse { pub id: String, pub success: bool, #[serde(skip_serializing_if = "Option::is_none")] pub result: Option, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, } /// An unsolicited event from the proxy to the TypeScript wrapper. #[derive(Debug, Serialize)] pub struct ManagementEvent { pub event: String, pub data: serde_json::Value, } impl ManagementResponse { fn ok(id: String, result: serde_json::Value) -> Self { Self { id, success: true, result: Some(result), error: None, } } fn err(id: String, message: String) -> Self { Self { id, success: false, result: None, error: Some(message), } } } fn send_line(line: &str) { // Use blocking stdout write - we're writing short JSON lines use std::io::Write; let stdout = std::io::stdout(); let mut handle = stdout.lock(); let _ = handle.write_all(line.as_bytes()); let _ = handle.write_all(b"\n"); let _ = handle.flush(); } fn send_response(response: &ManagementResponse) { match serde_json::to_string(response) { Ok(json) => send_line(&json), Err(e) => error!("Failed to serialize management response: {}", e), } } fn send_event(event: &str, data: serde_json::Value) { let evt = ManagementEvent { event: event.to_string(), data, }; match serde_json::to_string(&evt) { Ok(json) => send_line(&json), Err(e) => error!("Failed to serialize management event: {}", e), } } /// Run the management loop, reading JSON commands from stdin and writing responses to stdout. pub async fn management_loop() -> Result<()> { let stdin = BufReader::new(tokio::io::stdin()); let mut lines = stdin.lines(); let mut proxy: Option = None; send_event("ready", serde_json::json!({})); loop { let line = match lines.next_line().await { Ok(Some(line)) => line, Ok(None) => { // stdin closed - parent process exited info!("Management stdin closed, shutting down"); if let Some(ref mut p) = proxy { let _ = p.stop().await; } break; } Err(e) => { error!("Error reading management stdin: {}", e); break; } }; let line = line.trim().to_string(); if line.is_empty() { continue; } let request: ManagementRequest = match serde_json::from_str(&line) { Ok(r) => r, Err(e) => { error!("Failed to parse management request: {}", e); // Send error response without an ID send_response(&ManagementResponse::err( "unknown".to_string(), format!("Failed to parse request: {}", e), )); continue; } }; let response = handle_request(&request, &mut proxy).await; send_response(&response); } Ok(()) } async fn handle_request( request: &ManagementRequest, proxy: &mut Option, ) -> ManagementResponse { let id = request.id.clone(); match request.method.as_str() { "start" => handle_start(&id, &request.params, proxy).await, "stop" => handle_stop(&id, proxy).await, "updateRoutes" => handle_update_routes(&id, &request.params, proxy).await, "getMetrics" => handle_get_metrics(&id, proxy), "getStatistics" => handle_get_statistics(&id, proxy), "provisionCertificate" => handle_provision_certificate(&id, &request.params, proxy).await, "renewCertificate" => handle_renew_certificate(&id, &request.params, proxy).await, "getCertificateStatus" => handle_get_certificate_status(&id, &request.params, proxy).await, "getListeningPorts" => handle_get_listening_ports(&id, proxy), "getNftablesStatus" => handle_get_nftables_status(&id, proxy).await, "setSocketHandlerRelay" => handle_set_socket_handler_relay(&id, &request.params, proxy).await, "addListeningPort" => handle_add_listening_port(&id, &request.params, proxy).await, "removeListeningPort" => handle_remove_listening_port(&id, &request.params, proxy).await, "loadCertificate" => handle_load_certificate(&id, &request.params, proxy).await, _ => ManagementResponse::err(id, format!("Unknown method: {}", request.method)), } } async fn handle_start( id: &str, params: &serde_json::Value, proxy: &mut Option, ) -> ManagementResponse { if proxy.is_some() { return ManagementResponse::err(id.to_string(), "Proxy is already running".to_string()); } let config = match params.get("config") { Some(config) => config, None => return ManagementResponse::err(id.to_string(), "Missing 'config' parameter".to_string()), }; let options: RustProxyOptions = match serde_json::from_value(config.clone()) { Ok(o) => o, Err(e) => return ManagementResponse::err(id.to_string(), format!("Invalid config: {}", e)), }; match RustProxy::new(options) { Ok(mut p) => { match p.start().await { Ok(()) => { send_event("started", serde_json::json!({})); *proxy = Some(p); ManagementResponse::ok(id.to_string(), serde_json::json!({})) } Err(e) => { send_event("error", serde_json::json!({"message": format!("{}", e)})); ManagementResponse::err(id.to_string(), format!("Failed to start: {}", e)) } } } Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to create proxy: {}", e)), } } async fn handle_stop( id: &str, proxy: &mut Option, ) -> ManagementResponse { match proxy.as_mut() { Some(p) => { match p.stop().await { Ok(()) => { *proxy = None; send_event("stopped", serde_json::json!({})); ManagementResponse::ok(id.to_string(), serde_json::json!({})) } Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to stop: {}", e)), } } None => ManagementResponse::ok(id.to_string(), serde_json::json!({})), } } async fn handle_update_routes( id: &str, params: &serde_json::Value, proxy: &mut Option, ) -> ManagementResponse { let p = match proxy.as_mut() { Some(p) => p, None => return ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), }; let routes = match params.get("routes") { Some(routes) => routes, None => return ManagementResponse::err(id.to_string(), "Missing 'routes' parameter".to_string()), }; let routes: Vec = match serde_json::from_value(routes.clone()) { Ok(r) => r, Err(e) => return ManagementResponse::err(id.to_string(), format!("Invalid routes: {}", e)), }; match p.update_routes(routes).await { Ok(()) => ManagementResponse::ok(id.to_string(), serde_json::json!({})), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to update routes: {}", e)), } } fn handle_get_metrics( id: &str, proxy: &Option, ) -> ManagementResponse { match proxy.as_ref() { Some(p) => { let metrics = p.get_metrics(); match serde_json::to_value(&metrics) { Ok(v) => ManagementResponse::ok(id.to_string(), v), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to serialize metrics: {}", e)), } } None => ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), } } fn handle_get_statistics( id: &str, proxy: &Option, ) -> ManagementResponse { match proxy.as_ref() { Some(p) => { let stats = p.get_statistics(); match serde_json::to_value(&stats) { Ok(v) => ManagementResponse::ok(id.to_string(), v), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to serialize statistics: {}", e)), } } None => ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), } } async fn handle_provision_certificate( id: &str, params: &serde_json::Value, proxy: &mut Option, ) -> ManagementResponse { let p = match proxy.as_mut() { Some(p) => p, None => return ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), }; let route_name = match params.get("routeName").and_then(|v| v.as_str()) { Some(name) => name.to_string(), None => return ManagementResponse::err(id.to_string(), "Missing 'routeName' parameter".to_string()), }; match p.provision_certificate(&route_name).await { Ok(()) => ManagementResponse::ok(id.to_string(), serde_json::json!({})), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to provision certificate: {}", e)), } } async fn handle_renew_certificate( id: &str, params: &serde_json::Value, proxy: &mut Option, ) -> ManagementResponse { let p = match proxy.as_mut() { Some(p) => p, None => return ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), }; let route_name = match params.get("routeName").and_then(|v| v.as_str()) { Some(name) => name.to_string(), None => return ManagementResponse::err(id.to_string(), "Missing 'routeName' parameter".to_string()), }; match p.renew_certificate(&route_name).await { Ok(()) => ManagementResponse::ok(id.to_string(), serde_json::json!({})), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to renew certificate: {}", e)), } } async fn handle_get_certificate_status( id: &str, params: &serde_json::Value, proxy: &Option, ) -> ManagementResponse { let p = match proxy.as_ref() { Some(p) => p, None => return ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), }; let route_name = match params.get("routeName").and_then(|v| v.as_str()) { Some(name) => name, None => return ManagementResponse::err(id.to_string(), "Missing 'routeName' parameter".to_string()), }; match p.get_certificate_status(route_name).await { Some(status) => ManagementResponse::ok(id.to_string(), serde_json::json!({ "domain": status.domain, "source": status.source, "expiresAt": status.expires_at, "isValid": status.is_valid, })), None => ManagementResponse::ok(id.to_string(), serde_json::Value::Null), } } fn handle_get_listening_ports( id: &str, proxy: &Option, ) -> ManagementResponse { match proxy.as_ref() { Some(p) => { let ports = p.get_listening_ports(); ManagementResponse::ok(id.to_string(), serde_json::json!({ "ports": ports })) } None => ManagementResponse::ok(id.to_string(), serde_json::json!({ "ports": [] })), } } async fn handle_get_nftables_status( id: &str, proxy: &Option, ) -> ManagementResponse { match proxy.as_ref() { Some(p) => { match p.get_nftables_status().await { Ok(status) => { match serde_json::to_value(&status) { Ok(v) => ManagementResponse::ok(id.to_string(), v), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to serialize: {}", e)), } } Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to get status: {}", e)), } } None => ManagementResponse::ok(id.to_string(), serde_json::json!({})), } } async fn handle_set_socket_handler_relay( id: &str, params: &serde_json::Value, proxy: &mut Option, ) -> ManagementResponse { let p = match proxy.as_mut() { Some(p) => p, None => return ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), }; let socket_path = params.get("socketPath") .and_then(|v| v.as_str()) .map(|s| s.to_string()); info!("setSocketHandlerRelay: socket_path={:?}", socket_path); p.set_socket_handler_relay_path(socket_path); ManagementResponse::ok(id.to_string(), serde_json::json!({})) } async fn handle_add_listening_port( id: &str, params: &serde_json::Value, proxy: &mut Option, ) -> ManagementResponse { let p = match proxy.as_mut() { Some(p) => p, None => return ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), }; let port = match params.get("port").and_then(|v| v.as_u64()) { Some(port) => port as u16, None => return ManagementResponse::err(id.to_string(), "Missing 'port' parameter".to_string()), }; match p.add_listening_port(port).await { Ok(()) => ManagementResponse::ok(id.to_string(), serde_json::json!({})), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to add port {}: {}", port, e)), } } async fn handle_remove_listening_port( id: &str, params: &serde_json::Value, proxy: &mut Option, ) -> ManagementResponse { let p = match proxy.as_mut() { Some(p) => p, None => return ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), }; let port = match params.get("port").and_then(|v| v.as_u64()) { Some(port) => port as u16, None => return ManagementResponse::err(id.to_string(), "Missing 'port' parameter".to_string()), }; match p.remove_listening_port(port).await { Ok(()) => ManagementResponse::ok(id.to_string(), serde_json::json!({})), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to remove port {}: {}", port, e)), } } async fn handle_load_certificate( id: &str, params: &serde_json::Value, proxy: &mut Option, ) -> ManagementResponse { let p = match proxy.as_mut() { Some(p) => p, None => return ManagementResponse::err(id.to_string(), "Proxy is not running".to_string()), }; let domain = match params.get("domain").and_then(|v| v.as_str()) { Some(d) => d.to_string(), None => return ManagementResponse::err(id.to_string(), "Missing 'domain' parameter".to_string()), }; let cert = match params.get("cert").and_then(|v| v.as_str()) { Some(c) => c.to_string(), None => return ManagementResponse::err(id.to_string(), "Missing 'cert' parameter".to_string()), }; let key = match params.get("key").and_then(|v| v.as_str()) { Some(k) => k.to_string(), None => return ManagementResponse::err(id.to_string(), "Missing 'key' parameter".to_string()), }; let ca = params.get("ca").and_then(|v| v.as_str()).map(|s| s.to_string()); info!("loadCertificate: domain={}", domain); // Load cert into cert manager and hot-swap TLS config match p.load_certificate(&domain, cert, key, ca).await { Ok(()) => ManagementResponse::ok(id.to_string(), serde_json::json!({})), Err(e) => ManagementResponse::err(id.to_string(), format!("Failed to load certificate for {}: {}", domain, e)), } }