//! Call manager — central registry and orchestration for all calls. //! //! Handles: //! - Inbound passthrough calls (provider → proxy → device) //! - Outbound passthrough calls (device → proxy → provider) //! - SIP message routing by Call-ID //! - BYE/CANCEL handling //! - RTP relay setup //! //! Ported from ts/call/call-manager.ts (passthrough mode). use crate::call::{CallDirection, CallState, PassthroughCall}; use crate::config::{AppConfig, ProviderConfig}; use crate::dtmf::DtmfDetector; use crate::ipc::{emit_event, OutTx}; use crate::registrar::Registrar; use crate::rtp::RtpPortPool; use sip_proto::helpers::parse_sdp_endpoint; use sip_proto::message::SipMessage; use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; use tokio::net::UdpSocket; pub struct CallManager { /// Active passthrough calls, keyed by SIP Call-ID. calls: HashMap, /// Call ID counter. next_call_num: u64, /// Output channel for events. out_tx: OutTx, } impl CallManager { pub fn new(out_tx: OutTx) -> Self { Self { calls: HashMap::new(), next_call_num: 0, out_tx, } } /// Generate a unique call ID. fn next_call_id(&mut self) -> String { let id = format!( "call-{}-{}", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(), self.next_call_num, ); self.next_call_num += 1; id } /// Try to route a SIP message to an existing call. /// Returns true if handled. pub async fn route_sip_message( &mut self, msg: &SipMessage, from_addr: SocketAddr, socket: &UdpSocket, config: &AppConfig, _registrar: &Registrar, ) -> bool { let sip_call_id = msg.call_id().to_string(); // Check if this Call-ID belongs to an active call. if !self.calls.contains_key(&sip_call_id) { return false; } // Extract needed data from the call to avoid borrow conflicts. let (call_id, provider_addr, device_addr, rtp_port, from_provider) = { let call = self.calls.get(&sip_call_id).unwrap(); let from_provider = from_addr.ip().to_string() == call.provider_addr.ip().to_string(); ( call.id.clone(), call.provider_addr, call.device_addr, call.rtp_port, from_provider, ) }; let lan_ip = config.proxy.lan_ip.clone(); let lan_port = config.proxy.lan_port; if msg.is_request() { let method = msg.method().unwrap_or(""); let forward_to = if from_provider { device_addr } else { provider_addr }; // Handle BYE. if method == "BYE" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; let _ = socket.send_to(&msg.serialize(), forward_to).await; let duration = self.calls.get(&sip_call_id).unwrap().duration_secs(); emit_event( &self.out_tx, "call_ended", serde_json::json!({ "call_id": call_id, "reason": "bye", "duration": duration, "from_side": if from_provider { "provider" } else { "device" }, }), ); self.calls.get_mut(&sip_call_id).unwrap().state = CallState::Terminated; return true; } // Handle CANCEL. if method == "CANCEL" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; let _ = socket.send_to(&msg.serialize(), forward_to).await; let duration = self.calls.get(&sip_call_id).unwrap().duration_secs(); emit_event( &self.out_tx, "call_ended", serde_json::json!({ "call_id": call_id, "reason": "cancel", "duration": duration, }), ); self.calls.get_mut(&sip_call_id).unwrap().state = CallState::Terminated; return true; } // Handle INFO (DTMF relay). if method == "INFO" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; // Detect DTMF from INFO body. if let Some(ct) = msg.get_header("Content-Type") { let mut detector = DtmfDetector::new(call_id.clone(), self.out_tx.clone()); detector.process_sip_info(ct, &msg.body); } return true; } // Forward other requests with SDP rewriting. let mut fwd = msg.clone(); if from_provider { rewrite_sdp_for_device(&mut fwd, &lan_ip, rtp_port); if let Some(ruri) = fwd.request_uri().map(|s| s.to_string()) { let new_ruri = rewrite_sip_uri(&ruri, &device_addr.ip().to_string(), device_addr.port()); fwd.set_request_uri(&new_ruri); } } else { rewrite_sdp_for_provider(&mut fwd, &lan_ip, rtp_port); } if fwd.is_dialog_establishing() { fwd.prepend_header("Record-Route", &format!("")); } let _ = socket.send_to(&fwd.serialize(), forward_to).await; return true; } // --- Responses --- if msg.is_response() { let code = msg.status_code().unwrap_or(0); let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase(); let forward_to = if from_provider { device_addr } else { provider_addr }; let mut fwd = msg.clone(); if from_provider { rewrite_sdp_for_device(&mut fwd, &lan_ip, rtp_port); } else { rewrite_sdp_for_provider(&mut fwd, &lan_ip, rtp_port); if let Some(contact) = fwd.get_header("Contact").map(|s| s.to_string()) { let new_contact = rewrite_sip_uri(&contact, &lan_ip, lan_port); if new_contact != contact { fwd.set_header("Contact", &new_contact); } } } // State transitions. if cseq_method == "INVITE" { let call = self.calls.get_mut(&sip_call_id).unwrap(); if (code == 180 || code == 183) && call.state == CallState::SettingUp { call.state = CallState::Ringing; emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); } else if code >= 200 && code < 300 { call.state = CallState::Connected; emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id })); } else if code >= 300 { let duration = call.duration_secs(); call.state = CallState::Terminated; emit_event( &self.out_tx, "call_ended", serde_json::json!({ "call_id": call_id, "reason": format!("rejected_{code}"), "duration": duration, }), ); } } let _ = socket.send_to(&fwd.serialize(), forward_to).await; return true; } false } /// Create an inbound passthrough call (provider → device). pub async fn create_inbound_call( &mut self, invite: &SipMessage, from_addr: SocketAddr, provider_id: &str, provider_config: &ProviderConfig, config: &AppConfig, registrar: &Registrar, rtp_pool: &mut RtpPortPool, socket: &UdpSocket, public_ip: Option<&str>, ) -> Option { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; // Extract caller/callee info. let from_header = invite.get_header("From").unwrap_or(""); let caller_number = SipMessage::extract_uri(from_header) .unwrap_or("Unknown") .to_string(); let called_number = invite .request_uri() .and_then(|uri| SipMessage::extract_uri(uri)) .unwrap_or("") .to_string(); // Resolve target device (first registered device for now). let device_addr = match self.resolve_first_device(config, registrar) { Some(addr) => addr, None => { // No device available — could route to voicemail // For now, send 480 Temporarily Unavailable. let resp = SipMessage::create_response(480, "Temporarily Unavailable", invite, None); let _ = socket.send_to(&resp.serialize(), from_addr).await; return None; } }; // Allocate RTP port. let rtp_alloc = match rtp_pool.allocate().await { Some(a) => a, None => { let resp = SipMessage::create_response(503, "Service Unavailable", invite, None); let _ = socket.send_to(&resp.serialize(), from_addr).await; return None; } }; // Create the call. let mut call = PassthroughCall { id: call_id.clone(), sip_call_id: invite.call_id().to_string(), state: CallState::Ringing, direction: CallDirection::Inbound, created_at: Instant::now(), caller_number: Some(caller_number), callee_number: Some(called_number), provider_id: provider_id.to_string(), provider_addr: from_addr, provider_media: None, device_addr, device_media: None, rtp_port: rtp_alloc.port, rtp_socket: rtp_alloc.socket.clone(), pkt_from_device: 0, pkt_from_provider: 0, }; // Extract provider media from SDP. if invite.has_sdp_body() { if let Some(ep) = parse_sdp_endpoint(&invite.body) { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { call.provider_media = Some(addr); } } } // Start RTP relay. let rtp_socket = rtp_alloc.socket.clone(); let device_addr_for_relay = device_addr; let provider_addr_for_relay = from_addr; tokio::spawn(async move { rtp_relay_loop(rtp_socket, device_addr_for_relay, provider_addr_for_relay).await; }); // Rewrite and forward INVITE to device. let mut fwd_invite = invite.clone(); fwd_invite.set_request_uri(&rewrite_sip_uri( fwd_invite.request_uri().unwrap_or(""), &device_addr.ip().to_string(), device_addr.port(), )); fwd_invite.prepend_header("Record-Route", &format!("")); if fwd_invite.has_sdp_body() { let (new_body, original) = rewrite_sdp(&fwd_invite.body, lan_ip, rtp_alloc.port); fwd_invite.body = new_body; fwd_invite.update_content_length(); if let Some(ep) = original { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { call.provider_media = Some(addr); } } } let _ = socket.send_to(&fwd_invite.serialize(), device_addr).await; // Store the call. self.calls.insert(call.sip_call_id.clone(), call); Some(call_id) } /// Create an outbound passthrough call (device → provider). pub async fn create_outbound_passthrough( &mut self, invite: &SipMessage, from_addr: SocketAddr, provider_config: &ProviderConfig, config: &AppConfig, rtp_pool: &mut RtpPortPool, socket: &UdpSocket, public_ip: Option<&str>, ) -> Option { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); let callee = invite.request_uri().unwrap_or("").to_string(); // Allocate RTP port. let rtp_alloc = match rtp_pool.allocate().await { Some(a) => a, None => return None, }; let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() { Some(a) => a, None => return None, }; let mut call = PassthroughCall { id: call_id.clone(), sip_call_id: invite.call_id().to_string(), state: CallState::SettingUp, direction: CallDirection::Outbound, created_at: Instant::now(), caller_number: None, callee_number: Some(callee), provider_id: provider_config.id.clone(), provider_addr: provider_dest, provider_media: None, device_addr: from_addr, device_media: None, rtp_port: rtp_alloc.port, rtp_socket: rtp_alloc.socket.clone(), pkt_from_device: 0, pkt_from_provider: 0, }; // Start RTP relay. let rtp_socket = rtp_alloc.socket.clone(); let device_addr_for_relay = from_addr; let provider_addr_for_relay = provider_dest; tokio::spawn(async move { rtp_relay_loop(rtp_socket, device_addr_for_relay, provider_addr_for_relay).await; }); // Rewrite and forward INVITE to provider. let mut fwd_invite = invite.clone(); fwd_invite.prepend_header("Record-Route", &format!("")); // Rewrite Contact to public IP. if let Some(contact) = fwd_invite.get_header("Contact").map(|s| s.to_string()) { let new_contact = rewrite_sip_uri(&contact, pub_ip, lan_port); if new_contact != contact { fwd_invite.set_header("Contact", &new_contact); } } // Rewrite SDP. if fwd_invite.has_sdp_body() { let (new_body, original) = rewrite_sdp(&fwd_invite.body, pub_ip, rtp_alloc.port); fwd_invite.body = new_body; fwd_invite.update_content_length(); if let Some(ep) = original { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { call.device_media = Some(addr); } } } let _ = socket.send_to(&fwd_invite.serialize(), provider_dest).await; self.calls.insert(call.sip_call_id.clone(), call); Some(call_id) } /// Hangup a call by call ID (from TypeScript command). pub async fn hangup(&mut self, call_id: &str, socket: &UdpSocket) -> bool { // Find the call by our internal call ID. let sip_call_id = self .calls .iter() .find(|(_, c)| c.id == call_id) .map(|(k, _)| k.clone()); let sip_call_id = match sip_call_id { Some(id) => id, None => return false, }; let call = match self.calls.get_mut(&sip_call_id) { Some(c) => c, None => return false, }; if call.state == CallState::Terminated { return false; } // Build and send BYE to both sides. // For passthrough, we build a simple BYE using the SIP Call-ID. let bye_msg = format!( "BYE sip:hangup SIP/2.0\r\n\ Via: SIP/2.0/UDP 0.0.0.0:0;branch=z9hG4bK-hangup\r\n\ Call-ID: {}\r\n\ CSeq: 99 BYE\r\n\ Max-Forwards: 70\r\n\ Content-Length: 0\r\n\r\n", sip_call_id ); let bye_bytes = bye_msg.as_bytes(); let _ = socket.send_to(bye_bytes, call.provider_addr).await; let _ = socket.send_to(bye_bytes, call.device_addr).await; call.state = CallState::Terminated; emit_event( &self.out_tx, "call_ended", serde_json::json!({ "call_id": call.id, "reason": "hangup_command", "duration": call.duration_secs(), }), ); true } /// Get all active call statuses. pub fn get_all_statuses(&self) -> Vec { self.calls .values() .filter(|c| c.state != CallState::Terminated) .map(|c| c.to_status_json()) .collect() } /// Clean up terminated calls. pub fn cleanup_terminated(&mut self) { self.calls.retain(|_, c| c.state != CallState::Terminated); } /// Check if a SIP Call-ID belongs to any active call. pub fn has_call(&self, sip_call_id: &str) -> bool { self.calls.contains_key(sip_call_id) } // --- Internal helpers --- fn resolve_first_device(&self, config: &AppConfig, registrar: &Registrar) -> Option { for device in &config.devices { if let Some(addr) = registrar.get_device_contact(&device.id) { return Some(addr); } } None } } /// Rewrite SDP for provider→device direction (use LAN IP). fn rewrite_sdp_for_device(msg: &mut SipMessage, lan_ip: &str, rtp_port: u16) { if msg.has_sdp_body() { let (new_body, _original) = rewrite_sdp(&msg.body, lan_ip, rtp_port); msg.body = new_body; msg.update_content_length(); } } /// Rewrite SDP for device→provider direction (use public IP). fn rewrite_sdp_for_provider(msg: &mut SipMessage, pub_ip: &str, rtp_port: u16) { if msg.has_sdp_body() { let (new_body, _original) = rewrite_sdp(&msg.body, pub_ip, rtp_port); msg.body = new_body; msg.update_content_length(); } } /// Bidirectional RTP relay loop. /// Receives packets on the relay socket and forwards based on source address. async fn rtp_relay_loop( socket: Arc, device_addr: SocketAddr, provider_addr: SocketAddr, ) { let mut buf = vec![0u8; 65535]; let device_ip = device_addr.ip().to_string(); let provider_ip = provider_addr.ip().to_string(); // Track learned media endpoints (may differ from signaling addresses). let mut learned_device: Option = None; let mut learned_provider: Option = None; loop { match socket.recv_from(&mut buf).await { Ok((n, from)) => { let data = &buf[..n]; let from_ip = from.ip().to_string(); if from_ip == device_ip || learned_device.map(|d| d == from).unwrap_or(false) { // From device → forward to provider. if learned_device.is_none() { learned_device = Some(from); } if let Some(target) = learned_provider { let _ = socket.send_to(data, target).await; } else { // Provider media not yet learned; try signaling address. let _ = socket.send_to(data, provider_addr).await; } } else if from_ip == provider_ip || learned_provider.map(|p| p == from).unwrap_or(false) { // From provider → forward to device. if learned_provider.is_none() { learned_provider = Some(from); } if let Some(target) = learned_device { let _ = socket.send_to(data, target).await; } else { let _ = socket.send_to(data, device_addr).await; } } else { // Unknown source — try to identify by known device addresses. // For now, assume it's the device if not from provider IP range. if learned_device.is_none() { learned_device = Some(from); } } } Err(_) => { // Socket closed or error — exit relay. break; } } } }