Files
siprouter/rust/crates/proxy-engine/src/call_manager.rs

1557 lines
60 KiB
Rust
Raw Normal View History

//! Call manager — central registry and orchestration for all calls.
//!
//! Unified model: every call owns N legs and a mixer task.
//! Legs can be SIP (provider/device), WebRTC (browser), or Media (voicemail/IVR).
//! The mixer provides mix-minus audio to all participants.
use crate::call::{Call, CallDirection, CallState, LegId, LegInfo, LegKind, LegState};
use crate::config::{AppConfig, ProviderConfig};
use crate::ipc::{emit_event, OutTx};
use crate::leg_io::{create_leg_channels, spawn_sip_inbound, spawn_sip_outbound};
use crate::mixer::spawn_mixer;
use crate::registrar::Registrar;
use crate::rtp::RtpPortPool;
use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig};
use sip_proto::helpers::{build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions};
use sip_proto::message::{ResponseOptions, SipMessage};
use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
/// Registry of calls plus the lookup table used to route incoming SIP traffic.
///
/// `calls` holds the calls themselves; `sip_index` maps a SIP Call-ID back to
/// the call and leg it was registered for.
pub struct CallManager {
/// All active calls, keyed by internal call ID.
pub calls: HashMap<String, Call>,
/// Index: SIP Call-ID → (internal call_id, leg_id).
/// Each SIP leg in a call has its own SIP Call-ID.
/// (Passthrough inbound calls register a single entry for a Call-ID shared by
/// two legs; the sender's address picks the leg at routing time.)
sip_index: HashMap<String, (String, LegId)>,
/// Call ID counter.
/// The same counter is also advanced by `next_leg_id`, so call and leg
/// numbers are drawn from one sequence.
next_call_num: u64,
/// Output channel for events.
/// Passed to `emit_event` and cloned into each call's mixer task.
out_tx: OutTx,
}
impl CallManager {
/// Build an empty manager (no calls, no SIP index entries, counter at zero)
/// that emits its events through `out_tx`.
pub fn new(out_tx: OutTx) -> Self {
    let calls = HashMap::new();
    let sip_index = HashMap::new();
    Self {
        out_tx,
        next_call_num: 0,
        sip_index,
        calls,
    }
}
/// Produce a fresh internal call ID of the form `call-<unix millis>-<counter>`
/// and advance the counter.
///
/// If the system clock reads earlier than the Unix epoch the timestamp part is `0`.
fn next_call_id(&mut self) -> String {
    let seq = self.next_call_num;
    self.next_call_num += 1;

    let unix_millis = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.as_millis())
        .unwrap_or(0);

    format!("call-{}-{}", unix_millis, seq)
}
/// Advance the shared counter and return a leg ID of the form `leg-<counter>`,
/// using the counter value *after* the increment.
fn next_leg_id(&mut self) -> String {
    let bumped = self.next_call_num + 1;
    self.next_call_num = bumped;
    format!("leg-{}", bumped)
}
/// Report whether `sip_call_id` is registered in the SIP routing index,
/// i.e. whether some call has claimed that SIP Call-ID.
pub fn has_call(&self, sip_call_id: &str) -> bool {
    self.sip_index.get(sip_call_id).is_some()
}
/// Look up the RTP socket of a call's provider leg (used by webrtc_link).
///
/// Returns `None` when the call is unknown, when it has no `SipProvider` leg,
/// or when the first provider leg encountered has no RTP socket.
pub fn get_call_provider_rtp_socket(&self, call_id: &str) -> Option<Arc<UdpSocket>> {
    self.calls
        .get(call_id)?
        .legs
        .values()
        .find(|candidate| candidate.kind == LegKind::SipProvider)
        .and_then(|provider| provider.rtp_socket.clone())
}
/// Collect the status JSON of every call that is not in the `Terminated`
/// state, for the dashboard.
pub fn get_all_statuses(&self) -> Vec<serde_json::Value> {
    let mut statuses = Vec::new();
    for call in self.calls.values() {
        if call.state == CallState::Terminated {
            continue;
        }
        statuses.push(call.to_status_json());
    }
    statuses
}
// -----------------------------------------------------------------------
// SIP message routing
// -----------------------------------------------------------------------
/// Dispatch an incoming SIP message to the call and leg that own its Call-ID.
///
/// Returns `false` when the Call-ID is not in the SIP index; otherwise returns
/// whatever the B2BUA or passthrough handler reports.
pub async fn route_sip_message(
    &mut self,
    msg: &SipMessage,
    from_addr: SocketAddr,
    socket: &UdpSocket,
    config: &AppConfig,
) -> bool {
    let sip_call_id = msg.call_id().to_string();
    let (call_id, indexed_leg_id) = match self.sip_index.get(&sip_call_id).cloned() {
        Some(entry) => entry,
        None => return false,
    };

    // A leg that carries a SipLeg does its own dialog management (B2BUA).
    let has_dialog = match self
        .calls
        .get(&call_id)
        .and_then(|call| call.legs.get(&indexed_leg_id))
    {
        Some(leg) => leg.sip_leg.is_some(),
        None => false,
    };
    if has_dialog {
        return self
            .route_b2bua_message(&call_id, &indexed_leg_id, msg, from_addr, socket)
            .await;
    }

    // Passthrough device↔provider calls share one Call-ID between both legs and
    // the index stores only one of them, so pick the leg whose signaling address
    // matches the sender; fall back to the indexed leg when none matches.
    let mut target_leg_id = indexed_leg_id;
    if let Some(call) = self.calls.get(&call_id) {
        if let Some(sender) = call
            .legs
            .values()
            .find(|leg| leg.signaling_addr == Some(from_addr))
        {
            target_leg_id = sender.id.clone();
        }
    }

    self.route_passthrough_message(&call_id, &target_leg_id, msg, from_addr, socket, config)
        .await
}
/// Route a message to a B2BUA leg (a leg that owns a `SipLeg` for dialog management).
///
/// The message is handed to the leg's `SipLeg::handle_message`; the resulting
/// `SipLegAction` is then carried out here: sending buffers, updating leg and
/// call state, wiring RTP sockets into the mixer, relaying progress to the
/// device for device-originated calls, and emitting events.
///
/// Returns `false` if the call, the leg, or the leg's `SipLeg` cannot be found,
/// `true` otherwise.
async fn route_b2bua_message(
    &mut self,
    call_id: &str,
    leg_id: &str,
    msg: &SipMessage,
    from_addr: SocketAddr,
    socket: &UdpSocket,
) -> bool {
    /// Reason phrase matching a SIP failure status code (RFC 3261 §21), with a
    /// per-class fallback for codes not listed explicitly.
    fn reason_phrase(code: u16) -> &'static str {
        match code {
            400 => "Bad Request",
            401 => "Unauthorized",
            403 => "Forbidden",
            404 => "Not Found",
            408 => "Request Timeout",
            480 => "Temporarily Unavailable",
            486 => "Busy Here",
            487 => "Request Terminated",
            488 => "Not Acceptable Here",
            500 => "Server Internal Error",
            503 => "Service Unavailable",
            504 => "Server Time-out",
            600 => "Busy Everywhere",
            603 => "Decline",
            604 => "Does Not Exist Anywhere",
            606 => "Not Acceptable",
            300..=399 => "Redirection",
            400..=499 => "Request Failure",
            500..=599 => "Server Failure",
            600..=699 => "Global Failure",
            _ => "Service Unavailable",
        }
    }

    // Process the SipLeg action first, extracting all needed data so the
    // mutable borrow on the call/leg ends before any further work.
    let (action, target, codecs, rtp_socket_clone) = {
        let call = match self.calls.get_mut(call_id) {
            Some(c) => c,
            None => return false,
        };
        let leg = match call.legs.get_mut(leg_id) {
            Some(l) => l,
            None => return false,
        };
        let sip_leg = match &mut leg.sip_leg {
            Some(sl) => sl,
            None => return false,
        };
        let action = sip_leg.handle_message(msg);
        let target = sip_leg.config.sip_target;
        let codecs = sip_leg.config.codecs.clone();
        let rtp_socket_clone = leg.rtp_socket.clone();
        (action, target, codecs, rtp_socket_clone)
    };

    // Payload type handed to the mixer: first configured codec, else 9.
    let sip_pt = codecs.first().copied().unwrap_or(9);

    match action {
        SipLegAction::None => {}
        SipLegAction::Send(buf) => {
            let _ = socket.send_to(&buf, target).await;
        }
        SipLegAction::StateChange(crate::sip_leg::LegState::Ringing) => {
            if let Some(call) = self.calls.get_mut(call_id) {
                if let Some(leg) = call.legs.get_mut(leg_id) {
                    leg.state = LegState::Ringing;
                }
                // Forward 180 Ringing to device if this is a device-originated call.
                if let Some(device_invite) = &call.device_invite {
                    let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice);
                    if let Some(dev) = device_leg {
                        if let Some(dev_addr) = dev.signaling_addr {
                            let ringing =
                                SipMessage::create_response(180, "Ringing", device_invite, None);
                            let _ = socket.send_to(&ringing.serialize(), dev_addr).await;
                        }
                    }
                }
            }
            emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
            emit_event(
                &self.out_tx,
                "leg_state_changed",
                serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }),
            );
        }
        SipLegAction::ConnectedWithAck(ack_buf) => {
            let _ = socket.send_to(&ack_buf, target).await;

            // Update leg state and pick up the remote media address the SipLeg learned.
            let remote = {
                let call = match self.calls.get_mut(call_id) {
                    Some(c) => c,
                    None => return false,
                };
                let leg = match call.legs.get_mut(leg_id) {
                    Some(l) => l,
                    None => return false,
                };
                let remote = leg.sip_leg.as_ref().and_then(|sl| sl.remote_media);
                leg.state = LegState::Connected;
                leg.remote_media = remote;
                call.state = CallState::Connected;
                remote
            };

            // Wire the provider leg to the mixer if remote media is known.
            if let (Some(remote_addr), Some(rtp_socket)) = (remote, rtp_socket_clone) {
                let channels = create_leg_channels();
                spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx);
                spawn_sip_outbound(rtp_socket, remote_addr, channels.outbound_rx);
                if let Some(call) = self.calls.get(call_id) {
                    call.add_leg_to_mixer(leg_id, sip_pt, channels.inbound_rx, channels.outbound_tx)
                        .await;
                }
            }

            // For device-originated calls: send 200 OK to device and wire device leg.
            if let Some(call) = self.calls.get(call_id) {
                if let Some(device_invite) = call.device_invite.clone() {
                    let device_leg_info: Option<(
                        SocketAddr,
                        u16,
                        Arc<UdpSocket>,
                        Option<SocketAddr>,
                        String,
                    )> = call
                        .legs
                        .values()
                        .find(|l| l.kind == LegKind::SipDevice)
                        .and_then(|dev| {
                            Some((
                                dev.signaling_addr?,
                                dev.rtp_port,
                                dev.rtp_socket.clone()?,
                                dev.remote_media,
                                dev.id.clone(),
                            ))
                        });

                    if let Some((dev_addr, dev_rtp_port, dev_rtp_socket, dev_remote, dev_leg_id)) =
                        device_leg_info
                    {
                        // Build SDP pointing device to our device_rtp port.
                        // Use LAN IP for the device (it's on the local network).
                        // The signaling port comes from the same SipLeg config so the
                        // Contact matches the port the proxy is configured with; the
                        // fallback keeps the previous 0.0.0.0:5060 values.
                        let (lan_ip_str, lan_port) = call
                            .legs
                            .values()
                            .find(|l| l.kind == LegKind::SipProvider)
                            .and_then(|l| l.sip_leg.as_ref())
                            .map(|sl| (sl.config.lan_ip.clone(), sl.config.lan_port))
                            .unwrap_or_else(|| ("0.0.0.0".to_string(), 5060));

                        let sdp = build_sdp(&SdpOptions {
                            ip: &lan_ip_str,
                            port: dev_rtp_port,
                            ..Default::default()
                        });
                        let ok = SipMessage::create_response(
                            200,
                            "OK",
                            &device_invite,
                            Some(ResponseOptions {
                                to_tag: Some(generate_tag()),
                                contact: Some(format!("<sip:{}:{}>", lan_ip_str, lan_port)),
                                body: Some(sdp),
                                content_type: Some("application/sdp".to_string()),
                                extra_headers: None,
                            }),
                        );
                        let _ = socket.send_to(&ok.serialize(), dev_addr).await;

                        // Update device leg state.
                        if let Some(call) = self.calls.get_mut(call_id) {
                            if let Some(dev_leg) = call.legs.get_mut(&dev_leg_id) {
                                dev_leg.state = LegState::Connected;
                            }
                        }

                        // Wire device leg to mixer.
                        if let Some(dev_remote_addr) = dev_remote {
                            let dev_channels = create_leg_channels();
                            spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx);
                            spawn_sip_outbound(
                                dev_rtp_socket,
                                dev_remote_addr,
                                dev_channels.outbound_rx,
                            );
                            if let Some(call) = self.calls.get(call_id) {
                                call.add_leg_to_mixer(
                                    &dev_leg_id,
                                    sip_pt,
                                    dev_channels.inbound_rx,
                                    dev_channels.outbound_tx,
                                )
                                .await;
                            }
                        }
                    }
                }
            }

            emit_event(
                &self.out_tx,
                "call_answered",
                serde_json::json!({
                    "call_id": call_id,
                    "provider_media_addr": remote.map(|a| a.ip().to_string()),
                    "provider_media_port": remote.map(|a| a.port()),
                    "sip_pt": sip_pt,
                }),
            );
            emit_event(
                &self.out_tx,
                "leg_state_changed",
                serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }),
            );
        }
        SipLegAction::Terminated(reason) => {
            let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);

            // Notify device if this is a device-originated outbound call.
            if let Some(call) = self.calls.get(call_id) {
                if let Some(device_invite) = &call.device_invite {
                    let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice);
                    if let Some(dev) = device_leg {
                        if let Some(dev_addr) = dev.signaling_addr {
                            // Map reason to SIP response code.
                            let code: u16 = if reason.starts_with("rejected_") {
                                reason
                                    .strip_prefix("rejected_")
                                    .and_then(|s| s.parse().ok())
                                    .unwrap_or(503)
                            } else if reason == "bye" {
                                // Provider sent BYE — no failure response applies here;
                                // the device side is left to terminate_call.
                                0
                            } else {
                                503
                            };
                            // Only answer the pending INVITE; a connected device has
                            // already received its 200 OK.
                            if code > 0 && dev.state != LegState::Connected {
                                let resp = SipMessage::create_response(
                                    code,
                                    reason_phrase(code),
                                    device_invite,
                                    None,
                                );
                                let _ = socket.send_to(&resp.serialize(), dev_addr).await;
                            }
                        }
                    }
                }
            }

            if let Some(call) = self.calls.get_mut(call_id) {
                if let Some(leg) = call.legs.get_mut(leg_id) {
                    leg.state = LegState::Terminated;
                }
            }
            emit_event(
                &self.out_tx,
                "call_ended",
                serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }),
            );
            self.terminate_call(call_id).await;
            return true;
        }
        SipLegAction::SendAndTerminate(buf, reason) => {
            // Reply goes back to the sender of the message rather than the configured target.
            let _ = socket.send_to(&buf, from_addr).await;
            let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);
            emit_event(
                &self.out_tx,
                "call_ended",
                serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }),
            );
            self.terminate_call(call_id).await;
            return true;
        }
        SipLegAction::AuthRetry { ack_407, invite_with_auth } => {
            if let Some(ack) = ack_407 {
                let _ = socket.send_to(&ack, target).await;
            }
            let _ = socket.send_to(&invite_with_auth, target).await;
        }
        _ => {}
    }
    true
}
/// Route a passthrough-style message (inbound/outbound device↔provider).
///
/// SIP signaling is forwarded between the two endpoints of the call with SDP
/// rewriting, while the media of both sides still goes through the mixer.
/// `this_leg_id` is the leg the message arrived on; the message is forwarded to
/// the first other leg that is not `Terminated`.
///
/// Returns `false` when the call is unknown, no counterpart leg exists, the
/// counterpart has no signaling address, or the message is neither a request
/// nor a response; `true` otherwise.
async fn route_passthrough_message(
    &mut self,
    call_id: &str,
    this_leg_id: &str,
    msg: &SipMessage,
    from_addr: SocketAddr,
    socket: &UdpSocket,
    config: &AppConfig,
) -> bool {
    let call = match self.calls.get_mut(call_id) {
        Some(c) => c,
        None => return false,
    };

    // Kind of the leg the message arrived on (defaults to provider if unknown).
    let this_leg = call.legs.get(this_leg_id);
    let this_kind = this_leg.map(|l| l.kind).unwrap_or(LegKind::SipProvider);

    // Find the counterpart leg — the one we forward to.
    let other_leg = call
        .legs
        .values()
        .find(|l| l.id != this_leg_id && l.state != LegState::Terminated);
    let (other_addr, other_rtp_port, other_leg_id) = match other_leg {
        Some(l) => (l.signaling_addr, l.rtp_port, l.id.clone()),
        None => return false,
    };
    let forward_to = match other_addr {
        Some(a) => a,
        None => return false,
    };

    let lan_ip = config.proxy.lan_ip.clone();
    let lan_port = config.proxy.lan_port;

    // Check if the other leg is a B2BUA leg (has SipLeg for proper dialog mgmt).
    let other_has_sip_leg = call
        .legs
        .get(&other_leg_id)
        .map(|l| l.sip_leg.is_some())
        .unwrap_or(false);

    if msg.is_request() {
        let method = msg.method().unwrap_or("");

        // ACK: In hybrid B2BUA mode, the device's ACK for our 200 OK
        // is absorbed silently (provider's 200 was already ACKed by SipLeg).
        if method == "ACK" {
            if other_has_sip_leg {
                return true; // Absorb — provider ACK handled by SipLeg.
            }
            // Pure passthrough: forward ACK normally.
            let _ = socket.send_to(&msg.serialize(), forward_to).await;
            return true;
        }

        // INVITE retransmit: the call already exists, re-send 100 Trying.
        if method == "INVITE" {
            let trying = SipMessage::create_response(100, "Trying", msg, None);
            let _ = socket.send_to(&trying.serialize(), from_addr).await;
            return true;
        }

        if method == "BYE" {
            let ok = SipMessage::create_response(200, "OK", msg, None);
            let _ = socket.send_to(&ok.serialize(), from_addr).await;
            // If other leg has SipLeg, use build_hangup for proper dialog teardown.
            if other_has_sip_leg {
                if let Some(other) = call.legs.get_mut(&other_leg_id) {
                    if let Some(sip_leg) = &mut other.sip_leg {
                        if let Some(hangup_buf) = sip_leg.build_hangup() {
                            let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
                        }
                    }
                }
            } else {
                let _ = socket.send_to(&msg.serialize(), forward_to).await;
            }
            let duration = call.duration_secs();
            emit_event(
                &self.out_tx,
                "call_ended",
                serde_json::json!({
                    "call_id": call_id,
                    "reason": "bye",
                    "duration": duration,
                }),
            );
            self.terminate_call(call_id).await;
            return true;
        }

        if method == "CANCEL" {
            let ok = SipMessage::create_response(200, "OK", msg, None);
            let _ = socket.send_to(&ok.serialize(), from_addr).await;
            // If other leg has SipLeg, use build_hangup (produces CANCEL for early dialog).
            if other_has_sip_leg {
                if let Some(other) = call.legs.get_mut(&other_leg_id) {
                    if let Some(sip_leg) = &mut other.sip_leg {
                        if let Some(hangup_buf) = sip_leg.build_hangup() {
                            let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
                        }
                    }
                }
            } else {
                let _ = socket.send_to(&msg.serialize(), forward_to).await;
            }
            let duration = call.duration_secs();
            emit_event(
                &self.out_tx,
                "call_ended",
                serde_json::json!({ "call_id": call_id, "reason": "cancel", "duration": duration }),
            );
            self.terminate_call(call_id).await;
            return true;
        }

        // INFO is answered locally and not forwarded.
        if method == "INFO" {
            let ok = SipMessage::create_response(200, "OK", msg, None);
            let _ = socket.send_to(&ok.serialize(), from_addr).await;
            return true;
        }

        // Forward other requests with SDP rewriting.
        let mut fwd = msg.clone();
        // Rewrite SDP so the recipient sends its RTP to the port allocated for
        // its own (the forward-to) leg on our LAN address.
        if fwd.has_sdp_body() {
            let (new_body, _) = rewrite_sdp(&fwd.body, &lan_ip, other_rtp_port);
            fwd.body = new_body;
            fwd.update_content_length();
        }
        if this_kind == LegKind::SipProvider {
            // From provider → forward to device: rewrite request URI.
            if let Some(ruri) = fwd.request_uri().map(|s| s.to_string()) {
                let new_ruri =
                    rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port());
                fwd.set_request_uri(&new_ruri);
            }
        }
        if fwd.is_dialog_establishing() {
            fwd.prepend_header("Record-Route", &format!("<sip:{lan_ip}:{lan_port};lr>"));
        }
        let _ = socket.send_to(&fwd.serialize(), forward_to).await;
        return true;
    }

    // --- Responses ---
    if msg.is_response() {
        let code = msg.status_code().unwrap_or(0);
        let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase();

        let mut fwd = msg.clone();
        // Rewrite SDP so the forward-to side sends RTP to its own leg's port.
        // The LAN IP is used regardless of which side the response came from.
        if fwd.has_sdp_body() {
            let (new_body, _) = rewrite_sdp(&fwd.body, &lan_ip, other_rtp_port);
            fwd.body = new_body;
            fwd.update_content_length();
        }

        // State transitions on INVITE responses.
        if cseq_method == "INVITE" {
            if code == 180 || code == 183 {
                if call.state == CallState::SettingUp {
                    call.state = CallState::Ringing;
                    emit_event(
                        &self.out_tx,
                        "call_ringing",
                        serde_json::json!({ "call_id": call_id }),
                    );
                }
                if let Some(leg) = call.legs.get_mut(this_leg_id) {
                    leg.state = LegState::Ringing;
                }
            } else if (200..300).contains(&code) {
                let mut needs_wiring = false;
                if let Some(leg) = call.legs.get_mut(this_leg_id) {
                    leg.state = LegState::Connected;
                    // Learn remote media from the original (un-rewritten) SDP.
                    if msg.has_sdp_body() {
                        if let Some(ep) = parse_sdp_endpoint(&msg.body) {
                            if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
                                leg.remote_media = Some(addr);
                            }
                        }
                    }
                    needs_wiring = true;
                }
                if call.state != CallState::Connected {
                    call.state = CallState::Connected;
                    emit_event(
                        &self.out_tx,
                        "call_answered",
                        serde_json::json!({ "call_id": call_id }),
                    );
                }
                // Forward the response before wiring (last use of the call borrow).
                let _ = socket.send_to(&fwd.serialize(), forward_to).await;
                // Wire legs to mixer (needs &mut self, so call borrow must be released).
                if needs_wiring {
                    self.maybe_wire_passthrough_legs(call_id).await;
                }
                return true;
            } else if code >= 300 {
                let duration = call.duration_secs();
                emit_event(
                    &self.out_tx,
                    "call_ended",
                    serde_json::json!({ "call_id": call_id, "reason": format!("rejected_{code}"), "duration": duration }),
                );
                // Don't terminate yet — let the forward happen first.
                // NOTE(review): nothing in this function terminates the call after the
                // forward below; confirm that cleanup happens elsewhere.
            }
        }

        let _ = socket.send_to(&fwd.serialize(), forward_to).await;
        return true;
    }

    false
}
/// Attach passthrough legs to the call's mixer once at least two of them are ready.
///
/// A leg counts as ready when it is `Connected` or `Ringing` and has both an
/// RTP socket and a known remote media address. With fewer than two ready legs
/// (or an unknown call) nothing happens.
async fn maybe_wire_passthrough_legs(&mut self, call_id: &str) {
    let call = match self.calls.get(call_id) {
        Some(found) => found,
        None => return,
    };

    // Snapshot everything needed per leg so the spawn calls own their data.
    let ready: Vec<(String, u8, Arc<UdpSocket>, SocketAddr)> = call
        .legs
        .values()
        .filter(|leg| leg.state == LegState::Connected || leg.state == LegState::Ringing)
        .filter_map(|leg| {
            let sock = leg.rtp_socket.as_ref()?;
            let remote = leg.remote_media?;
            Some((leg.id.clone(), leg.codec_pt, sock.clone(), remote))
        })
        .collect();

    // Only wire if we have at least 2 legs ready.
    if ready.len() < 2 {
        return;
    }

    for (ready_leg_id, pt, sock, remote_addr) in ready {
        let io = create_leg_channels();
        spawn_sip_inbound(sock.clone(), io.inbound_tx);
        spawn_sip_outbound(sock, remote_addr, io.outbound_rx);
        call.add_leg_to_mixer(&ready_leg_id, pt, io.inbound_rx, io.outbound_tx)
            .await;
    }
}
// -----------------------------------------------------------------------
// Call creation
// -----------------------------------------------------------------------
/// Create an inbound call (provider → device).
///
/// Resolves the first available device; when none is found the call is handed
/// to `route_to_voicemail`. Otherwise two RTP ports are allocated (one per
/// leg), a call with a mixer is created containing a provider leg and a device
/// leg that share the provider's SIP Call-ID, and the INVITE is rewritten
/// (Request-URI, Record-Route, SDP) and forwarded to the device.
///
/// Returns the internal call ID, the result of voicemail routing when no
/// device is available, or `None` after answering 503 when RTP port allocation
/// fails.
pub async fn create_inbound_call(
    &mut self,
    invite: &SipMessage,
    from_addr: SocketAddr,
    provider_id: &str,
    provider_config: &ProviderConfig,
    config: &AppConfig,
    registrar: &Registrar,
    rtp_pool: &mut RtpPortPool,
    socket: &UdpSocket,
    public_ip: Option<&str>,
) -> Option<String> {
    let call_id = self.next_call_id();
    let lan_ip = &config.proxy.lan_ip;
    let lan_port = config.proxy.lan_port;
    let sip_call_id = invite.call_id().to_string();

    // Extract caller/callee info.
    let from_header = invite.get_header("From").unwrap_or("");
    let caller_number = SipMessage::extract_uri(from_header)
        .unwrap_or("Unknown")
        .to_string();
    // The Request-URI is a bare URI rather than a bracketed header value, so
    // fall back to the raw URI when `extract_uri` yields nothing instead of
    // recording an empty callee.
    let called_number = invite
        .request_uri()
        .map(|uri| SipMessage::extract_uri(uri).unwrap_or(uri))
        .unwrap_or("")
        .to_string();

    // Resolve target device.
    let device_addr = match self.resolve_first_device(config, registrar) {
        Some(addr) => addr,
        None => {
            // No device registered → voicemail.
            return self
                .route_to_voicemail(
                    &call_id, invite, from_addr, &caller_number,
                    provider_id, provider_config, config, rtp_pool, socket, public_ip,
                )
                .await;
        }
    };

    // Allocate RTP ports for both legs; answer 503 if the pool is exhausted.
    let provider_rtp = match rtp_pool.allocate().await {
        Some(a) => a,
        None => {
            let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
            let _ = socket.send_to(&resp.serialize(), from_addr).await;
            return None;
        }
    };
    let device_rtp = match rtp_pool.allocate().await {
        Some(a) => a,
        None => {
            let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
            let _ = socket.send_to(&resp.serialize(), from_addr).await;
            return None;
        }
    };

    // Create the call with a mixer.
    let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone());
    let mut call = Call::new(
        call_id.clone(),
        CallDirection::Inbound,
        provider_id.to_string(),
        mixer_cmd_tx,
        mixer_task,
    );
    call.caller_number = Some(caller_number);
    call.callee_number = Some(called_number);
    call.state = CallState::Ringing;

    // Payload type for both legs: first provider codec, else 9.
    let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);

    // Provider leg — extract media from SDP.
    let mut provider_media: Option<SocketAddr> = None;
    if invite.has_sdp_body() {
        if let Some(ep) = parse_sdp_endpoint(&invite.body) {
            if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
                provider_media = Some(addr);
            }
        }
    }

    let provider_leg_id = format!("{call_id}-prov");
    call.legs.insert(
        provider_leg_id.clone(),
        LegInfo {
            id: provider_leg_id.clone(),
            kind: LegKind::SipProvider,
            state: LegState::Connected, // Provider already connected (sent us the INVITE).
            codec_pt,
            sip_leg: None,
            sip_call_id: Some(sip_call_id.clone()),
            webrtc_session_id: None,
            rtp_socket: Some(provider_rtp.socket.clone()),
            rtp_port: provider_rtp.port,
            remote_media: provider_media,
            signaling_addr: Some(from_addr),
            metadata: HashMap::new(),
        },
    );

    // Device leg.
    let device_leg_id = format!("{call_id}-dev");
    call.legs.insert(
        device_leg_id.clone(),
        LegInfo {
            id: device_leg_id.clone(),
            kind: LegKind::SipDevice,
            state: LegState::Inviting,
            codec_pt,
            sip_leg: None,
            sip_call_id: Some(sip_call_id.clone()), // Same SIP Call-ID for passthrough.
            webrtc_session_id: None,
            rtp_socket: Some(device_rtp.socket.clone()),
            rtp_port: device_rtp.port,
            remote_media: None, // Learned from device's 200 OK.
            signaling_addr: Some(device_addr),
            metadata: HashMap::new(),
        },
    );

    // Register the shared SIP Call-ID against the provider leg only.
    // For passthrough, both legs share the same SIP Call-ID and the actual leg
    // is picked by source address when a message is routed.
    self.sip_index
        .insert(sip_call_id.clone(), (call_id.clone(), provider_leg_id.clone()));

    // Rewrite and forward INVITE to device.
    let mut fwd_invite = invite.clone();
    fwd_invite.set_request_uri(&rewrite_sip_uri(
        fwd_invite.request_uri().unwrap_or(""),
        &device_addr.ip().to_string(),
        device_addr.port(),
    ));
    fwd_invite.prepend_header("Record-Route", &format!("<sip:{lan_ip}:{lan_port};lr>"));

    // Rewrite SDP: tell the device to send RTP to the device leg's port.
    if fwd_invite.has_sdp_body() {
        let (new_body, _) = rewrite_sdp(&fwd_invite.body, lan_ip, device_rtp.port);
        fwd_invite.body = new_body;
        fwd_invite.update_content_length();
    }

    let _ = socket.send_to(&fwd_invite.serialize(), device_addr).await;

    // Store the call.
    self.calls.insert(call_id.clone(), call);
    Some(call_id)
}
/// Start an outbound B2BUA call requested from the dashboard.
///
/// The new call holds one provider leg driven by a `SipLeg`; a WebRTC leg is
/// attached later via webrtc_link. Returns the internal call ID, or `None` when
/// the provider's outbound proxy does not resolve or no RTP port is available.
pub async fn make_outbound_call(
    &mut self,
    number: &str,
    provider_config: &ProviderConfig,
    config: &AppConfig,
    rtp_pool: &mut RtpPortPool,
    socket: &UdpSocket,
    public_ip: Option<&str>,
    registered_aor: &str,
) -> Option<String> {
    let call_id = self.next_call_id();

    let provider_dest: SocketAddr = provider_config.outbound_proxy.to_socket_addr()?;
    let rtp_alloc = rtp_pool.allocate().await?;
    let sip_call_id = generate_call_id(None);

    // Provider-facing SipLeg; it sends the INVITE right away.
    let leg_id = format!("{call_id}-prov");
    let mut sip_leg = SipLeg::new(
        leg_id.clone(),
        SipLegConfig {
            lan_ip: config.proxy.lan_ip.clone(),
            lan_port: config.proxy.lan_port,
            public_ip: public_ip.map(|s| s.to_string()),
            sip_target: provider_dest,
            username: Some(provider_config.username.clone()),
            password: Some(provider_config.password.clone()),
            registered_aor: Some(registered_aor.to_string()),
            codecs: provider_config.codecs.clone(),
            rtp_port: rtp_alloc.port,
        },
    );
    let to_uri = format!("sip:{number}@{}", provider_config.domain);
    sip_leg
        .send_invite(registered_aor, &to_uri, &sip_call_id, socket)
        .await;

    // The call itself, with its mixer task.
    let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone());
    let mut call = Call::new(
        call_id.clone(),
        CallDirection::Outbound,
        provider_config.id.clone(),
        mixer_cmd_tx,
        mixer_task,
    );
    call.callee_number = Some(number.to_string());

    let provider_leg = LegInfo {
        id: leg_id.clone(),
        kind: LegKind::SipProvider,
        state: LegState::Inviting,
        codec_pt: provider_config.codecs.first().copied().unwrap_or(9),
        sip_leg: Some(sip_leg),
        sip_call_id: Some(sip_call_id.clone()),
        webrtc_session_id: None,
        rtp_socket: Some(rtp_alloc.socket.clone()),
        rtp_port: rtp_alloc.port,
        remote_media: None,
        signaling_addr: Some(provider_dest),
        metadata: HashMap::new(),
    };
    call.legs.insert(leg_id.clone(), provider_leg);

    // Make the provider's replies routable, then publish the call.
    self.sip_index.insert(sip_call_id, (call_id.clone(), leg_id));
    self.calls.insert(call_id.clone(), call);
    Some(call_id)
}
/// Create a device-originated outbound call (device → provider) using hybrid B2BUA.
///
/// The device side is a simple passthrough leg (no SipLeg needed).
/// The provider side uses a full SipLeg for proper dialog management,
/// 407 auth, correct From URI, and public IP in SDP.
///
/// The device's INVITE is stored on the call so later provider progress can be
/// answered against it. Returns the internal call ID, or `None` when the
/// provider's outbound proxy does not resolve or an RTP port cannot be
/// allocated; in those cases the device is answered with 503 so it is not left
/// waiting on an INVITE that will never complete.
pub async fn create_device_outbound_call(
    &mut self,
    invite: &SipMessage,
    from_addr: SocketAddr,
    provider_config: &ProviderConfig,
    config: &AppConfig,
    rtp_pool: &mut RtpPortPool,
    socket: &UdpSocket,
    public_ip: Option<&str>,
    registered_aor: &str,
) -> Option<String> {
    let call_id = self.next_call_id();
    let lan_ip = &config.proxy.lan_ip;
    let lan_port = config.proxy.lan_port;
    let device_sip_call_id = invite.call_id().to_string();

    // Extract just the user part from the request URI (e.g., "sip:16196000@10.0.0.1" → "16196000").
    // extract_uri is for header values with angle brackets, not bare request URIs.
    let dialed_number = invite
        .request_uri()
        .map(|uri| {
            let stripped = uri
                .strip_prefix("sip:")
                .or_else(|| uri.strip_prefix("sips:"))
                .unwrap_or(uri);
            stripped.split('@').next().unwrap_or(stripped).to_string()
        })
        .unwrap_or_default();

    let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() {
        Some(a) => a,
        None => {
            // No usable provider target: reject instead of staying silent.
            let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
            let _ = socket.send_to(&resp.serialize(), from_addr).await;
            return None;
        }
    };

    // Send 100 Trying to device immediately to stop retransmissions.
    let trying = SipMessage::create_response(100, "Trying", invite, None);
    let _ = socket.send_to(&trying.serialize(), from_addr).await;

    // Allocate RTP ports for both legs; answer 503 if the pool is exhausted,
    // matching the inbound call path.
    let device_rtp = match rtp_pool.allocate().await {
        Some(a) => a,
        None => {
            let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
            let _ = socket.send_to(&resp.serialize(), from_addr).await;
            return None;
        }
    };
    let provider_rtp = match rtp_pool.allocate().await {
        Some(a) => a,
        None => {
            let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
            let _ = socket.send_to(&resp.serialize(), from_addr).await;
            return None;
        }
    };

    // Payload type for both legs: first provider codec, else 9.
    let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);

    // Create call with mixer.
    let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone());
    let mut call = Call::new(
        call_id.clone(),
        CallDirection::Outbound,
        provider_config.id.clone(),
        mixer_cmd_tx,
        mixer_task,
    );
    call.callee_number = Some(dialed_number.clone());
    call.device_invite = Some(invite.clone());

    // --- Device leg (passthrough, no SipLeg) ---
    let device_leg_id = format!("{call_id}-dev");
    // Where the device wants to receive media, taken from its SDP offer.
    let mut device_media: Option<SocketAddr> = None;
    if invite.has_sdp_body() {
        if let Some(ep) = parse_sdp_endpoint(&invite.body) {
            if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
                device_media = Some(addr);
            }
        }
    }
    call.legs.insert(
        device_leg_id.clone(),
        LegInfo {
            id: device_leg_id.clone(),
            kind: LegKind::SipDevice,
            state: LegState::Inviting, // Not connected yet — waiting for provider answer
            codec_pt,
            sip_leg: None,
            sip_call_id: Some(device_sip_call_id.clone()),
            webrtc_session_id: None,
            rtp_socket: Some(device_rtp.socket.clone()),
            rtp_port: device_rtp.port,
            remote_media: device_media,
            signaling_addr: Some(from_addr),
            metadata: HashMap::new(),
        },
    );

    // Register device's SIP Call-ID → device leg.
    self.sip_index
        .insert(device_sip_call_id, (call_id.clone(), device_leg_id));

    // --- Provider leg (B2BUA with SipLeg) ---
    let provider_leg_id = format!("{call_id}-prov");
    let provider_sip_call_id = generate_call_id(None);
    let leg_config = SipLegConfig {
        lan_ip: lan_ip.clone(),
        lan_port,
        public_ip: public_ip.map(|s| s.to_string()),
        sip_target: provider_dest,
        username: Some(provider_config.username.clone()),
        password: Some(provider_config.password.clone()),
        registered_aor: Some(registered_aor.to_string()),
        codecs: provider_config.codecs.clone(),
        rtp_port: provider_rtp.port,
    };
    let mut sip_leg = SipLeg::new(provider_leg_id.clone(), leg_config);

    // Build proper To URI and send INVITE.
    let to_uri = format!("sip:{}@{}", dialed_number, provider_config.domain);
    sip_leg
        .send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket)
        .await;

    call.legs.insert(
        provider_leg_id.clone(),
        LegInfo {
            id: provider_leg_id.clone(),
            kind: LegKind::SipProvider,
            state: LegState::Inviting,
            codec_pt,
            sip_leg: Some(sip_leg),
            sip_call_id: Some(provider_sip_call_id.clone()),
            webrtc_session_id: None,
            rtp_socket: Some(provider_rtp.socket.clone()),
            rtp_port: provider_rtp.port,
            remote_media: None,
            signaling_addr: Some(provider_dest),
            metadata: HashMap::new(),
        },
    );

    // Register provider's SIP Call-ID → provider leg.
    self.sip_index
        .insert(provider_sip_call_id, (call_id.clone(), provider_leg_id));

    self.calls.insert(call_id.clone(), call);
    Some(call_id)
}
// -----------------------------------------------------------------------
// Leg management (mid-call add/remove)
// -----------------------------------------------------------------------
/// Add a SIP leg to an existing call (e.g., add external participant).
///
/// Sends an INVITE for `number` to the provider through a new `SipLeg`,
/// registers the new SIP Call-ID for routing, attaches the leg (in `Inviting`
/// state) to the call and emits a `leg_added` event.
///
/// Returns the new leg ID, or `None` when the call does not exist, the
/// provider's outbound proxy does not resolve, or no RTP port is available.
pub async fn add_external_leg(
    &mut self,
    call_id: &str,
    number: &str,
    provider_config: &ProviderConfig,
    config: &AppConfig,
    rtp_pool: &mut RtpPortPool,
    socket: &UdpSocket,
    public_ip: Option<&str>,
    registered_aor: &str,
) -> Option<String> {
    // Bail out before allocating anything if the call is unknown.
    if !self.calls.contains_key(call_id) {
        return None;
    }

    let lan_ip = &config.proxy.lan_ip;
    let lan_port = config.proxy.lan_port;
    let provider_dest: SocketAddr = provider_config.outbound_proxy.to_socket_addr()?;
    let rtp_alloc = rtp_pool.allocate().await?;
    let sip_call_id = generate_call_id(None);
    let leg_id = self.next_leg_id();

    let leg_config = SipLegConfig {
        lan_ip: lan_ip.clone(),
        lan_port,
        public_ip: public_ip.map(|s| s.to_string()),
        sip_target: provider_dest,
        username: Some(provider_config.username.clone()),
        password: Some(provider_config.password.clone()),
        registered_aor: Some(registered_aor.to_string()),
        codecs: provider_config.codecs.clone(),
        rtp_port: rtp_alloc.port,
    };
    let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);

    let to_uri = format!("sip:{number}@{}", provider_config.domain);
    sip_leg
        .send_invite(registered_aor, &to_uri, &sip_call_id, socket)
        .await;

    // Payload type for the leg: first provider codec, else 9.
    let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
    let leg_info = LegInfo {
        id: leg_id.clone(),
        kind: LegKind::SipProvider,
        state: LegState::Inviting,
        codec_pt,
        sip_leg: Some(sip_leg),
        sip_call_id: Some(sip_call_id.clone()),
        webrtc_session_id: None,
        rtp_socket: Some(rtp_alloc.socket.clone()),
        rtp_port: rtp_alloc.port,
        remote_media: None,
        signaling_addr: Some(provider_dest),
        metadata: HashMap::new(),
    };

    // The call was present above and `&mut self` is held throughout, so this
    // lookup succeeds; `?` merely avoids a panic path.
    self.calls
        .get_mut(call_id)?
        .legs
        .insert(leg_id.clone(), leg_info);
    self.sip_index
        .insert(sip_call_id, (call_id.to_string(), leg_id.clone()));

    emit_event(
        &self.out_tx,
        "leg_added",
        serde_json::json!({
            "call_id": call_id,
            "leg_id": leg_id,
            "kind": "sip-provider",
            "state": "inviting",
            "number": number,
        }),
    );

    Some(leg_id)
}
/// Add a local SIP device to an existing call (mid-call INVITE to desk phone).
///
/// Sends an INVITE to the device's registered contact through a new `SipLeg`,
/// registers the new SIP Call-ID for routing, attaches the leg (in `Inviting`
/// state) to the call and emits a `leg_added` event.
///
/// Returns the new leg ID, or `None` when the device has no registered
/// contact, the call does not exist, or no RTP port is available.
pub async fn add_device_leg(
    &mut self,
    call_id: &str,
    device_id: &str,
    registrar: &Registrar,
    config: &AppConfig,
    rtp_pool: &mut RtpPortPool,
    socket: &UdpSocket,
) -> Option<String> {
    let device_addr = registrar.get_device_contact(device_id)?;
    // Bail out before allocating anything if the call is unknown.
    if !self.calls.contains_key(call_id) {
        return None;
    }

    let lan_ip = &config.proxy.lan_ip;
    let lan_port = config.proxy.lan_port;
    let rtp_alloc = rtp_pool.allocate().await?;
    let sip_call_id = generate_call_id(None);
    let leg_id = self.next_leg_id();

    // Use G.722 by default for local devices (most SIP phones support it).
    let codec_pt: u8 = 9;

    // Build a B2BUA SipLeg targeting the device.
    let leg_config = SipLegConfig {
        lan_ip: lan_ip.clone(),
        lan_port,
        public_ip: None, // local device — no public IP needed
        sip_target: device_addr,
        username: None,
        password: None,
        registered_aor: None,
        codecs: vec![codec_pt, 0], // G.722, PCMU fallback
        rtp_port: rtp_alloc.port,
    };
    let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);

    let to_uri = format!("sip:{}@{}:{}", device_id, device_addr.ip(), device_addr.port());
    let from_uri = format!("sip:sipproxy@{lan_ip}:{lan_port}");
    sip_leg
        .send_invite(&from_uri, &to_uri, &sip_call_id, socket)
        .await;

    let leg_info = LegInfo {
        id: leg_id.clone(),
        kind: LegKind::SipDevice,
        state: LegState::Inviting,
        codec_pt,
        sip_leg: Some(sip_leg),
        sip_call_id: Some(sip_call_id.clone()),
        webrtc_session_id: None,
        rtp_socket: Some(rtp_alloc.socket.clone()),
        rtp_port: rtp_alloc.port,
        remote_media: None,
        signaling_addr: Some(device_addr),
        metadata: HashMap::new(),
    };

    // The call was present above and `&mut self` is held throughout, so this
    // lookup succeeds; `?` merely avoids a panic path.
    self.calls
        .get_mut(call_id)?
        .legs
        .insert(leg_id.clone(), leg_info);
    self.sip_index
        .insert(sip_call_id, (call_id.to_string(), leg_id.clone()));

    emit_event(
        &self.out_tx,
        "leg_added",
        serde_json::json!({
            "call_id": call_id,
            "leg_id": leg_id,
            "kind": "sip-device",
            "state": "inviting",
            "device_id": device_id,
        }),
    );

    Some(leg_id)
}
/// Detach a single leg from a call and hang it up.
///
/// The leg is removed from the mixer, SIP legs are sent a hangup message
/// (best effort — send errors are ignored), the leg is marked `Terminated`,
/// and its SIP Call-ID is dropped from the index. A `leg_removed` event is
/// always emitted. When fewer than two non-terminated legs remain, the whole
/// call is ended with reason `last_leg`.
///
/// Returns `false` only when `call_id` is unknown.
pub async fn remove_leg(
    &mut self,
    call_id: &str,
    leg_id: &str,
    socket: &UdpSocket,
) -> bool {
    let Some(call) = self.calls.get_mut(call_id) else {
        return false;
    };

    // Stop mixing this leg's audio before tearing down signaling.
    call.remove_leg_from_mixer(leg_id).await;

    if let Some(leg) = call.legs.get_mut(leg_id) {
        // Only SIP legs that can produce a hangup message get one.
        if let Some(sip_leg) = leg.sip_leg.as_mut() {
            if let Some(hangup_bytes) = sip_leg.build_hangup() {
                let target = sip_leg.config.sip_target;
                let _ = socket.send_to(&hangup_bytes, target).await;
            }
        }
        leg.state = LegState::Terminated;

        // The dialog is over; stop routing its Call-ID to this call.
        if let Some(sip_cid) = leg.sip_call_id.as_ref() {
            self.sip_index.remove(sip_cid);
        }
    }

    let removed = serde_json::json!({ "call_id": call_id, "leg_id": leg_id });
    emit_event(&self.out_tx, "leg_removed", removed);

    // A call needs at least two live legs to be worth keeping.
    let remaining = call
        .legs
        .values()
        .filter(|leg| leg.state != LegState::Terminated)
        .count();
    if remaining < 2 {
        let duration = call.duration_secs();
        let ended =
            serde_json::json!({ "call_id": call_id, "reason": "last_leg", "duration": duration });
        emit_event(&self.out_tx, "call_ended", ended);
        self.terminate_call(call_id).await;
    }

    true
}
/// Move a leg from one call into another.
///
/// The leg stays connected (same RTP socket, same SIP dialog); only its mixer
/// membership changes, so it starts hearing the target call's participants.
/// Emits `leg_transferred`, and ends the source call with reason
/// `leg_transferred` when fewer than two non-terminated legs are left in it.
///
/// Returns `false` when either call is unknown or the leg is not part of the
/// source call.
pub async fn transfer_leg(
    &mut self,
    source_call_id: &str,
    leg_id: &str,
    target_call_id: &str,
) -> bool {
    // Both calls must exist before anything is touched.
    let both_known =
        self.calls.contains_key(source_call_id) && self.calls.contains_key(target_call_id);
    if !both_known {
        return false;
    }

    // Detach from the source mixer (drops old channels → old I/O tasks exit),
    // then take ownership of the leg record.
    let source = self.calls.get_mut(source_call_id).unwrap();
    source.remove_leg_from_mixer(leg_id).await;
    let Some(moved_leg) = source.legs.remove(leg_id) else {
        return false;
    };

    // Re-point the SIP Call-ID at the target call.
    if let Some(sip_cid) = moved_leg.sip_call_id.as_ref() {
        let entry = (target_call_id.to_string(), leg_id.to_string());
        self.sip_index.insert(sip_cid.clone(), entry);
    }

    // Fresh channels and I/O tasks wired to the target mixer.
    let channels = create_leg_channels();
    if let Some(rtp_socket) = moved_leg.rtp_socket.as_ref() {
        spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx);
        // Outbound audio needs a known remote media endpoint.
        if let Some(remote_media) = moved_leg.remote_media {
            spawn_sip_outbound(rtp_socket.clone(), remote_media, channels.outbound_rx);
        }
    }

    // Register with the target mixer, then store the leg on the target call.
    let target = self.calls.get_mut(target_call_id).unwrap();
    target
        .add_leg_to_mixer(
            leg_id,
            moved_leg.codec_pt,
            channels.inbound_rx,
            channels.outbound_tx,
        )
        .await;
    target.legs.insert(leg_id.to_string(), moved_leg);

    let transferred = serde_json::json!({
        "leg_id": leg_id,
        "source_call_id": source_call_id,
        "target_call_id": target_call_id,
    });
    emit_event(&self.out_tx, "leg_transferred", transferred);

    // The source call may no longer have enough participants to continue.
    let source = self.calls.get(source_call_id).unwrap();
    let remaining = source
        .legs
        .values()
        .filter(|leg| leg.state != LegState::Terminated)
        .count();
    if remaining < 2 {
        let duration = source.duration_secs();
        let ended = serde_json::json!({
            "call_id": source_call_id,
            "reason": "leg_transferred",
            "duration": duration,
        });
        emit_event(&self.out_tx, "call_ended", ended);
        self.terminate_call(source_call_id).await;
    }

    true
}
/// Swap one leg of a call for a newly dialed external leg.
///
/// The old leg is removed first; if that removal caused the call itself to be
/// torn down (too few legs left), nothing is dialed and `None` is returned.
/// Otherwise the result of `add_external_leg` — the new leg ID on success —
/// is returned.
pub async fn replace_leg(
    &mut self,
    call_id: &str,
    old_leg_id: &str,
    number: &str,
    provider_config: &ProviderConfig,
    config: &AppConfig,
    rtp_pool: &mut RtpPortPool,
    socket: &UdpSocket,
    public_ip: Option<&str>,
    registered_aor: &str,
) -> Option<String> {
    // Hang up the leg being replaced.
    self.remove_leg(call_id, old_leg_id, socket).await;

    if self.calls.contains_key(call_id) {
        // The call survived the removal: dial the replacement into it.
        self.add_external_leg(
            call_id,
            number,
            provider_config,
            config,
            rtp_pool,
            socket,
            public_ip,
            registered_aor,
        )
        .await
    } else {
        // Removing the leg ended the call; there is nothing to dial into.
        None
    }
}
// -----------------------------------------------------------------------
// Hangup + cleanup
// -----------------------------------------------------------------------
/// End a call identified by its internal call ID.
///
/// Every leg that is not already terminated is hung up: legs with a `SipLeg`
/// send the hangup message it builds, while passthrough legs (no `SipLeg`,
/// but a signaling address and SIP Call-ID) get a hand-built minimal BYE.
/// Send errors are ignored. Afterwards a `call_ended` event with reason
/// `hangup_command` is emitted and the call is cleaned up.
///
/// Returns `false` when the call is unknown or already terminated.
pub async fn hangup(&mut self, call_id: &str, socket: &UdpSocket) -> bool {
    let call = match self.calls.get_mut(call_id) {
        Some(call) if call.state != CallState::Terminated => call,
        _ => return false,
    };
    let duration = call.duration_secs();

    // Send BYE to every leg that is still alive.
    let live_legs = call
        .legs
        .values_mut()
        .filter(|leg| leg.state != LegState::Terminated);
    for leg in live_legs {
        match (leg.sip_leg.as_mut(), leg.signaling_addr) {
            (Some(sip_leg), _) => {
                if let Some(hangup_bytes) = sip_leg.build_hangup() {
                    let target = sip_leg.config.sip_target;
                    let _ = socket.send_to(&hangup_bytes, target).await;
                }
            }
            (None, Some(addr)) => {
                // Passthrough leg — send a simple BYE.
                if let Some(sip_cid) = leg.sip_call_id.as_ref() {
                    let bye = format!(
                        "BYE sip:hangup SIP/2.0\r\n\
                         Via: SIP/2.0/UDP 0.0.0.0:0;branch=z9hG4bK-hangup\r\n\
                         Call-ID: {sip_cid}\r\n\
                         CSeq: 99 BYE\r\n\
                         Max-Forwards: 70\r\n\
                         Content-Length: 0\r\n\r\n"
                    );
                    let _ = socket.send_to(bye.as_bytes(), addr).await;
                }
            }
            (None, None) => {}
        }
        leg.state = LegState::Terminated;
    }

    let ended = serde_json::json!({ "call_id": call_id, "reason": "hangup_command", "duration": duration });
    emit_event(&self.out_tx, "call_ended", ended);
    self.terminate_call(call_id).await;
    true
}
/// Final cleanup for a call: take it out of the registry, mark it
/// `Terminated`, shut its mixer down, and purge every SIP index entry that
/// still points at it. Does nothing when the call ID is unknown.
async fn terminate_call(&mut self, call_id: &str) {
    let Some(mut call) = self.calls.remove(call_id) else {
        return;
    };
    call.state = CallState::Terminated;
    call.shutdown_mixer().await;
    // Keep only index entries that belong to other calls.
    self.sip_index.retain(|_, entry| entry.0 != call_id);
}
// -----------------------------------------------------------------------
// Voicemail
// -----------------------------------------------------------------------
async fn route_to_voicemail(
&mut self,
call_id: &str,
invite: &SipMessage,
from_addr: SocketAddr,
caller_number: &str,
provider_id: &str,
provider_config: &ProviderConfig,
config: &AppConfig,
rtp_pool: &mut RtpPortPool,
socket: &UdpSocket,
public_ip: Option<&str>,
) -> Option<String> {
let lan_ip = &config.proxy.lan_ip;
let pub_ip = public_ip.unwrap_or(lan_ip.as_str());
let rtp_alloc = match rtp_pool.allocate().await {
Some(a) => a,
None => {
let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
let _ = socket.send_to(&resp.serialize(), from_addr).await;
return None;
}
};
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
let sdp = sip_proto::helpers::build_sdp(&sip_proto::helpers::SdpOptions {
ip: pub_ip,
port: rtp_alloc.port,
payload_types: &provider_config.codecs,
..Default::default()
});
let response = SipMessage::create_response(
200, "OK", invite,
Some(sip_proto::message::ResponseOptions {
to_tag: Some(sip_proto::helpers::generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip, config.proxy.lan_port)),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
..Default::default()
}),
);
let _ = socket.send_to(&response.serialize(), from_addr).await;
let provider_media = if invite.has_sdp_body() {
parse_sdp_endpoint(&invite.body)
.and_then(|ep| format!("{}:{}", ep.address, ep.port).parse().ok())
} else {
Some(from_addr)
};
let provider_media = provider_media.unwrap_or(from_addr);
// Create a minimal call for BYE routing.
let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.to_string(), self.out_tx.clone());
let mut call = Call::new(
call_id.to_string(),
CallDirection::Inbound,
provider_id.to_string(),
mixer_cmd_tx,
mixer_task,
);
call.state = CallState::Voicemail;
call.caller_number = Some(caller_number.to_string());
let provider_leg_id = format!("{call_id}-prov");
call.legs.insert(
provider_leg_id.clone(),
LegInfo {
id: provider_leg_id.clone(),
kind: LegKind::SipProvider,
state: LegState::Connected,
codec_pt,
sip_leg: None,
sip_call_id: Some(invite.call_id().to_string()),
webrtc_session_id: None,
rtp_socket: Some(rtp_alloc.socket.clone()),
rtp_port: rtp_alloc.port,
remote_media: Some(provider_media),
signaling_addr: Some(from_addr),
metadata: HashMap::new(),
},
);
self.sip_index.insert(
invite.call_id().to_string(),
(call_id.to_string(), provider_leg_id),
);
self.calls.insert(call_id.to_string(), call);
// Build recording path.
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
let recording_dir = "nogit/voicemail/default".to_string();
let recording_path = format!("{recording_dir}/msg-{timestamp}.wav");
let greeting_wav = find_greeting_wav();
let out_tx = self.out_tx.clone();
let call_id_owned = call_id.to_string();
let caller_owned = caller_number.to_string();
let rtp_socket = rtp_alloc.socket;
tokio::spawn(async move {
crate::voicemail::run_voicemail_session(
rtp_socket, provider_media, codec_pt,
greeting_wav, recording_path, 120_000,
call_id_owned, caller_owned, out_tx,
)
.await;
});
Some(call_id.to_string())
}
// -----------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------
/// Contact address of the first configured device (in config order) that the
/// registrar currently has a contact for, or `None` if it has none of them.
fn resolve_first_device(&self, config: &AppConfig, registrar: &Registrar) -> Option<SocketAddr> {
    config
        .devices
        .iter()
        .find_map(|device| registrar.get_device_contact(&device.id))
}
}
/// Locate a voicemail greeting on disk.
///
/// Candidate paths are relative to the process working directory and are
/// probed in priority order; the first one that exists is returned.
/// Returns `None` when no candidate exists.
fn find_greeting_wav() -> Option<String> {
    const CANDIDATES: [&str; 2] = [
        ".nogit/voicemail/default/greeting.wav",
        ".nogit/voicemail/greeting.wav",
    ];
    CANDIDATES
        .iter()
        .copied()
        .find(|candidate| std::path::Path::new(candidate).exists())
        .map(String::from)
}