Files
remoteingress/rust/crates/remoteingress-bin/src/main.rs

430 lines
14 KiB
Rust

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use clap::Parser;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::Mutex;
use remoteingress_core::hub::{AllowedEdge, HubConfig, HubEvent, TunnelHub};
use remoteingress_core::edge::{EdgeConfig, EdgeEvent, TunnelEdge};
#[derive(Parser)]
#[command(name = "remoteingress-bin", version = "2.0.0")]
struct Cli {
/// Run in IPC management mode (JSON over stdin/stdout)
#[arg(long)]
management: bool,
}
// IPC message types
#[derive(Deserialize)]
struct IpcRequest {
id: String,
method: String,
params: serde_json::Value,
}
#[derive(Serialize)]
struct IpcResponse {
id: String,
success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
#[derive(Serialize)]
struct IpcEvent {
event: String,
data: serde_json::Value,
}
fn send_ipc_line(line: &str) {
// Write to stdout synchronously, since we're line-buffered
use std::io::Write;
let stdout = std::io::stdout();
let mut out = stdout.lock();
let _ = out.write_all(line.as_bytes());
let _ = out.write_all(b"\n");
let _ = out.flush();
}
fn send_event(event: &str, data: serde_json::Value) {
let evt = IpcEvent {
event: event.to_string(),
data,
};
if let Ok(json) = serde_json::to_string(&evt) {
send_ipc_line(&json);
}
}
fn send_response(id: &str, result: serde_json::Value) {
let resp = IpcResponse {
id: id.to_string(),
success: true,
result: Some(result),
error: None,
};
if let Ok(json) = serde_json::to_string(&resp) {
send_ipc_line(&json);
}
}
fn send_error(id: &str, error: &str) {
let resp = IpcResponse {
id: id.to_string(),
success: false,
result: None,
error: Some(error.to_string()),
};
if let Ok(json) = serde_json::to_string(&resp) {
send_ipc_line(&json);
}
}
#[tokio::main]
async fn main() {
// Install the ring CryptoProvider before any TLS usage
rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls ring CryptoProvider");
let cli = Cli::parse();
if !cli.management {
eprintln!("remoteingress-bin: use --management for IPC mode");
std::process::exit(1);
}
// Initialize logging to stderr (stdout is for IPC)
env_logger::Builder::from_default_env()
.target(env_logger::Target::Stderr)
.filter_level(log::LevelFilter::Info)
.init();
// Send ready event
send_event("ready", serde_json::json!({ "version": "2.0.0" }));
// State
let hub: Arc<Mutex<Option<Arc<TunnelHub>>>> = Arc::new(Mutex::new(None));
let edge: Arc<Mutex<Option<Arc<TunnelEdge>>>> = Arc::new(Mutex::new(None));
// Read commands from stdin
let stdin = tokio::io::stdin();
let reader = BufReader::new(stdin);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let line = line.trim().to_string();
if line.is_empty() {
continue;
}
let request: IpcRequest = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
log::error!("Invalid IPC request: {}", e);
continue;
}
};
let hub = hub.clone();
let edge = edge.clone();
tokio::spawn(async move {
handle_request(request, hub, edge).await;
});
}
}
async fn handle_request(
req: IpcRequest,
hub: Arc<Mutex<Option<Arc<TunnelHub>>>>,
edge: Arc<Mutex<Option<Arc<TunnelEdge>>>>,
) {
match req.method.as_str() {
"ping" => {
send_response(&req.id, serde_json::json!({ "pong": true }));
}
"startHub" => {
let config: HubConfig = match serde_json::from_value(req.params.clone()) {
Ok(c) => c,
Err(e) => {
send_error(&req.id, &format!("invalid hub config: {}", e));
return;
}
};
let tunnel_hub = Arc::new(TunnelHub::new(config));
// Forward hub events to IPC
if let Some(mut event_rx) = tunnel_hub.take_event_rx().await {
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
match &event {
HubEvent::EdgeConnected { edge_id, peer_addr } => {
send_event(
"edgeConnected",
serde_json::json!({ "edgeId": edge_id, "peerAddr": peer_addr }),
);
}
HubEvent::EdgeDisconnected { edge_id } => {
send_event(
"edgeDisconnected",
serde_json::json!({ "edgeId": edge_id }),
);
}
HubEvent::StreamOpened {
edge_id,
stream_id,
} => {
send_event(
"streamOpened",
serde_json::json!({
"edgeId": edge_id,
"streamId": stream_id,
}),
);
}
HubEvent::StreamClosed {
edge_id,
stream_id,
} => {
send_event(
"streamClosed",
serde_json::json!({
"edgeId": edge_id,
"streamId": stream_id,
}),
);
}
}
}
});
}
match tunnel_hub.start().await {
Ok(()) => {
*hub.lock().await = Some(tunnel_hub);
send_response(&req.id, serde_json::json!({ "started": true }));
}
Err(e) => {
send_error(&req.id, &format!("failed to start hub: {}", e));
}
}
}
"stopHub" => {
let mut h = hub.lock().await;
if let Some(hub_instance) = h.take() {
hub_instance.stop().await;
send_response(&req.id, serde_json::json!({ "stopped": true }));
} else {
send_response(
&req.id,
serde_json::json!({ "stopped": true, "wasRunning": false }),
);
}
}
"updateAllowedEdges" => {
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct UpdateEdgesParams {
edges: Vec<AllowedEdge>,
}
let params: UpdateEdgesParams = match serde_json::from_value(req.params.clone()) {
Ok(p) => p,
Err(e) => {
send_error(&req.id, &format!("invalid params: {}", e));
return;
}
};
let h = hub.lock().await;
if let Some(hub_instance) = h.as_ref() {
hub_instance.update_allowed_edges(params.edges).await;
send_response(&req.id, serde_json::json!({ "updated": true }));
} else {
send_error(&req.id, "hub not running");
}
}
"getHubStatus" => {
let h = hub.lock().await;
if let Some(hub_instance) = h.as_ref() {
let status = hub_instance.get_status().await;
send_response(
&req.id,
serde_json::to_value(&status).unwrap_or_default(),
);
} else {
send_response(
&req.id,
serde_json::json!({
"running": false,
"tunnelPort": 0,
"connectedEdges": []
}),
);
}
}
"startEdge" => {
let config: EdgeConfig = match serde_json::from_value(req.params.clone()) {
Ok(c) => c,
Err(e) => {
send_error(&req.id, &format!("invalid edge config: {}", e));
return;
}
};
let tunnel_edge = Arc::new(TunnelEdge::new(config));
// Forward edge events to IPC
if let Some(mut event_rx) = tunnel_edge.take_event_rx().await {
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
match &event {
EdgeEvent::TunnelConnected => {
send_event("tunnelConnected", serde_json::json!({}));
}
EdgeEvent::TunnelDisconnected => {
send_event("tunnelDisconnected", serde_json::json!({}));
}
EdgeEvent::PublicIpDiscovered { ip } => {
send_event(
"publicIpDiscovered",
serde_json::json!({ "ip": ip }),
);
}
EdgeEvent::PortsAssigned { listen_ports } => {
send_event(
"portsAssigned",
serde_json::json!({ "listenPorts": listen_ports }),
);
}
EdgeEvent::PortsUpdated { listen_ports } => {
send_event(
"portsUpdated",
serde_json::json!({ "listenPorts": listen_ports }),
);
}
}
}
});
}
match tunnel_edge.start().await {
Ok(()) => {
*edge.lock().await = Some(tunnel_edge);
send_response(&req.id, serde_json::json!({ "started": true }));
}
Err(e) => {
send_error(&req.id, &format!("failed to start edge: {}", e));
}
}
}
"stopEdge" => {
let mut e = edge.lock().await;
if let Some(edge_instance) = e.take() {
edge_instance.stop().await;
send_response(&req.id, serde_json::json!({ "stopped": true }));
} else {
send_response(
&req.id,
serde_json::json!({ "stopped": true, "wasRunning": false }),
);
}
}
"getEdgeStatus" => {
let e = edge.lock().await;
if let Some(edge_instance) = e.as_ref() {
let status = edge_instance.get_status().await;
send_response(
&req.id,
serde_json::to_value(&status).unwrap_or_default(),
);
} else {
send_response(
&req.id,
serde_json::json!({
"running": false,
"connected": false,
"publicIp": null,
"activeStreams": 0,
"listenPorts": []
}),
);
}
}
_ => {
send_error(&req.id, &format!("unknown method: {}", req.method));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ipc_request_deserialize() {
let json = r#"{"id": "1", "method": "ping", "params": {}}"#;
let req: IpcRequest = serde_json::from_str(json).unwrap();
assert_eq!(req.id, "1");
assert_eq!(req.method, "ping");
assert!(req.params.is_object());
}
#[test]
fn test_ipc_response_skip_error_when_none() {
let resp = IpcResponse {
id: "1".to_string(),
success: true,
result: Some(serde_json::json!({"pong": true})),
error: None,
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["id"], "1");
assert_eq!(json["success"], true);
assert_eq!(json["result"]["pong"], true);
assert!(json.get("error").is_none());
}
#[test]
fn test_ipc_response_skip_result_when_none() {
let resp = IpcResponse {
id: "2".to_string(),
success: false,
result: None,
error: Some("something failed".to_string()),
};
let json = serde_json::to_value(&resp).unwrap();
assert_eq!(json["id"], "2");
assert_eq!(json["success"], false);
assert_eq!(json["error"], "something failed");
assert!(json.get("result").is_none());
}
#[test]
fn test_ipc_event_serialize() {
let evt = IpcEvent {
event: "ready".to_string(),
data: serde_json::json!({"version": "2.0.0"}),
};
let json = serde_json::to_value(&evt).unwrap();
assert_eq!(json["event"], "ready");
assert_eq!(json["data"]["version"], "2.0.0");
}
}