/// SIP proxy engine — the Rust data plane for the SIP router. /// /// Handles ALL SIP protocol mechanics. TypeScript only sends high-level /// commands (routing decisions, config) and receives high-level events /// (incoming calls, registration state). /// /// No raw SIP ever touches TypeScript. mod audio_player; mod call; mod call_manager; mod config; mod dtmf; mod ipc; mod provider; mod recorder; mod registrar; mod rtp; mod sip_leg; mod sip_transport; mod voicemail; mod webrtc_engine; use crate::call_manager::CallManager; use crate::config::AppConfig; use crate::ipc::{emit_event, respond_err, respond_ok, Command, OutTx}; use crate::provider::ProviderManager; use crate::registrar::Registrar; use crate::rtp::RtpPortPool; use crate::sip_transport::SipTransport; use crate::webrtc_engine::WebRtcEngine; use sip_proto::message::SipMessage; use std::net::SocketAddr; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, Mutex}; /// Shared mutable state for the proxy engine (SIP side). /// WebRTC is intentionally kept in a separate lock to avoid contention /// between SIP packet handlers and WebRTC command handlers. struct ProxyEngine { config: Option, transport: Option, provider_mgr: ProviderManager, registrar: Registrar, call_mgr: CallManager, rtp_pool: Option, out_tx: OutTx, } impl ProxyEngine { fn new(out_tx: OutTx) -> Self { Self { config: None, transport: None, provider_mgr: ProviderManager::new(out_tx.clone()), registrar: Registrar::new(out_tx.clone()), call_mgr: CallManager::new(out_tx.clone()), rtp_pool: None, out_tx, } } } #[tokio::main] async fn main() { // Output channel: all stdout writes go through here for serialization. let (out_tx, mut out_rx) = mpsc::unbounded_channel::(); // Stdout writer task. tokio::spawn(async move { let mut stdout = tokio::io::stdout(); while let Some(line) = out_rx.recv().await { let mut output = line.into_bytes(); output.push(b'\n'); if stdout.write_all(&output).await.is_err() { break; } let _ = stdout.flush().await; } }); // Emit ready event. emit_event(&out_tx, "ready", serde_json::json!({})); // Shared engine state (SIP side). let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone()))); // WebRTC engine — separate lock to avoid deadlock with SIP handlers. let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone()))); // Read commands from stdin. let stdin = tokio::io::stdin(); let reader = BufReader::new(stdin); let mut lines = reader.lines(); while let Ok(Some(line)) = lines.next_line().await { if line.trim().is_empty() { continue; } let cmd: Command = match serde_json::from_str(&line) { Ok(c) => c, Err(e) => { respond_err(&out_tx, "", &format!("parse: {e}")); continue; } }; let engine = engine.clone(); let webrtc = webrtc.clone(); let out_tx = out_tx.clone(); // Handle commands — some are async, so we spawn. tokio::spawn(async move { handle_command(engine, webrtc, &out_tx, cmd).await; }); } } async fn handle_command( engine: Arc>, webrtc: Arc>, out_tx: &OutTx, cmd: Command, ) { match cmd.method.as_str() { // SIP commands — lock engine only. "configure" => handle_configure(engine, out_tx, &cmd).await, "hangup" => handle_hangup(engine, out_tx, &cmd).await, "make_call" => handle_make_call(engine, out_tx, &cmd).await, "get_status" => handle_get_status(engine, out_tx, &cmd).await, // WebRTC commands — lock webrtc only (no engine contention). "webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, &cmd).await, "webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, &cmd).await, "webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await, // webrtc_link needs both: engine (for RTP socket) and webrtc (for session). "webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await, _ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)), } } /// Handle the `configure` command — receives full app config from TypeScript. /// First call: initializes SIP transport + everything. /// Subsequent calls: reconfigures providers/devices/routing without rebinding. async fn handle_configure(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let app_config: AppConfig = match serde_json::from_value(cmd.params.clone()) { Ok(c) => c, Err(e) => { respond_err(out_tx, &cmd.id, &format!("bad config: {e}")); return; } }; let mut eng = engine.lock().await; let is_reconfigure = eng.transport.is_some(); let socket = if is_reconfigure { // Reconfigure — socket already bound, just update subsystems. eng.transport.as_ref().unwrap().socket() } else { // First configure — bind SIP transport. let bind_addr = format!("0.0.0.0:{}", app_config.proxy.lan_port); let transport = match SipTransport::bind(&bind_addr).await { Ok(t) => t, Err(e) => { respond_err(out_tx, &cmd.id, &format!("SIP bind failed: {e}")); return; } }; let socket = transport.socket(); // Start UDP receiver. let engine_for_recv = engine.clone(); let socket_for_recv = socket.clone(); transport.spawn_receiver(move |data: &[u8], addr: SocketAddr| { let engine = engine_for_recv.clone(); let socket = socket_for_recv.clone(); let data = data.to_vec(); tokio::spawn(async move { handle_sip_packet(engine, &socket, &data, addr).await; }); }); eng.transport = Some(transport); // Initialize RTP port pool (only on first configure). eng.rtp_pool = Some(RtpPortPool::new( app_config.proxy.rtp_port_range.min, app_config.proxy.rtp_port_range.max, )); socket }; // (Re)configure registrar. eng.registrar.configure(&app_config.devices); // (Re)configure provider registrations. eng.provider_mgr .configure( &app_config.providers, app_config.proxy.public_ip_seed.as_deref(), &app_config.proxy.lan_ip, app_config.proxy.lan_port, socket, ) .await; let bind_info = format!("0.0.0.0:{}", app_config.proxy.lan_port); eng.config = Some(app_config); respond_ok( out_tx, &cmd.id, serde_json::json!({ "bound": bind_info, "reconfigure": is_reconfigure, }), ); } /// Handle incoming SIP packets from the UDP socket. /// This is the core routing pipeline — entirely in Rust. async fn handle_sip_packet( engine: Arc>, socket: &UdpSocket, data: &[u8], from_addr: SocketAddr, ) { let msg = match SipMessage::parse(data) { Some(m) => m, None => return, // Not a valid SIP message, ignore. }; let mut eng = engine.lock().await; // 1. Provider registration responses — consumed internally. if msg.is_response() { if eng.provider_mgr.handle_response(&msg, socket).await { return; } } // 2. Device REGISTER — handled by registrar. let is_from_provider = eng .provider_mgr .find_by_address(&from_addr) .await .is_some(); if !is_from_provider && msg.method() == Some("REGISTER") { if let Some(response_buf) = eng.registrar.handle_register(&msg, from_addr) { let _ = socket.send_to(&response_buf, from_addr).await; return; } } // 3. Route to existing call by SIP Call-ID. // Check if this Call-ID belongs to an active call (avoids borrow conflict). if eng.call_mgr.has_call(msg.call_id()) { let config_ref = eng.config.as_ref().unwrap().clone(); // Temporarily take registrar to avoid overlapping borrows. let registrar_dummy = Registrar::new(eng.out_tx.clone()); if eng .call_mgr .route_sip_message(&msg, from_addr, socket, &config_ref, ®istrar_dummy) .await { return; } } let config_ref = eng.config.as_ref().unwrap().clone(); // 4. New inbound INVITE from provider. if is_from_provider && msg.is_request() && msg.method() == Some("INVITE") { // Detect public IP from Via. if let Some(via) = msg.get_header("Via") { if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await { let mut ps = ps_arc.lock().await; ps.detect_public_ip(via); } } // Send 100 Trying immediately. let trying = SipMessage::create_response(100, "Trying", &msg, None); let _ = socket.send_to(&trying.serialize(), from_addr).await; // Determine provider info. let (provider_id, provider_config, public_ip) = if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await { let ps = ps_arc.lock().await; ( ps.config.id.clone(), ps.config.clone(), ps.public_ip.clone(), ) } else { return; }; // Create the inbound call — Rust handles everything. // Split borrows via destructuring to satisfy the borrow checker. let ProxyEngine { ref registrar, ref mut call_mgr, ref mut rtp_pool, .. } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); let call_id = call_mgr .create_inbound_call( &msg, from_addr, &provider_id, &provider_config, &config_ref, registrar, rtp_pool, socket, public_ip.as_deref(), ) .await; if let Some(call_id) = call_id { // Emit event so TypeScript knows about the call (for dashboard, IVR routing, etc). let from_header = msg.get_header("From").unwrap_or(""); let from_uri = SipMessage::extract_uri(from_header).unwrap_or("Unknown"); let called_number = msg .request_uri() .and_then(|uri| SipMessage::extract_uri(uri)) .unwrap_or(""); emit_event( &eng.out_tx, "incoming_call", serde_json::json!({ "call_id": call_id, "from_uri": from_uri, "to_number": called_number, "provider_id": provider_id, }), ); } return; } // 5. New outbound INVITE from device. if !is_from_provider && msg.is_request() && msg.method() == Some("INVITE") { // Resolve outbound route. let dialed_number = msg .request_uri() .and_then(|uri| SipMessage::extract_uri(uri)) .unwrap_or(msg.request_uri().unwrap_or("")) .to_string(); let device = eng.registrar.find_by_address(&from_addr); let device_id = device.map(|d| d.device_id.clone()); // Find provider via routing rules. let route_result = config_ref.resolve_outbound_route( &dialed_number, device_id.as_deref(), &|pid: &str| { // Can't call async here — use a sync check. // For now, assume all configured providers are available. true }, ); if let Some(route) = route_result { let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await { let ps = ps_arc.lock().await; ps.public_ip.clone() } else { None }; let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); let call_id = call_mgr .create_outbound_passthrough( &msg, from_addr, &route.provider, &config_ref, rtp_pool, socket, public_ip.as_deref(), ) .await; if let Some(call_id) = call_id { emit_event( &eng.out_tx, "outbound_device_call", serde_json::json!({ "call_id": call_id, "from_device": device_id, "to_number": dialed_number, }), ); } } return; } // 6. Other messages — log for debugging. let label = if msg.is_request() { msg.method().unwrap_or("?").to_string() } else { msg.status_code().map(|c| c.to_string()).unwrap_or_default() }; emit_event( &eng.out_tx, "sip_unhandled", serde_json::json!({ "method_or_status": label, "call_id": msg.call_id(), "from_addr": from_addr.ip().to_string(), "from_port": from_addr.port(), "is_from_provider": is_from_provider, }), ); } /// Handle `get_status` — return active call statuses from Rust. async fn handle_get_status(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let eng = engine.lock().await; let calls = eng.call_mgr.get_all_statuses(); respond_ok(out_tx, &cmd.id, serde_json::json!({ "calls": calls })); } /// Handle `make_call` — initiate an outbound call to a number via a provider. async fn handle_make_call(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let number = match cmd.params.get("number").and_then(|v| v.as_str()) { Some(n) => n.to_string(), None => { respond_err(out_tx, &cmd.id, "missing number"); return; } }; let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str()); let mut eng = engine.lock().await; let config_ref = match &eng.config { Some(c) => c.clone(), None => { respond_err(out_tx, &cmd.id, "not configured"); return; } }; // Resolve provider. let provider_config = if let Some(pid) = provider_id { config_ref.providers.iter().find(|p| p.id == pid).cloned() } else { // Use route resolution or first provider. let route = config_ref.resolve_outbound_route(&number, None, &|_| true); route.map(|r| r.provider) }; let provider_config = match provider_config { Some(p) => p, None => { respond_err(out_tx, &cmd.id, "no provider available"); return; } }; // Get public IP and registered AOR from provider state. let (public_ip, registered_aor) = if let Some(ps_arc) = eng.provider_mgr.find_by_address( &provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()) ).await { let ps = ps_arc.lock().await; (ps.public_ip.clone(), ps.registered_aor.clone()) } else { // Fallback — construct AOR from config. (None, format!("sip:{}@{}", provider_config.username, provider_config.domain)) }; let socket = match &eng.transport { Some(t) => t.socket(), None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } }; let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); let call_id = call_mgr.make_outbound_call( &number, &provider_config, &config_ref, rtp_pool, &socket, public_ip.as_deref(), ®istered_aor, ).await; match call_id { Some(id) => { emit_event(out_tx, "outbound_call_started", serde_json::json!({ "call_id": id, "number": number, "provider_id": provider_config.id, })); respond_ok(out_tx, &cmd.id, serde_json::json!({ "call_id": id })); } None => { respond_err(out_tx, &cmd.id, "call origination failed — provider not registered or no ports available"); } } } /// Handle the `hangup` command. async fn handle_hangup(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(id) => id.to_string(), None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } }; let mut eng = engine.lock().await; let socket = match &eng.transport { Some(t) => t.socket(), None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } }; if eng.call_mgr.hangup(&call_id, &socket).await { respond_ok(out_tx, &cmd.id, serde_json::json!({})); } else { respond_err(out_tx, &cmd.id, &format!("call {call_id} not found")); } } /// Handle `webrtc_offer` — browser sends SDP offer, we create PeerConnection and return answer. /// Uses only the WebRTC lock — no contention with SIP handlers. async fn handle_webrtc_offer(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } }; let offer_sdp = match cmd.params.get("sdp").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing sdp"); return; } }; let mut wrtc = webrtc.lock().await; match wrtc.handle_offer(&session_id, &offer_sdp).await { Ok(answer_sdp) => { respond_ok(out_tx, &cmd.id, serde_json::json!({ "session_id": session_id, "sdp": answer_sdp, })); } Err(e) => respond_err(out_tx, &cmd.id, &e), } } /// Handle `webrtc_ice` — forward ICE candidate from browser to Rust PeerConnection. /// Uses only the WebRTC lock. async fn handle_webrtc_ice(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } }; let candidate = cmd.params.get("candidate").and_then(|v| v.as_str()).unwrap_or(""); let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str()); let sdp_mline_index = cmd.params.get("sdp_mline_index").and_then(|v| v.as_u64()).map(|v| v as u16); let wrtc = webrtc.lock().await; match wrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await { Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})), Err(e) => respond_err(out_tx, &cmd.id, &e), } } /// Handle `webrtc_link` — link a WebRTC session to a SIP call for audio bridging. /// Briefly locks engine to get the RTP socket, then locks webrtc to set up the bridge. /// Locks are never held simultaneously — no deadlock possible. async fn handle_webrtc_link( engine: Arc>, webrtc: Arc>, out_tx: &OutTx, cmd: &Command, ) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } }; let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } }; let provider_addr = match cmd.params.get("provider_media_addr").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing provider_media_addr"); return; } }; let provider_port = match cmd.params.get("provider_media_port").and_then(|v| v.as_u64()) { Some(p) => p as u16, None => { respond_err(out_tx, &cmd.id, "missing provider_media_port"); return; } }; let sip_pt = cmd.params.get("sip_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8; let provider_media: SocketAddr = match format!("{provider_addr}:{provider_port}").parse() { Ok(a) => a, Err(e) => { respond_err(out_tx, &cmd.id, &format!("bad address: {e}")); return; } }; // Briefly lock engine to get the B2BUA call's RTP socket. let rtp_socket = { let eng = engine.lock().await; eng.call_mgr.get_b2bua_rtp_socket(&call_id) }; // engine lock released here let rtp_socket = match rtp_socket { Some(s) => s, None => { respond_err(out_tx, &cmd.id, &format!("call {call_id} not found or no RTP socket")); return; } }; let bridge_info = crate::webrtc_engine::SipBridgeInfo { provider_media, sip_pt, rtp_socket, }; // Lock webrtc to set up the audio bridge. let mut wrtc = webrtc.lock().await; if wrtc.link_to_sip(&session_id, &call_id, bridge_info).await { respond_ok(out_tx, &cmd.id, serde_json::json!({ "session_id": session_id, "call_id": call_id, "bridged": true, })); } else { respond_err(out_tx, &cmd.id, &format!("session {session_id} not found")); } } /// Handle `webrtc_close` — close a WebRTC session. /// Uses only the WebRTC lock. async fn handle_webrtc_close(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } }; let mut wrtc = webrtc.lock().await; match wrtc.close_session(&session_id).await { Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})), Err(e) => respond_err(out_tx, &cmd.id, &e), } }