//! Call manager — central registry and orchestration for all calls. //! //! Handles: //! - Inbound passthrough calls (provider → proxy → device) //! - Outbound passthrough calls (device → proxy → provider) //! - SIP message routing by Call-ID //! - BYE/CANCEL handling //! - RTP relay setup //! //! Ported from ts/call/call-manager.ts (passthrough mode). use crate::call::{CallDirection, CallState, PassthroughCall}; use crate::config::{AppConfig, ProviderConfig}; use crate::dtmf::DtmfDetector; use crate::ipc::{emit_event, OutTx}; use crate::registrar::Registrar; use crate::rtp::RtpPortPool; use sip_proto::helpers::parse_sdp_endpoint; use sip_proto::message::SipMessage; use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; use tokio::net::UdpSocket; pub struct CallManager { /// Active passthrough calls, keyed by SIP Call-ID. calls: HashMap, /// Call ID counter. next_call_num: u64, /// Output channel for events. out_tx: OutTx, } impl CallManager { pub fn new(out_tx: OutTx) -> Self { Self { calls: HashMap::new(), next_call_num: 0, out_tx, } } /// Generate a unique call ID. fn next_call_id(&mut self) -> String { let id = format!( "call-{}-{}", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(), self.next_call_num, ); self.next_call_num += 1; id } /// Try to route a SIP message to an existing call. /// Returns true if handled. pub async fn route_sip_message( &mut self, msg: &SipMessage, from_addr: SocketAddr, socket: &UdpSocket, config: &AppConfig, _registrar: &Registrar, ) -> bool { let sip_call_id = msg.call_id().to_string(); // Check if this Call-ID belongs to an active call. if !self.calls.contains_key(&sip_call_id) { return false; } // Extract needed data from the call to avoid borrow conflicts. let (call_id, provider_addr, device_addr, rtp_port, from_provider) = { let call = self.calls.get(&sip_call_id).unwrap(); let from_provider = from_addr.ip().to_string() == call.provider_addr.ip().to_string(); ( call.id.clone(), call.provider_addr, call.device_addr, call.rtp_port, from_provider, ) }; let lan_ip = config.proxy.lan_ip.clone(); let lan_port = config.proxy.lan_port; if msg.is_request() { let method = msg.method().unwrap_or(""); let forward_to = if from_provider { device_addr } else { provider_addr }; // Handle BYE. if method == "BYE" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; let _ = socket.send_to(&msg.serialize(), forward_to).await; let duration = self.calls.get(&sip_call_id).unwrap().duration_secs(); emit_event( &self.out_tx, "call_ended", serde_json::json!({ "call_id": call_id, "reason": "bye", "duration": duration, "from_side": if from_provider { "provider" } else { "device" }, }), ); self.calls.get_mut(&sip_call_id).unwrap().state = CallState::Terminated; return true; } // Handle CANCEL. if method == "CANCEL" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; let _ = socket.send_to(&msg.serialize(), forward_to).await; let duration = self.calls.get(&sip_call_id).unwrap().duration_secs(); emit_event( &self.out_tx, "call_ended", serde_json::json!({ "call_id": call_id, "reason": "cancel", "duration": duration, }), ); self.calls.get_mut(&sip_call_id).unwrap().state = CallState::Terminated; return true; } // Handle INFO (DTMF relay). if method == "INFO" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; // Detect DTMF from INFO body. if let Some(ct) = msg.get_header("Content-Type") { let mut detector = DtmfDetector::new(call_id.clone(), self.out_tx.clone()); detector.process_sip_info(ct, &msg.body); } return true; } // Forward other requests with SDP rewriting. let mut fwd = msg.clone(); if from_provider { rewrite_sdp_for_device(&mut fwd, &lan_ip, rtp_port); if let Some(ruri) = fwd.request_uri().map(|s| s.to_string()) { let new_ruri = rewrite_sip_uri(&ruri, &device_addr.ip().to_string(), device_addr.port()); fwd.set_request_uri(&new_ruri); } } else { rewrite_sdp_for_provider(&mut fwd, &lan_ip, rtp_port); } if fwd.is_dialog_establishing() { fwd.prepend_header("Record-Route", &format!("")); } let _ = socket.send_to(&fwd.serialize(), forward_to).await; return true; } // --- Responses --- if msg.is_response() { let code = msg.status_code().unwrap_or(0); let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase(); let forward_to = if from_provider { device_addr } else { provider_addr }; let mut fwd = msg.clone(); if from_provider { rewrite_sdp_for_device(&mut fwd, &lan_ip, rtp_port); } else { rewrite_sdp_for_provider(&mut fwd, &lan_ip, rtp_port); if let Some(contact) = fwd.get_header("Contact").map(|s| s.to_string()) { let new_contact = rewrite_sip_uri(&contact, &lan_ip, lan_port); if new_contact != contact { fwd.set_header("Contact", &new_contact); } } } // State transitions. if cseq_method == "INVITE" { let call = self.calls.get_mut(&sip_call_id).unwrap(); if (code == 180 || code == 183) && call.state == CallState::SettingUp { call.state = CallState::Ringing; emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); } else if code >= 200 && code < 300 { call.state = CallState::Connected; emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id })); } else if code >= 300 { let duration = call.duration_secs(); call.state = CallState::Terminated; emit_event( &self.out_tx, "call_ended", serde_json::json!({ "call_id": call_id, "reason": format!("rejected_{code}"), "duration": duration, }), ); } } let _ = socket.send_to(&fwd.serialize(), forward_to).await; return true; } false } /// Create an inbound passthrough call (provider → device). pub async fn create_inbound_call( &mut self, invite: &SipMessage, from_addr: SocketAddr, provider_id: &str, provider_config: &ProviderConfig, config: &AppConfig, registrar: &Registrar, rtp_pool: &mut RtpPortPool, socket: &UdpSocket, public_ip: Option<&str>, ) -> Option { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; // Extract caller/callee info. let from_header = invite.get_header("From").unwrap_or(""); let caller_number = SipMessage::extract_uri(from_header) .unwrap_or("Unknown") .to_string(); let called_number = invite .request_uri() .and_then(|uri| SipMessage::extract_uri(uri)) .unwrap_or("") .to_string(); // Resolve target device (first registered device). let device_addr = match self.resolve_first_device(config, registrar) { Some(addr) => addr, None => { // No device registered — route to voicemail. return self .route_to_voicemail( &call_id, invite, from_addr, &caller_number, provider_id, provider_config, config, rtp_pool, socket, public_ip, ) .await; } }; // Allocate RTP port. let rtp_alloc = match rtp_pool.allocate().await { Some(a) => a, None => { let resp = SipMessage::create_response(503, "Service Unavailable", invite, None); let _ = socket.send_to(&resp.serialize(), from_addr).await; return None; } }; // Create the call. let mut call = PassthroughCall { id: call_id.clone(), sip_call_id: invite.call_id().to_string(), state: CallState::Ringing, direction: CallDirection::Inbound, created_at: Instant::now(), caller_number: Some(caller_number), callee_number: Some(called_number), provider_id: provider_id.to_string(), provider_addr: from_addr, provider_media: None, device_addr, device_media: None, rtp_port: rtp_alloc.port, rtp_socket: rtp_alloc.socket.clone(), pkt_from_device: 0, pkt_from_provider: 0, }; // Extract provider media from SDP. if invite.has_sdp_body() { if let Some(ep) = parse_sdp_endpoint(&invite.body) { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { call.provider_media = Some(addr); } } } // Start RTP relay. let rtp_socket = rtp_alloc.socket.clone(); let device_addr_for_relay = device_addr; let provider_addr_for_relay = from_addr; tokio::spawn(async move { rtp_relay_loop(rtp_socket, device_addr_for_relay, provider_addr_for_relay).await; }); // Rewrite and forward INVITE to device. let mut fwd_invite = invite.clone(); fwd_invite.set_request_uri(&rewrite_sip_uri( fwd_invite.request_uri().unwrap_or(""), &device_addr.ip().to_string(), device_addr.port(), )); fwd_invite.prepend_header("Record-Route", &format!("")); if fwd_invite.has_sdp_body() { let (new_body, original) = rewrite_sdp(&fwd_invite.body, lan_ip, rtp_alloc.port); fwd_invite.body = new_body; fwd_invite.update_content_length(); if let Some(ep) = original { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { call.provider_media = Some(addr); } } } let _ = socket.send_to(&fwd_invite.serialize(), device_addr).await; // Store the call. self.calls.insert(call.sip_call_id.clone(), call); Some(call_id) } /// Create an outbound passthrough call (device → provider). pub async fn create_outbound_passthrough( &mut self, invite: &SipMessage, from_addr: SocketAddr, provider_config: &ProviderConfig, config: &AppConfig, rtp_pool: &mut RtpPortPool, socket: &UdpSocket, public_ip: Option<&str>, ) -> Option { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); let callee = invite.request_uri().unwrap_or("").to_string(); // Allocate RTP port. let rtp_alloc = match rtp_pool.allocate().await { Some(a) => a, None => return None, }; let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() { Some(a) => a, None => return None, }; let mut call = PassthroughCall { id: call_id.clone(), sip_call_id: invite.call_id().to_string(), state: CallState::SettingUp, direction: CallDirection::Outbound, created_at: Instant::now(), caller_number: None, callee_number: Some(callee), provider_id: provider_config.id.clone(), provider_addr: provider_dest, provider_media: None, device_addr: from_addr, device_media: None, rtp_port: rtp_alloc.port, rtp_socket: rtp_alloc.socket.clone(), pkt_from_device: 0, pkt_from_provider: 0, }; // Start RTP relay. let rtp_socket = rtp_alloc.socket.clone(); let device_addr_for_relay = from_addr; let provider_addr_for_relay = provider_dest; tokio::spawn(async move { rtp_relay_loop(rtp_socket, device_addr_for_relay, provider_addr_for_relay).await; }); // Rewrite and forward INVITE to provider. let mut fwd_invite = invite.clone(); fwd_invite.prepend_header("Record-Route", &format!("")); // Rewrite Contact to public IP. if let Some(contact) = fwd_invite.get_header("Contact").map(|s| s.to_string()) { let new_contact = rewrite_sip_uri(&contact, pub_ip, lan_port); if new_contact != contact { fwd_invite.set_header("Contact", &new_contact); } } // Rewrite SDP. if fwd_invite.has_sdp_body() { let (new_body, original) = rewrite_sdp(&fwd_invite.body, pub_ip, rtp_alloc.port); fwd_invite.body = new_body; fwd_invite.update_content_length(); if let Some(ep) = original { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { call.device_media = Some(addr); } } } let _ = socket.send_to(&fwd_invite.serialize(), provider_dest).await; self.calls.insert(call.sip_call_id.clone(), call); Some(call_id) } /// Hangup a call by call ID (from TypeScript command). pub async fn hangup(&mut self, call_id: &str, socket: &UdpSocket) -> bool { // Find the call by our internal call ID. let sip_call_id = self .calls .iter() .find(|(_, c)| c.id == call_id) .map(|(k, _)| k.clone()); let sip_call_id = match sip_call_id { Some(id) => id, None => return false, }; let call = match self.calls.get_mut(&sip_call_id) { Some(c) => c, None => return false, }; if call.state == CallState::Terminated { return false; } // Build and send BYE to both sides. // For passthrough, we build a simple BYE using the SIP Call-ID. let bye_msg = format!( "BYE sip:hangup SIP/2.0\r\n\ Via: SIP/2.0/UDP 0.0.0.0:0;branch=z9hG4bK-hangup\r\n\ Call-ID: {}\r\n\ CSeq: 99 BYE\r\n\ Max-Forwards: 70\r\n\ Content-Length: 0\r\n\r\n", sip_call_id ); let bye_bytes = bye_msg.as_bytes(); let _ = socket.send_to(bye_bytes, call.provider_addr).await; let _ = socket.send_to(bye_bytes, call.device_addr).await; call.state = CallState::Terminated; emit_event( &self.out_tx, "call_ended", serde_json::json!({ "call_id": call.id, "reason": "hangup_command", "duration": call.duration_secs(), }), ); true } /// Get all active call statuses. pub fn get_all_statuses(&self) -> Vec { self.calls .values() .filter(|c| c.state != CallState::Terminated) .map(|c| c.to_status_json()) .collect() } /// Clean up terminated calls. pub fn cleanup_terminated(&mut self) { self.calls.retain(|_, c| c.state != CallState::Terminated); } /// Check if a SIP Call-ID belongs to any active call. pub fn has_call(&self, sip_call_id: &str) -> bool { self.calls.contains_key(sip_call_id) } // --- Dashboard outbound call (B2BUA) --- /// Initiate an outbound call from the dashboard. /// Builds an INVITE from scratch and sends it to the provider. /// The browser connects separately via WebRTC and gets linked to this call. pub async fn make_outbound_call( &mut self, number: &str, provider_config: &ProviderConfig, config: &AppConfig, rtp_pool: &mut RtpPortPool, socket: &UdpSocket, public_ip: Option<&str>, registered_aor: &str, ) -> Option { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() { Some(a) => a, None => return None, }; // Allocate RTP port for the provider leg. let rtp_alloc = match rtp_pool.allocate().await { Some(a) => a, None => return None, }; // Build the SIP Call-ID for this new dialog. let sip_call_id = sip_proto::helpers::generate_call_id(None); // Build SDP offer. let sdp = sip_proto::helpers::build_sdp(&sip_proto::helpers::SdpOptions { ip: pub_ip, port: rtp_alloc.port, payload_types: &provider_config.codecs, ..Default::default() }); // Build INVITE. let to_uri = format!("sip:{number}@{}", provider_config.domain); let invite = SipMessage::create_request( "INVITE", &to_uri, sip_proto::message::RequestOptions { via_host: pub_ip.to_string(), via_port: lan_port, via_transport: None, via_branch: Some(sip_proto::helpers::generate_branch()), from_uri: registered_aor.to_string(), from_display_name: None, from_tag: Some(sip_proto::helpers::generate_tag()), to_uri: to_uri.clone(), to_display_name: None, to_tag: None, call_id: Some(sip_call_id.clone()), cseq: Some(1), contact: Some(format!("")), max_forwards: Some(70), body: Some(sdp), content_type: Some("application/sdp".to_string()), extra_headers: Some(vec![ ("User-Agent".to_string(), "SipRouter/1.0".to_string()), ("Allow".to_string(), "INVITE, ACK, OPTIONS, CANCEL, BYE, INFO".to_string()), ]), }, ); // Send INVITE to provider. let _ = socket.send_to(&invite.serialize(), provider_dest).await; // Create call entry — device_addr is a dummy (WebRTC will be linked later). let dummy_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); let call = PassthroughCall { id: call_id.clone(), sip_call_id: sip_call_id.clone(), state: CallState::SettingUp, direction: CallDirection::Outbound, created_at: Instant::now(), caller_number: Some(registered_aor.to_string()), callee_number: Some(number.to_string()), provider_id: provider_config.id.clone(), provider_addr: provider_dest, provider_media: None, device_addr: dummy_addr, device_media: None, rtp_port: rtp_alloc.port, rtp_socket: rtp_alloc.socket.clone(), pkt_from_device: 0, pkt_from_provider: 0, }; self.calls.insert(sip_call_id, call); Some(call_id) } // --- Voicemail --- /// Route a call to voicemail: answer the INVITE, play greeting, record message. async fn route_to_voicemail( &mut self, call_id: &str, invite: &SipMessage, from_addr: SocketAddr, caller_number: &str, provider_id: &str, provider_config: &ProviderConfig, config: &AppConfig, rtp_pool: &mut RtpPortPool, socket: &UdpSocket, public_ip: Option<&str>, ) -> Option { let lan_ip = &config.proxy.lan_ip; let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); // Allocate RTP port for the voicemail session. let rtp_alloc = match rtp_pool.allocate().await { Some(a) => a, None => { let resp = SipMessage::create_response(503, "Service Unavailable", invite, None); let _ = socket.send_to(&resp.serialize(), from_addr).await; return None; } }; // Determine provider's preferred codec. let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); // default G.722 // Build SDP with our RTP port. let sdp = sip_proto::helpers::build_sdp(&sip_proto::helpers::SdpOptions { ip: pub_ip, port: rtp_alloc.port, payload_types: &provider_config.codecs, ..Default::default() }); // Answer the INVITE with 200 OK. let response = SipMessage::create_response( 200, "OK", invite, Some(sip_proto::message::ResponseOptions { to_tag: Some(sip_proto::helpers::generate_tag()), contact: Some(format!("", lan_ip, config.proxy.lan_port)), body: Some(sdp), content_type: Some("application/sdp".to_string()), ..Default::default() }), ); let _ = socket.send_to(&response.serialize(), from_addr).await; // Extract provider media from original SDP. let provider_media = if invite.has_sdp_body() { sip_proto::helpers::parse_sdp_endpoint(&invite.body) .and_then(|ep| format!("{}:{}", ep.address, ep.port).parse().ok()) } else { Some(from_addr) // fallback to signaling address }; let provider_media = provider_media.unwrap_or(from_addr); // Create a voicemail call entry for BYE routing. let call = PassthroughCall { id: call_id.to_string(), sip_call_id: invite.call_id().to_string(), state: CallState::Voicemail, direction: CallDirection::Inbound, created_at: std::time::Instant::now(), caller_number: Some(caller_number.to_string()), callee_number: None, provider_id: provider_id.to_string(), provider_addr: from_addr, provider_media: Some(provider_media), device_addr: from_addr, // no device — just use provider addr as placeholder device_media: None, rtp_port: rtp_alloc.port, rtp_socket: rtp_alloc.socket.clone(), pkt_from_device: 0, pkt_from_provider: 0, }; self.calls.insert(invite.call_id().to_string(), call); // Build recording file path. let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(); let recording_dir = format!(".nogit/voicemail/default"); let recording_path = format!("{recording_dir}/msg-{timestamp}.wav"); // Look for a greeting WAV file. let greeting_wav = find_greeting_wav(); // Spawn the voicemail session. let out_tx = self.out_tx.clone(); let call_id_owned = call_id.to_string(); let caller_owned = caller_number.to_string(); let rtp_socket = rtp_alloc.socket; tokio::spawn(async move { crate::voicemail::run_voicemail_session( rtp_socket, provider_media, codec_pt, greeting_wav, recording_path, 120_000, // max 120 seconds call_id_owned, caller_owned, out_tx, ) .await; }); Some(call_id.to_string()) } // --- Internal helpers --- fn resolve_first_device(&self, config: &AppConfig, registrar: &Registrar) -> Option { for device in &config.devices { if let Some(addr) = registrar.get_device_contact(&device.id) { return Some(addr); } } None // No device registered — caller goes to voicemail. } } /// Find a voicemail greeting WAV file. fn find_greeting_wav() -> Option { // Check common locations for a pre-generated greeting. let candidates = [ ".nogit/voicemail/default/greeting.wav", ".nogit/voicemail/greeting.wav", ]; for path in &candidates { if std::path::Path::new(path).exists() { return Some(path.to_string()); } } None // No greeting found — voicemail will just play the beep. } /// Rewrite SDP for provider→device direction (use LAN IP). fn rewrite_sdp_for_device(msg: &mut SipMessage, lan_ip: &str, rtp_port: u16) { if msg.has_sdp_body() { let (new_body, _original) = rewrite_sdp(&msg.body, lan_ip, rtp_port); msg.body = new_body; msg.update_content_length(); } } /// Rewrite SDP for device→provider direction (use public IP). fn rewrite_sdp_for_provider(msg: &mut SipMessage, pub_ip: &str, rtp_port: u16) { if msg.has_sdp_body() { let (new_body, _original) = rewrite_sdp(&msg.body, pub_ip, rtp_port); msg.body = new_body; msg.update_content_length(); } } /// Bidirectional RTP relay loop. /// Receives packets on the relay socket and forwards based on source address. async fn rtp_relay_loop( socket: Arc, device_addr: SocketAddr, provider_addr: SocketAddr, ) { let mut buf = vec![0u8; 65535]; let device_ip = device_addr.ip().to_string(); let provider_ip = provider_addr.ip().to_string(); // Track learned media endpoints (may differ from signaling addresses). let mut learned_device: Option = None; let mut learned_provider: Option = None; loop { match socket.recv_from(&mut buf).await { Ok((n, from)) => { let data = &buf[..n]; let from_ip = from.ip().to_string(); if from_ip == device_ip || learned_device.map(|d| d == from).unwrap_or(false) { // From device → forward to provider. if learned_device.is_none() { learned_device = Some(from); } if let Some(target) = learned_provider { let _ = socket.send_to(data, target).await; } else { // Provider media not yet learned; try signaling address. let _ = socket.send_to(data, provider_addr).await; } } else if from_ip == provider_ip || learned_provider.map(|p| p == from).unwrap_or(false) { // From provider → forward to device. if learned_provider.is_none() { learned_provider = Some(from); } if let Some(target) = learned_device { let _ = socket.send_to(data, target).await; } else { let _ = socket.send_to(data, device_addr).await; } } else { // Unknown source — try to identify by known device addresses. // For now, assume it's the device if not from provider IP range. if learned_device.is_none() { learned_device = Some(from); } } } Err(_) => { // Socket closed or error — exit relay. break; } } } }