diff --git a/changelog.md b/changelog.md index 621c155..6c0cc2c 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-04-10 - 1.14.0 - feat(proxy-engine) +add multiparty call mixing with dynamic SIP and WebRTC leg management + +- replace passthrough call handling with a mixer-backed call model that tracks multiple legs and exposes leg status in call state output +- add mixer and leg I/O infrastructure to bridge SIP RTP and WebRTC audio through channel-based mix-minus processing +- introduce add_leg and remove_leg proxy commands and wire frontend bridge APIs to manage external call legs +- emit leg lifecycle events for observability and mark unimplemented device-leg and transfer HTTP endpoints with 501 responses + ## 2026-04-10 - 1.13.0 - feat(proxy-engine,webrtc) add B2BUA SIP leg handling and WebRTC call bridging for outbound calls diff --git a/nogit/voicemail/default/msg-1775825168199.wav b/nogit/voicemail/default/msg-1775825168199.wav new file mode 100644 index 0000000..7dc145a Binary files /dev/null and b/nogit/voicemail/default/msg-1775825168199.wav differ diff --git a/rust/crates/proxy-engine/src/call.rs b/rust/crates/proxy-engine/src/call.rs index 686bcdf..ba38c0c 100644 --- a/rust/crates/proxy-engine/src/call.rs +++ b/rust/crates/proxy-engine/src/call.rs @@ -1,12 +1,19 @@ -//! Call hub — owns legs and bridges media. +//! Call hub — owns N legs and a mixer task. //! -//! Each Call has a unique ID and tracks its state, direction, and associated -//! SIP Call-IDs for message routing. +//! Every call has a central mixer that provides mix-minus audio to all +//! participants. Legs can be added and removed dynamically mid-call. +use crate::mixer::{MixerCommand, RtpPacket}; +use crate::sip_leg::SipLeg; +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; use tokio::net::UdpSocket; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +pub type LegId = String; /// Call state machine. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -15,8 +22,6 @@ pub enum CallState { Ringing, Connected, Voicemail, - Ivr, - Terminating, Terminated, } @@ -27,8 +32,6 @@ impl CallState { Self::Ringing => "ringing", Self::Connected => "connected", Self::Voicemail => "voicemail", - Self::Ivr => "ivr", - Self::Terminating => "terminating", Self::Terminated => "terminated", } } @@ -49,43 +52,172 @@ impl CallDirection { } } -/// A passthrough call — both sides share the same SIP Call-ID. -/// The proxy rewrites SDP/Contact/Request-URI and relays RTP. -pub struct PassthroughCall { +/// The type of a call leg. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LegKind { + SipProvider, + SipDevice, + WebRtc, + Media, // voicemail playback, IVR, recording +} + +impl LegKind { + pub fn as_str(&self) -> &'static str { + match self { + Self::SipProvider => "sip-provider", + Self::SipDevice => "sip-device", + Self::WebRtc => "webrtc", + Self::Media => "media", + } + } +} + +/// Per-leg state. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LegState { + Inviting, + Ringing, + Connected, + Terminated, +} + +impl LegState { + pub fn as_str(&self) -> &'static str { + match self { + Self::Inviting => "inviting", + Self::Ringing => "ringing", + Self::Connected => "connected", + Self::Terminated => "terminated", + } + } +} + +/// Information about a single leg in a call. +pub struct LegInfo { + pub id: LegId, + pub kind: LegKind, + pub state: LegState, + pub codec_pt: u8, + + /// For SIP legs: the SIP dialog manager (handles 407 auth, BYE, etc). + pub sip_leg: Option, + /// For SIP legs: the SIP Call-ID for message routing. + pub sip_call_id: Option, + /// For WebRTC legs: the session ID in WebRtcEngine. + pub webrtc_session_id: Option, + /// The RTP socket allocated for this leg. + pub rtp_socket: Option>, + /// The RTP port number. + pub rtp_port: u16, + /// The remote media endpoint (learned from SDP or address learning). + pub remote_media: Option, + /// SIP signaling address (provider or device). + pub signaling_addr: Option, +} + +/// A multiparty call with N legs and a central mixer. +pub struct Call { pub id: String, - pub sip_call_id: String, pub state: CallState, pub direction: CallDirection, pub created_at: Instant, - // Call metadata. + // Metadata. pub caller_number: Option, pub callee_number: Option, pub provider_id: String, - // Provider side. - pub provider_addr: SocketAddr, - pub provider_media: Option, + /// All legs in this call, keyed by leg ID. + pub legs: HashMap, - // Device side. - pub device_addr: SocketAddr, - pub device_media: Option, + /// Channel to send commands to the mixer task. + pub mixer_cmd_tx: mpsc::Sender, - // RTP relay. - pub rtp_port: u16, - pub rtp_socket: Arc, - - // Packet counters. - pub pkt_from_device: u64, - pub pkt_from_provider: u64, + /// Handle to the mixer task (aborted on call teardown). + mixer_task: Option>, } -impl PassthroughCall { +impl Call { + pub fn new( + id: String, + direction: CallDirection, + provider_id: String, + mixer_cmd_tx: mpsc::Sender, + mixer_task: JoinHandle<()>, + ) -> Self { + Self { + id, + state: CallState::SettingUp, + direction, + created_at: Instant::now(), + caller_number: None, + callee_number: None, + provider_id, + legs: HashMap::new(), + mixer_cmd_tx, + mixer_task: Some(mixer_task), + } + } + + /// Add a leg to the mixer. Sends the AddLeg command with channel endpoints. + pub async fn add_leg_to_mixer( + &self, + leg_id: &str, + codec_pt: u8, + inbound_rx: mpsc::Receiver, + outbound_tx: mpsc::Sender>, + ) { + let _ = self + .mixer_cmd_tx + .send(MixerCommand::AddLeg { + leg_id: leg_id.to_string(), + codec_pt, + inbound_rx, + outbound_tx, + }) + .await; + } + + /// Remove a leg from the mixer. + pub async fn remove_leg_from_mixer(&self, leg_id: &str) { + let _ = self + .mixer_cmd_tx + .send(MixerCommand::RemoveLeg { + leg_id: leg_id.to_string(), + }) + .await; + } + pub fn duration_secs(&self) -> u64 { self.created_at.elapsed().as_secs() } + /// Shut down the mixer and abort its task. + pub async fn shutdown_mixer(&mut self) { + let _ = self.mixer_cmd_tx.send(MixerCommand::Shutdown).await; + if let Some(handle) = self.mixer_task.take() { + handle.abort(); + } + } + + /// Produce a JSON status snapshot for the dashboard. pub fn to_status_json(&self) -> serde_json::Value { + let legs: Vec = self + .legs + .values() + .filter(|l| l.state != LegState::Terminated) + .map(|l| { + serde_json::json!({ + "id": l.id, + "type": l.kind.as_str(), + "state": l.state.as_str(), + "codec": sip_proto::helpers::codec_name(l.codec_pt), + "rtpPort": l.rtp_port, + "remoteMedia": l.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())), + }) + }) + .collect(); + serde_json::json!({ "id": self.id, "state": self.state.as_str(), @@ -93,11 +225,8 @@ impl PassthroughCall { "callerNumber": self.caller_number, "calleeNumber": self.callee_number, "providerUsed": self.provider_id, - "createdAt": self.created_at.elapsed().as_millis(), "duration": self.duration_secs(), - "rtpPort": self.rtp_port, - "pktFromDevice": self.pkt_from_device, - "pktFromProvider": self.pkt_from_provider, + "legs": legs, }) } } diff --git a/rust/crates/proxy-engine/src/call_manager.rs b/rust/crates/proxy-engine/src/call_manager.rs index 906978a..a93b5b6 100644 --- a/rust/crates/proxy-engine/src/call_manager.rs +++ b/rust/crates/proxy-engine/src/call_manager.rs @@ -1,47 +1,31 @@ //! Call manager — central registry and orchestration for all calls. //! -//! Handles: -//! - Inbound passthrough calls (provider → proxy → device) -//! - Outbound passthrough calls (device → proxy → provider) -//! - SIP message routing by Call-ID -//! - BYE/CANCEL handling -//! - RTP relay setup -//! -//! Ported from ts/call/call-manager.ts (passthrough mode). +//! Unified model: every call owns N legs and a mixer task. +//! Legs can be SIP (provider/device), WebRTC (browser), or Media (voicemail/IVR). +//! The mixer provides mix-minus audio to all participants. -use crate::call::{CallDirection, CallState, PassthroughCall}; +use crate::call::{Call, CallDirection, CallState, LegId, LegInfo, LegKind, LegState}; use crate::config::{AppConfig, ProviderConfig}; -use crate::dtmf::DtmfDetector; use crate::ipc::{emit_event, OutTx}; +use crate::leg_io::{create_leg_channels, spawn_sip_inbound, spawn_sip_outbound}; +use crate::mixer::spawn_mixer; use crate::registrar::Registrar; use crate::rtp::RtpPortPool; -use crate::sip_leg::{LegState, SipLeg, SipLegAction, SipLegConfig}; -use sip_proto::helpers::parse_sdp_endpoint; +use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig}; +use sip_proto::helpers::{generate_call_id, parse_sdp_endpoint}; use sip_proto::message::SipMessage; use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; -use std::time::Instant; use tokio::net::UdpSocket; -/// A B2BUA call with a SipLeg for the provider side. -/// The other side is either a WebRTC session or another SipLeg. -pub struct B2buaCall { - pub id: String, - pub provider_leg: SipLeg, - pub webrtc_session_id: Option, - pub number: String, - pub created_at: std::time::Instant, - /// RTP socket allocated for the provider leg (used for WebRTC audio bridging). - pub rtp_socket: Option>, -} - pub struct CallManager { - /// Active passthrough calls, keyed by SIP Call-ID. - calls: HashMap, - /// Active B2BUA calls, keyed by SIP Call-ID of the provider leg. - b2bua_calls: HashMap, + /// All active calls, keyed by internal call ID. + pub calls: HashMap, + /// Index: SIP Call-ID → (internal call_id, leg_id). + /// Each SIP leg in a call has its own SIP Call-ID. + sip_index: HashMap, /// Call ID counter. next_call_num: u64, /// Output channel for events. @@ -52,13 +36,12 @@ impl CallManager { pub fn new(out_tx: OutTx) -> Self { Self { calls: HashMap::new(), - b2bua_calls: HashMap::new(), + sip_index: HashMap::new(), next_call_num: 0, out_tx, } } - /// Generate a unique call ID. fn next_call_id(&mut self) -> String { let id = format!( "call-{}-{}", @@ -72,55 +55,237 @@ impl CallManager { id } - /// Try to route a SIP message to an existing call. - /// Returns true if handled. + fn next_leg_id(&mut self) -> String { + self.next_call_num += 1; + format!("leg-{}", self.next_call_num) + } + + /// Check if a SIP Call-ID belongs to any active call. + pub fn has_call(&self, sip_call_id: &str) -> bool { + self.sip_index.contains_key(sip_call_id) + } + + /// Get an RTP socket for a call's provider leg (used by webrtc_link). + pub fn get_call_provider_rtp_socket(&self, call_id: &str) -> Option> { + let call = self.calls.get(call_id)?; + for leg in call.legs.values() { + if leg.kind == LegKind::SipProvider { + return leg.rtp_socket.clone(); + } + } + None + } + + /// Get all active call statuses for the dashboard. + pub fn get_all_statuses(&self) -> Vec { + self.calls + .values() + .filter(|c| c.state != CallState::Terminated) + .map(|c| c.to_status_json()) + .collect() + } + + // ----------------------------------------------------------------------- + // SIP message routing + // ----------------------------------------------------------------------- + + /// Route a SIP message to the correct call and leg. + /// Returns true if the message was handled. pub async fn route_sip_message( &mut self, msg: &SipMessage, from_addr: SocketAddr, socket: &UdpSocket, config: &AppConfig, - _registrar: &Registrar, ) -> bool { let sip_call_id = msg.call_id().to_string(); - // Check B2BUA calls first (provider legs with dialog management). - if self.b2bua_calls.contains_key(&sip_call_id) { - return self.route_b2bua_message(&sip_call_id, msg, from_addr, socket).await; + let (call_id, leg_id) = match self.sip_index.get(&sip_call_id) { + Some((cid, lid)) => (cid.clone(), lid.clone()), + None => return false, + }; + + // Check if this is a B2BUA leg (has a SipLeg with dialog management). + let is_b2bua_leg = self + .calls + .get(&call_id) + .and_then(|c| c.legs.get(&leg_id)) + .map(|l| l.sip_leg.is_some()) + .unwrap_or(false); + + if is_b2bua_leg { + return self + .route_b2bua_message(&call_id, &leg_id, msg, from_addr, socket) + .await; } - // Check passthrough calls. - if !self.calls.contains_key(&sip_call_id) { - return false; + // Passthrough-style routing for inbound/outbound device↔provider calls. + self.route_passthrough_message(&call_id, &leg_id, msg, from_addr, socket, config) + .await + } + + /// Route a message to a B2BUA leg (has SipLeg dialog management). + async fn route_b2bua_message( + &mut self, + call_id: &str, + leg_id: &str, + msg: &SipMessage, + from_addr: SocketAddr, + socket: &UdpSocket, + ) -> bool { + // Process the SipLeg action first, extracting all needed data. + let (action, target, codecs, rtp_socket_clone) = { + let call = match self.calls.get_mut(call_id) { + Some(c) => c, + None => return false, + }; + let leg = match call.legs.get_mut(leg_id) { + Some(l) => l, + None => return false, + }; + let sip_leg = match &mut leg.sip_leg { + Some(sl) => sl, + None => return false, + }; + let action = sip_leg.handle_message(msg); + let target = sip_leg.config.sip_target; + let codecs = sip_leg.config.codecs.clone(); + let rtp_socket_clone = leg.rtp_socket.clone(); + (action, target, codecs, rtp_socket_clone) + }; + // Mutable borrow on call/leg is now released. + + let sip_pt = codecs.first().copied().unwrap_or(9); + + match action { + SipLegAction::None => {} + SipLegAction::Send(buf) => { + let _ = socket.send_to(&buf, target).await; + } + SipLegAction::StateChange(crate::sip_leg::LegState::Ringing) => { + if let Some(call) = self.calls.get_mut(call_id) { + if let Some(leg) = call.legs.get_mut(leg_id) { + leg.state = LegState::Ringing; + } + } + emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); + emit_event(&self.out_tx, "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" })); + } + SipLegAction::ConnectedWithAck(ack_buf) => { + let _ = socket.send_to(&ack_buf, target).await; + + // Update leg state and get remote media. + let remote = { + let call = self.calls.get_mut(call_id).unwrap(); + let leg = call.legs.get_mut(leg_id).unwrap(); + let sip_leg = leg.sip_leg.as_ref().unwrap(); + let remote = sip_leg.remote_media; + leg.state = LegState::Connected; + leg.remote_media = remote; + call.state = CallState::Connected; + remote + }; + + // Wire the leg to the mixer if remote media is known. + if let (Some(remote_addr), Some(rtp_socket)) = (remote, rtp_socket_clone) { + let channels = create_leg_channels(); + spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx); + spawn_sip_outbound(rtp_socket, remote_addr, channels.outbound_rx); + if let Some(call) = self.calls.get(call_id) { + call.add_leg_to_mixer(leg_id, sip_pt, channels.inbound_rx, channels.outbound_tx) + .await; + } + } + + emit_event(&self.out_tx, "call_answered", serde_json::json!({ + "call_id": call_id, + "provider_media_addr": remote.map(|a| a.ip().to_string()), + "provider_media_port": remote.map(|a| a.port()), + "sip_pt": sip_pt, + })); + emit_event(&self.out_tx, "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" })); + } + SipLegAction::Terminated(reason) => { + let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0); + if let Some(call) = self.calls.get_mut(call_id) { + if let Some(leg) = call.legs.get_mut(leg_id) { + leg.state = LegState::Terminated; + } + } + emit_event(&self.out_tx, "call_ended", + serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration })); + self.terminate_call(call_id).await; + return true; + } + SipLegAction::SendAndTerminate(buf, reason) => { + let _ = socket.send_to(&buf, from_addr).await; + let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0); + emit_event(&self.out_tx, "call_ended", + serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration })); + self.terminate_call(call_id).await; + return true; + } + SipLegAction::AuthRetry { ack_407, invite_with_auth } => { + if let Some(ack) = ack_407 { + let _ = socket.send_to(&ack, target).await; + } + let _ = socket.send_to(&invite_with_auth, target).await; + } + _ => {} } - // Extract needed data from the call to avoid borrow conflicts. - let (call_id, provider_addr, device_addr, rtp_port, from_provider) = { - let call = self.calls.get(&sip_call_id).unwrap(); - let from_provider = from_addr.ip().to_string() == call.provider_addr.ip().to_string(); - ( - call.id.clone(), - call.provider_addr, - call.device_addr, - call.rtp_port, - from_provider, - ) + true + } + + /// Route a passthrough-style message (inbound/outbound device↔provider). + /// In the new model, both sides still go through the mixer, but SIP signaling + /// is forwarded between the two endpoints with SDP rewriting. + async fn route_passthrough_message( + &mut self, + call_id: &str, + this_leg_id: &str, + msg: &SipMessage, + from_addr: SocketAddr, + socket: &UdpSocket, + config: &AppConfig, + ) -> bool { + let call = match self.calls.get_mut(call_id) { + Some(c) => c, + None => return false, + }; + + // Find the "other" leg — the one we forward to. + let this_leg = call.legs.get(this_leg_id); + let this_kind = this_leg.map(|l| l.kind).unwrap_or(LegKind::SipProvider); + + // Find the counterpart leg. + let other_leg = call.legs.values().find(|l| l.id != this_leg_id && l.state != LegState::Terminated); + let (other_addr, other_rtp_port, other_leg_id) = match other_leg { + Some(l) => (l.signaling_addr, l.rtp_port, l.id.clone()), + None => return false, + }; + let forward_to = match other_addr { + Some(a) => a, + None => return false, }; let lan_ip = config.proxy.lan_ip.clone(); let lan_port = config.proxy.lan_port; + // Get this leg's RTP port (for SDP rewriting — tell the other side to send RTP here). + let this_rtp_port = call.legs.get(this_leg_id).map(|l| l.rtp_port).unwrap_or(0); + if msg.is_request() { let method = msg.method().unwrap_or(""); - let forward_to = if from_provider { device_addr } else { provider_addr }; - // Handle BYE. if method == "BYE" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; let _ = socket.send_to(&msg.serialize(), forward_to).await; - let duration = self.calls.get(&sip_call_id).unwrap().duration_secs(); + let duration = call.duration_secs(); emit_event( &self.out_tx, "call_ended", @@ -128,54 +293,48 @@ impl CallManager { "call_id": call_id, "reason": "bye", "duration": duration, - "from_side": if from_provider { "provider" } else { "device" }, }), ); - self.calls.get_mut(&sip_call_id).unwrap().state = CallState::Terminated; + self.terminate_call(call_id).await; return true; } - // Handle CANCEL. if method == "CANCEL" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; let _ = socket.send_to(&msg.serialize(), forward_to).await; - let duration = self.calls.get(&sip_call_id).unwrap().duration_secs(); + let duration = call.duration_secs(); emit_event( &self.out_tx, "call_ended", - serde_json::json!({ - "call_id": call_id, "reason": "cancel", "duration": duration, - }), + serde_json::json!({ "call_id": call_id, "reason": "cancel", "duration": duration }), ); - self.calls.get_mut(&sip_call_id).unwrap().state = CallState::Terminated; + self.terminate_call(call_id).await; return true; } - // Handle INFO (DTMF relay). if method == "INFO" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; - - // Detect DTMF from INFO body. - if let Some(ct) = msg.get_header("Content-Type") { - let mut detector = DtmfDetector::new(call_id.clone(), self.out_tx.clone()); - detector.process_sip_info(ct, &msg.body); - } return true; } // Forward other requests with SDP rewriting. let mut fwd = msg.clone(); - if from_provider { - rewrite_sdp_for_device(&mut fwd, &lan_ip, rtp_port); + // Rewrite SDP to point the other side to this leg's RTP port + // (so we receive their audio on our socket). + if fwd.has_sdp_body() { + let (new_body, _) = rewrite_sdp(&fwd.body, &lan_ip, other_rtp_port); + fwd.body = new_body; + fwd.update_content_length(); + } + if this_kind == LegKind::SipProvider { + // From provider → forward to device: rewrite request URI. if let Some(ruri) = fwd.request_uri().map(|s| s.to_string()) { - let new_ruri = rewrite_sip_uri(&ruri, &device_addr.ip().to_string(), device_addr.port()); + let new_ruri = rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port()); fwd.set_request_uri(&new_ruri); } - } else { - rewrite_sdp_for_provider(&mut fwd, &lan_ip, rtp_port); } if fwd.is_dialog_establishing() { fwd.prepend_header("Record-Route", &format!("")); @@ -188,42 +347,67 @@ impl CallManager { if msg.is_response() { let code = msg.status_code().unwrap_or(0); let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase(); - let forward_to = if from_provider { device_addr } else { provider_addr }; let mut fwd = msg.clone(); - if from_provider { - rewrite_sdp_for_device(&mut fwd, &lan_ip, rtp_port); - } else { - rewrite_sdp_for_provider(&mut fwd, &lan_ip, rtp_port); - if let Some(contact) = fwd.get_header("Contact").map(|s| s.to_string()) { - let new_contact = rewrite_sip_uri(&contact, &lan_ip, lan_port); - if new_contact != contact { - fwd.set_header("Contact", &new_contact); - } - } + // Rewrite SDP so the forward-to side sends RTP to the correct leg port. + if fwd.has_sdp_body() { + let rewrite_ip = if this_kind == LegKind::SipDevice { + // Response from device → send to provider: use LAN/public IP. + &lan_ip + } else { + &lan_ip + }; + let (new_body, _) = rewrite_sdp(&fwd.body, rewrite_ip, other_rtp_port); + fwd.body = new_body; + fwd.update_content_length(); } - // State transitions. + // State transitions on INVITE responses. if cseq_method == "INVITE" { - let call = self.calls.get_mut(&sip_call_id).unwrap(); - if (code == 180 || code == 183) && call.state == CallState::SettingUp { - call.state = CallState::Ringing; - emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); + if code == 180 || code == 183 { + if call.state == CallState::SettingUp { + call.state = CallState::Ringing; + emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); + } + if let Some(leg) = call.legs.get_mut(this_leg_id) { + leg.state = LegState::Ringing; + } } else if code >= 200 && code < 300 { - call.state = CallState::Connected; - emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id })); + let mut needs_wiring = false; + if let Some(leg) = call.legs.get_mut(this_leg_id) { + leg.state = LegState::Connected; + // Learn remote media from SDP. + if msg.has_sdp_body() { + if let Some(ep) = parse_sdp_endpoint(&msg.body) { + if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { + leg.remote_media = Some(addr); + } + } + } + needs_wiring = true; + } + + if call.state != CallState::Connected { + call.state = CallState::Connected; + emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id })); + } + + // Forward the response before wiring (drop call borrow). + let _ = socket.send_to(&fwd.serialize(), forward_to).await; + + // Wire legs to mixer (needs &mut self, so call borrow must be released). + if needs_wiring { + self.maybe_wire_passthrough_legs(call_id).await; + } + return true; } else if code >= 300 { let duration = call.duration_secs(); - call.state = CallState::Terminated; emit_event( &self.out_tx, "call_ended", - serde_json::json!({ - "call_id": call_id, - "reason": format!("rejected_{code}"), - "duration": duration, - }), + serde_json::json!({ "call_id": call_id, "reason": format!("rejected_{code}"), "duration": duration }), ); + // Don't terminate yet — let the forward happen first. } } @@ -234,7 +418,47 @@ impl CallManager { false } - /// Create an inbound passthrough call (provider → device). + /// Wire passthrough legs to the mixer once both have remote media addresses. + async fn maybe_wire_passthrough_legs(&mut self, call_id: &str) { + let call = match self.calls.get(call_id) { + Some(c) => c, + None => return, + }; + + // Collect legs that need wiring (have remote_media + rtp_socket but aren't yet in mixer). + let mut to_wire: Vec<(String, u8, Arc, SocketAddr)> = Vec::new(); + for leg in call.legs.values() { + if leg.state == LegState::Connected || leg.state == LegState::Ringing { + if let (Some(rtp_socket), Some(remote)) = (&leg.rtp_socket, leg.remote_media) { + to_wire.push((leg.id.clone(), leg.codec_pt, rtp_socket.clone(), remote)); + } + } + } + + // Only wire if we have at least 2 legs ready. + if to_wire.len() < 2 { + return; + } + + let call = match self.calls.get(call_id) { + Some(c) => c, + None => return, + }; + + for (leg_id, codec_pt, rtp_socket, remote) in to_wire { + let channels = create_leg_channels(); + spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx); + spawn_sip_outbound(rtp_socket, remote, channels.outbound_rx); + call.add_leg_to_mixer(&leg_id, codec_pt, channels.inbound_rx, channels.outbound_tx) + .await; + } + } + + // ----------------------------------------------------------------------- + // Call creation + // ----------------------------------------------------------------------- + + /// Create an inbound call (provider → device). pub async fn create_inbound_call( &mut self, invite: &SipMessage, @@ -250,6 +474,7 @@ impl CallManager { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; + let sip_call_id = invite.call_id().to_string(); // Extract caller/callee info. let from_header = invite.get_header("From").unwrap_or(""); @@ -262,30 +487,30 @@ impl CallManager { .unwrap_or("") .to_string(); - // Resolve target device (first registered device). + // Resolve target device. let device_addr = match self.resolve_first_device(config, registrar) { Some(addr) => addr, None => { - // No device registered — route to voicemail. + // No device registered → voicemail. return self .route_to_voicemail( - &call_id, - invite, - from_addr, - &caller_number, - provider_id, - provider_config, - config, - rtp_pool, - socket, - public_ip, + &call_id, invite, from_addr, &caller_number, + provider_id, provider_config, config, rtp_pool, socket, public_ip, ) .await; } }; - // Allocate RTP port. - let rtp_alloc = match rtp_pool.allocate().await { + // Allocate RTP ports for both legs. + let provider_rtp = match rtp_pool.allocate().await { + Some(a) => a, + None => { + let resp = SipMessage::create_response(503, "Service Unavailable", invite, None); + let _ = socket.send_to(&resp.serialize(), from_addr).await; + return None; + } + }; + let device_rtp = match rtp_pool.allocate().await { Some(a) => a, None => { let resp = SipMessage::create_response(503, "Service Unavailable", invite, None); @@ -294,42 +519,73 @@ impl CallManager { } }; - // Create the call. - let mut call = PassthroughCall { - id: call_id.clone(), - sip_call_id: invite.call_id().to_string(), - state: CallState::Ringing, - direction: CallDirection::Inbound, - created_at: Instant::now(), - caller_number: Some(caller_number), - callee_number: Some(called_number), - provider_id: provider_id.to_string(), - provider_addr: from_addr, - provider_media: None, - device_addr, - device_media: None, - rtp_port: rtp_alloc.port, - rtp_socket: rtp_alloc.socket.clone(), - pkt_from_device: 0, - pkt_from_provider: 0, - }; + // Create the call with a mixer. + let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone()); + let mut call = Call::new( + call_id.clone(), + CallDirection::Inbound, + provider_id.to_string(), + mixer_cmd_tx, + mixer_task, + ); + call.caller_number = Some(caller_number); + call.callee_number = Some(called_number); + call.state = CallState::Ringing; - // Extract provider media from SDP. + let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); + + // Provider leg — extract media from SDP. + let mut provider_media: Option = None; if invite.has_sdp_body() { if let Some(ep) = parse_sdp_endpoint(&invite.body) { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { - call.provider_media = Some(addr); + provider_media = Some(addr); } } } - // Start RTP relay. - let rtp_socket = rtp_alloc.socket.clone(); - let device_addr_for_relay = device_addr; - let provider_addr_for_relay = from_addr; - tokio::spawn(async move { - rtp_relay_loop(rtp_socket, device_addr_for_relay, provider_addr_for_relay).await; - }); + let provider_leg_id = format!("{call_id}-prov"); + call.legs.insert( + provider_leg_id.clone(), + LegInfo { + id: provider_leg_id.clone(), + kind: LegKind::SipProvider, + state: LegState::Connected, // Provider already connected (sent us the INVITE). + codec_pt, + sip_leg: None, + sip_call_id: Some(sip_call_id.clone()), + webrtc_session_id: None, + rtp_socket: Some(provider_rtp.socket.clone()), + rtp_port: provider_rtp.port, + remote_media: provider_media, + signaling_addr: Some(from_addr), + }, + ); + + // Device leg. + let device_leg_id = format!("{call_id}-dev"); + call.legs.insert( + device_leg_id.clone(), + LegInfo { + id: device_leg_id.clone(), + kind: LegKind::SipDevice, + state: LegState::Inviting, + codec_pt, + sip_leg: None, + sip_call_id: Some(sip_call_id.clone()), // Same SIP Call-ID for passthrough. + webrtc_session_id: None, + rtp_socket: Some(device_rtp.socket.clone()), + rtp_port: device_rtp.port, + remote_media: None, // Learned from device's 200 OK. + signaling_addr: Some(device_addr), + }, + ); + + // Register SIP Call-ID → both legs (provider leg handles provider messages). + // For passthrough, both legs share the same SIP Call-ID. + // We route based on source address in route_passthrough_message. + self.sip_index + .insert(sip_call_id.clone(), (call_id.clone(), provider_leg_id.clone())); // Rewrite and forward INVITE to device. let mut fwd_invite = invite.clone(); @@ -340,264 +596,23 @@ impl CallManager { )); fwd_invite.prepend_header("Record-Route", &format!("")); + // Rewrite SDP: tell the device to send RTP to the device leg's port. if fwd_invite.has_sdp_body() { - let (new_body, original) = rewrite_sdp(&fwd_invite.body, lan_ip, rtp_alloc.port); + let (new_body, _) = rewrite_sdp(&fwd_invite.body, lan_ip, device_rtp.port); fwd_invite.body = new_body; fwd_invite.update_content_length(); - if let Some(ep) = original { - if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { - call.provider_media = Some(addr); - } - } } let _ = socket.send_to(&fwd_invite.serialize(), device_addr).await; // Store the call. - self.calls.insert(call.sip_call_id.clone(), call); + self.calls.insert(call_id.clone(), call); Some(call_id) } - /// Create an outbound passthrough call (device → provider). - pub async fn create_outbound_passthrough( - &mut self, - invite: &SipMessage, - from_addr: SocketAddr, - provider_config: &ProviderConfig, - config: &AppConfig, - rtp_pool: &mut RtpPortPool, - socket: &UdpSocket, - public_ip: Option<&str>, - ) -> Option { - let call_id = self.next_call_id(); - let lan_ip = &config.proxy.lan_ip; - let lan_port = config.proxy.lan_port; - let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); - - let callee = invite.request_uri().unwrap_or("").to_string(); - - // Allocate RTP port. - let rtp_alloc = match rtp_pool.allocate().await { - Some(a) => a, - None => return None, - }; - - let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() { - Some(a) => a, - None => return None, - }; - - let mut call = PassthroughCall { - id: call_id.clone(), - sip_call_id: invite.call_id().to_string(), - state: CallState::SettingUp, - direction: CallDirection::Outbound, - created_at: Instant::now(), - caller_number: None, - callee_number: Some(callee), - provider_id: provider_config.id.clone(), - provider_addr: provider_dest, - provider_media: None, - device_addr: from_addr, - device_media: None, - rtp_port: rtp_alloc.port, - rtp_socket: rtp_alloc.socket.clone(), - pkt_from_device: 0, - pkt_from_provider: 0, - }; - - // Start RTP relay. - let rtp_socket = rtp_alloc.socket.clone(); - let device_addr_for_relay = from_addr; - let provider_addr_for_relay = provider_dest; - tokio::spawn(async move { - rtp_relay_loop(rtp_socket, device_addr_for_relay, provider_addr_for_relay).await; - }); - - // Rewrite and forward INVITE to provider. - let mut fwd_invite = invite.clone(); - fwd_invite.prepend_header("Record-Route", &format!("")); - - // Rewrite Contact to public IP. - if let Some(contact) = fwd_invite.get_header("Contact").map(|s| s.to_string()) { - let new_contact = rewrite_sip_uri(&contact, pub_ip, lan_port); - if new_contact != contact { - fwd_invite.set_header("Contact", &new_contact); - } - } - - // Rewrite SDP. - if fwd_invite.has_sdp_body() { - let (new_body, original) = rewrite_sdp(&fwd_invite.body, pub_ip, rtp_alloc.port); - fwd_invite.body = new_body; - fwd_invite.update_content_length(); - if let Some(ep) = original { - if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { - call.device_media = Some(addr); - } - } - } - - let _ = socket.send_to(&fwd_invite.serialize(), provider_dest).await; - - self.calls.insert(call.sip_call_id.clone(), call); - Some(call_id) - } - - /// Hangup a call by call ID (from TypeScript command). - pub async fn hangup(&mut self, call_id: &str, socket: &UdpSocket) -> bool { - // Find the call by our internal call ID. - let sip_call_id = self - .calls - .iter() - .find(|(_, c)| c.id == call_id) - .map(|(k, _)| k.clone()); - - let sip_call_id = match sip_call_id { - Some(id) => id, - None => return false, - }; - - let call = match self.calls.get_mut(&sip_call_id) { - Some(c) => c, - None => return false, - }; - - if call.state == CallState::Terminated { - return false; - } - - // Build and send BYE to both sides. - // For passthrough, we build a simple BYE using the SIP Call-ID. - let bye_msg = format!( - "BYE sip:hangup SIP/2.0\r\n\ - Via: SIP/2.0/UDP 0.0.0.0:0;branch=z9hG4bK-hangup\r\n\ - Call-ID: {}\r\n\ - CSeq: 99 BYE\r\n\ - Max-Forwards: 70\r\n\ - Content-Length: 0\r\n\r\n", - sip_call_id - ); - let bye_bytes = bye_msg.as_bytes(); - - let _ = socket.send_to(bye_bytes, call.provider_addr).await; - let _ = socket.send_to(bye_bytes, call.device_addr).await; - - call.state = CallState::Terminated; - - emit_event( - &self.out_tx, - "call_ended", - serde_json::json!({ - "call_id": call.id, - "reason": "hangup_command", - "duration": call.duration_secs(), - }), - ); - - true - } - - /// Get all active call statuses. - pub fn get_all_statuses(&self) -> Vec { - self.calls - .values() - .filter(|c| c.state != CallState::Terminated) - .map(|c| c.to_status_json()) - .collect() - } - - /// Clean up terminated calls. - pub fn cleanup_terminated(&mut self) { - self.calls.retain(|_, c| c.state != CallState::Terminated); - } - - /// Check if a SIP Call-ID belongs to any active call. - pub fn has_call(&self, sip_call_id: &str) -> bool { - self.calls.contains_key(sip_call_id) || self.b2bua_calls.contains_key(sip_call_id) - } - - /// Get the RTP socket for a B2BUA call (by our internal call ID). - /// Used by webrtc_link to set up the audio bridge. - pub fn get_b2bua_rtp_socket(&self, call_id: &str) -> Option> { - for b2bua in self.b2bua_calls.values() { - if b2bua.id == call_id { - return b2bua.rtp_socket.clone(); - } - } - None - } - - // --- B2BUA outbound call --- - - /// Route a SIP message to a B2BUA call's provider leg. - async fn route_b2bua_message( - &mut self, - sip_call_id: &str, - msg: &SipMessage, - from_addr: SocketAddr, - socket: &UdpSocket, - ) -> bool { - let b2bua = match self.b2bua_calls.get_mut(sip_call_id) { - Some(c) => c, - None => return false, - }; - - let call_id = b2bua.id.clone(); - let action = b2bua.provider_leg.handle_message(msg); - - match action { - SipLegAction::None => {} - SipLegAction::Send(buf) => { - let _ = socket.send_to(&buf, b2bua.provider_leg.config.sip_target).await; - } - SipLegAction::StateChange(LegState::Ringing) => { - emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); - } - SipLegAction::ConnectedWithAck(ack_buf) => { - let _ = socket.send_to(&ack_buf, b2bua.provider_leg.config.sip_target).await; - let remote = b2bua.provider_leg.remote_media; - let sip_pt = b2bua.provider_leg.config.codecs.first().copied().unwrap_or(9); - emit_event(&self.out_tx, "call_answered", serde_json::json!({ - "call_id": call_id, - "provider_media_addr": remote.map(|a| a.ip().to_string()), - "provider_media_port": remote.map(|a| a.port()), - "sip_pt": sip_pt, - })); - } - SipLegAction::Terminated(reason) => { - let duration = b2bua.created_at.elapsed().as_secs(); - emit_event(&self.out_tx, "call_ended", serde_json::json!({ - "call_id": call_id, "reason": reason, "duration": duration, - })); - self.b2bua_calls.remove(sip_call_id); - return true; - } - SipLegAction::SendAndTerminate(buf, reason) => { - let _ = socket.send_to(&buf, from_addr).await; - let duration = b2bua.created_at.elapsed().as_secs(); - emit_event(&self.out_tx, "call_ended", serde_json::json!({ - "call_id": call_id, "reason": reason, "duration": duration, - })); - self.b2bua_calls.remove(sip_call_id); - return true; - } - SipLegAction::AuthRetry { ack_407, invite_with_auth } => { - let target = b2bua.provider_leg.config.sip_target; - if let Some(ack) = ack_407 { - let _ = socket.send_to(&ack, target).await; - } - let _ = socket.send_to(&invite_with_auth, target).await; - } - _ => {} - } - - true - } - - /// Initiate an outbound call from the dashboard using B2BUA mode. - /// Creates a SipLeg for the provider side with proper dialog + auth handling. + /// Initiate an outbound B2BUA call from the dashboard. + /// Creates a Call with a single SipLeg (provider). WebRTC leg added later via webrtc_link. pub async fn make_outbound_call( &mut self, number: &str, @@ -617,16 +632,14 @@ impl CallManager { None => return None, }; - // Allocate RTP port for the provider leg. let rtp_alloc = match rtp_pool.allocate().await { Some(a) => a, None => return None, }; - // Build the SIP Call-ID for the provider dialog. - let sip_call_id = sip_proto::helpers::generate_call_id(None); + let sip_call_id = generate_call_id(None); - // Create a SipLeg with provider credentials for auth handling. + // Create SipLeg for provider. let leg_config = SipLegConfig { lan_ip: lan_ip.clone(), lan_port, @@ -639,29 +652,372 @@ impl CallManager { rtp_port: rtp_alloc.port, }; - let mut leg = SipLeg::new(format!("{call_id}-prov"), leg_config); + let leg_id = format!("{call_id}-prov"); + let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config); - // Send the INVITE. + // Send INVITE. let to_uri = format!("sip:{number}@{}", provider_config.domain); - leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await; + sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await; - // Store as B2BUA call. - let b2bua = B2buaCall { - id: call_id.clone(), - provider_leg: leg, - webrtc_session_id: None, - number: number.to_string(), - created_at: std::time::Instant::now(), - rtp_socket: Some(rtp_alloc.socket.clone()), - }; - self.b2bua_calls.insert(sip_call_id, b2bua); + // Create call with mixer. + let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone()); + let mut call = Call::new( + call_id.clone(), + CallDirection::Outbound, + provider_config.id.clone(), + mixer_cmd_tx, + mixer_task, + ); + call.callee_number = Some(number.to_string()); + let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); + + call.legs.insert( + leg_id.clone(), + LegInfo { + id: leg_id.clone(), + kind: LegKind::SipProvider, + state: LegState::Inviting, + codec_pt, + sip_leg: Some(sip_leg), + sip_call_id: Some(sip_call_id.clone()), + webrtc_session_id: None, + rtp_socket: Some(rtp_alloc.socket.clone()), + rtp_port: rtp_alloc.port, + remote_media: None, + signaling_addr: Some(provider_dest), + }, + ); + + // Register for SIP routing. + self.sip_index + .insert(sip_call_id, (call_id.clone(), leg_id)); + + self.calls.insert(call_id.clone(), call); Some(call_id) } - // --- Voicemail --- + /// Create an outbound passthrough call (device → provider). + pub async fn create_outbound_passthrough( + &mut self, + invite: &SipMessage, + from_addr: SocketAddr, + provider_config: &ProviderConfig, + config: &AppConfig, + rtp_pool: &mut RtpPortPool, + socket: &UdpSocket, + public_ip: Option<&str>, + ) -> Option { + let call_id = self.next_call_id(); + let lan_ip = &config.proxy.lan_ip; + let lan_port = config.proxy.lan_port; + let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); + let sip_call_id = invite.call_id().to_string(); + let callee = invite.request_uri().unwrap_or("").to_string(); + + let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() { + Some(a) => a, + None => return None, + }; + + // Allocate RTP ports for both legs. + let device_rtp = match rtp_pool.allocate().await { + Some(a) => a, + None => return None, + }; + let provider_rtp = match rtp_pool.allocate().await { + Some(a) => a, + None => return None, + }; + + let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); + + // Create call with mixer. + let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone()); + let mut call = Call::new( + call_id.clone(), + CallDirection::Outbound, + provider_config.id.clone(), + mixer_cmd_tx, + mixer_task, + ); + call.callee_number = Some(callee); + + // Device leg. + let device_leg_id = format!("{call_id}-dev"); + let mut device_media: Option = None; + if invite.has_sdp_body() { + if let Some(ep) = parse_sdp_endpoint(&invite.body) { + if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { + device_media = Some(addr); + } + } + } + + call.legs.insert( + device_leg_id.clone(), + LegInfo { + id: device_leg_id.clone(), + kind: LegKind::SipDevice, + state: LegState::Connected, + codec_pt, + sip_leg: None, + sip_call_id: Some(sip_call_id.clone()), + webrtc_session_id: None, + rtp_socket: Some(device_rtp.socket.clone()), + rtp_port: device_rtp.port, + remote_media: device_media, + signaling_addr: Some(from_addr), + }, + ); + + // Provider leg. + let provider_leg_id = format!("{call_id}-prov"); + call.legs.insert( + provider_leg_id.clone(), + LegInfo { + id: provider_leg_id.clone(), + kind: LegKind::SipProvider, + state: LegState::Inviting, + codec_pt, + sip_leg: None, + sip_call_id: Some(sip_call_id.clone()), + webrtc_session_id: None, + rtp_socket: Some(provider_rtp.socket.clone()), + rtp_port: provider_rtp.port, + remote_media: None, + signaling_addr: Some(provider_dest), + }, + ); + + self.sip_index + .insert(sip_call_id.clone(), (call_id.clone(), device_leg_id)); + + // Forward INVITE to provider with SDP rewriting. + let mut fwd_invite = invite.clone(); + fwd_invite.prepend_header("Record-Route", &format!("")); + + if let Some(contact) = fwd_invite.get_header("Contact").map(|s| s.to_string()) { + let new_contact = rewrite_sip_uri(&contact, pub_ip, lan_port); + if new_contact != contact { + fwd_invite.set_header("Contact", &new_contact); + } + } + + // Tell provider to send RTP to our provider_rtp port. + if fwd_invite.has_sdp_body() { + let (new_body, _) = rewrite_sdp(&fwd_invite.body, pub_ip, provider_rtp.port); + fwd_invite.body = new_body; + fwd_invite.update_content_length(); + } + + let _ = socket.send_to(&fwd_invite.serialize(), provider_dest).await; + + self.calls.insert(call_id.clone(), call); + Some(call_id) + } + + // ----------------------------------------------------------------------- + // Leg management (mid-call add/remove) + // ----------------------------------------------------------------------- + + /// Add a SIP leg to an existing call (e.g., add external participant). + pub async fn add_external_leg( + &mut self, + call_id: &str, + number: &str, + provider_config: &ProviderConfig, + config: &AppConfig, + rtp_pool: &mut RtpPortPool, + socket: &UdpSocket, + public_ip: Option<&str>, + registered_aor: &str, + ) -> Option { + let call = self.calls.get(call_id)?; + let lan_ip = &config.proxy.lan_ip; + let lan_port = config.proxy.lan_port; + + let provider_dest: SocketAddr = provider_config.outbound_proxy.to_socket_addr()?; + let rtp_alloc = rtp_pool.allocate().await?; + let sip_call_id = generate_call_id(None); + let leg_id = self.next_leg_id(); + + let leg_config = SipLegConfig { + lan_ip: lan_ip.clone(), + lan_port, + public_ip: public_ip.map(|s| s.to_string()), + sip_target: provider_dest, + username: Some(provider_config.username.clone()), + password: Some(provider_config.password.clone()), + registered_aor: Some(registered_aor.to_string()), + codecs: provider_config.codecs.clone(), + rtp_port: rtp_alloc.port, + }; + + let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config); + let to_uri = format!("sip:{number}@{}", provider_config.domain); + sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await; + + let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); + + let leg_info = LegInfo { + id: leg_id.clone(), + kind: LegKind::SipProvider, + state: LegState::Inviting, + codec_pt, + sip_leg: Some(sip_leg), + sip_call_id: Some(sip_call_id.clone()), + webrtc_session_id: None, + rtp_socket: Some(rtp_alloc.socket.clone()), + rtp_port: rtp_alloc.port, + remote_media: None, + signaling_addr: Some(provider_dest), + }; + + self.sip_index + .insert(sip_call_id, (call_id.to_string(), leg_id.clone())); + + let call = self.calls.get_mut(call_id).unwrap(); + call.legs.insert(leg_id.clone(), leg_info); + + emit_event( + &self.out_tx, + "leg_added", + serde_json::json!({ + "call_id": call_id, + "leg_id": leg_id, + "kind": "sip-provider", + "state": "inviting", + "number": number, + }), + ); + + Some(leg_id) + } + + /// Remove a leg from a call. + pub async fn remove_leg( + &mut self, + call_id: &str, + leg_id: &str, + socket: &UdpSocket, + ) -> bool { + let call = match self.calls.get_mut(call_id) { + Some(c) => c, + None => return false, + }; + + // Remove from mixer. + call.remove_leg_from_mixer(leg_id).await; + + // Send BYE if it's a SIP leg. + if let Some(leg) = call.legs.get_mut(leg_id) { + if let Some(sip_leg) = &mut leg.sip_leg { + if let Some(hangup_bytes) = sip_leg.build_hangup() { + let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await; + } + } + leg.state = LegState::Terminated; + + // Clean up SIP index. + if let Some(sip_cid) = &leg.sip_call_id { + self.sip_index.remove(sip_cid); + } + } + + emit_event( + &self.out_tx, + "leg_removed", + serde_json::json!({ "call_id": call_id, "leg_id": leg_id }), + ); + + // If fewer than 2 active legs remain, end the call. + let active_legs = call + .legs + .values() + .filter(|l| l.state != LegState::Terminated) + .count(); + if active_legs <= 1 { + let duration = call.duration_secs(); + emit_event( + &self.out_tx, + "call_ended", + serde_json::json!({ "call_id": call_id, "reason": "last_leg", "duration": duration }), + ); + self.terminate_call(call_id).await; + } + + true + } + + // ----------------------------------------------------------------------- + // Hangup + cleanup + // ----------------------------------------------------------------------- + + /// Hangup a call by internal call ID. + pub async fn hangup(&mut self, call_id: &str, socket: &UdpSocket) -> bool { + let call = match self.calls.get_mut(call_id) { + Some(c) => c, + None => return false, + }; + + if call.state == CallState::Terminated { + return false; + } + + let duration = call.duration_secs(); + + // Send BYE to all SIP legs. + for leg in call.legs.values_mut() { + if leg.state == LegState::Terminated { + continue; + } + if let Some(sip_leg) = &mut leg.sip_leg { + if let Some(hangup_bytes) = sip_leg.build_hangup() { + let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await; + } + } else if let Some(addr) = leg.signaling_addr { + // Passthrough leg — send a simple BYE. + if let Some(sip_cid) = &leg.sip_call_id { + let bye = format!( + "BYE sip:hangup SIP/2.0\r\n\ + Via: SIP/2.0/UDP 0.0.0.0:0;branch=z9hG4bK-hangup\r\n\ + Call-ID: {sip_cid}\r\n\ + CSeq: 99 BYE\r\n\ + Max-Forwards: 70\r\n\ + Content-Length: 0\r\n\r\n" + ); + let _ = socket.send_to(bye.as_bytes(), addr).await; + } + } + leg.state = LegState::Terminated; + } + + emit_event( + &self.out_tx, + "call_ended", + serde_json::json!({ "call_id": call_id, "reason": "hangup_command", "duration": duration }), + ); + + self.terminate_call(call_id).await; + true + } + + /// Clean up a terminated call: shutdown mixer, remove from indexes. + async fn terminate_call(&mut self, call_id: &str) { + if let Some(mut call) = self.calls.remove(call_id) { + call.state = CallState::Terminated; + call.shutdown_mixer().await; + + // Remove all SIP index entries for this call. + self.sip_index.retain(|_, (cid, _)| cid != call_id); + } + } + + // ----------------------------------------------------------------------- + // Voicemail + // ----------------------------------------------------------------------- - /// Route a call to voicemail: answer the INVITE, play greeting, record message. async fn route_to_voicemail( &mut self, call_id: &str, @@ -678,21 +1034,17 @@ impl CallManager { let lan_ip = &config.proxy.lan_ip; let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); - // Allocate RTP port for the voicemail session. let rtp_alloc = match rtp_pool.allocate().await { Some(a) => a, None => { - let resp = - SipMessage::create_response(503, "Service Unavailable", invite, None); + let resp = SipMessage::create_response(503, "Service Unavailable", invite, None); let _ = socket.send_to(&resp.serialize(), from_addr).await; return None; } }; - // Determine provider's preferred codec. - let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); // default G.722 + let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); - // Build SDP with our RTP port. let sdp = sip_proto::helpers::build_sdp(&sip_proto::helpers::SdpOptions { ip: pub_ip, port: rtp_alloc.port, @@ -700,11 +1052,8 @@ impl CallManager { ..Default::default() }); - // Answer the INVITE with 200 OK. let response = SipMessage::create_response( - 200, - "OK", - invite, + 200, "OK", invite, Some(sip_proto::message::ResponseOptions { to_tag: Some(sip_proto::helpers::generate_tag()), contact: Some(format!("", lan_ip, config.proxy.lan_port)), @@ -715,63 +1064,68 @@ impl CallManager { ); let _ = socket.send_to(&response.serialize(), from_addr).await; - // Extract provider media from original SDP. let provider_media = if invite.has_sdp_body() { - sip_proto::helpers::parse_sdp_endpoint(&invite.body) + parse_sdp_endpoint(&invite.body) .and_then(|ep| format!("{}:{}", ep.address, ep.port).parse().ok()) } else { - Some(from_addr) // fallback to signaling address + Some(from_addr) }; let provider_media = provider_media.unwrap_or(from_addr); - // Create a voicemail call entry for BYE routing. - let call = PassthroughCall { - id: call_id.to_string(), - sip_call_id: invite.call_id().to_string(), - state: CallState::Voicemail, - direction: CallDirection::Inbound, - created_at: std::time::Instant::now(), - caller_number: Some(caller_number.to_string()), - callee_number: None, - provider_id: provider_id.to_string(), - provider_addr: from_addr, - provider_media: Some(provider_media), - device_addr: from_addr, // no device — just use provider addr as placeholder - device_media: None, - rtp_port: rtp_alloc.port, - rtp_socket: rtp_alloc.socket.clone(), - pkt_from_device: 0, - pkt_from_provider: 0, - }; - self.calls.insert(invite.call_id().to_string(), call); + // Create a minimal call for BYE routing. + let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.to_string(), self.out_tx.clone()); + let mut call = Call::new( + call_id.to_string(), + CallDirection::Inbound, + provider_id.to_string(), + mixer_cmd_tx, + mixer_task, + ); + call.state = CallState::Voicemail; + call.caller_number = Some(caller_number.to_string()); - // Build recording file path. + let provider_leg_id = format!("{call_id}-prov"); + call.legs.insert( + provider_leg_id.clone(), + LegInfo { + id: provider_leg_id.clone(), + kind: LegKind::SipProvider, + state: LegState::Connected, + codec_pt, + sip_leg: None, + sip_call_id: Some(invite.call_id().to_string()), + webrtc_session_id: None, + rtp_socket: Some(rtp_alloc.socket.clone()), + rtp_port: rtp_alloc.port, + remote_media: Some(provider_media), + signaling_addr: Some(from_addr), + }, + ); + + self.sip_index.insert( + invite.call_id().to_string(), + (call_id.to_string(), provider_leg_id), + ); + self.calls.insert(call_id.to_string(), call); + + // Build recording path. let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis(); - let recording_dir = format!(".nogit/voicemail/default"); + let recording_dir = "nogit/voicemail/default".to_string(); let recording_path = format!("{recording_dir}/msg-{timestamp}.wav"); - - // Look for a greeting WAV file. let greeting_wav = find_greeting_wav(); - // Spawn the voicemail session. let out_tx = self.out_tx.clone(); let call_id_owned = call_id.to_string(); let caller_owned = caller_number.to_string(); let rtp_socket = rtp_alloc.socket; tokio::spawn(async move { crate::voicemail::run_voicemail_session( - rtp_socket, - provider_media, - codec_pt, - greeting_wav, - recording_path, - 120_000, // max 120 seconds - call_id_owned, - caller_owned, - out_tx, + rtp_socket, provider_media, codec_pt, + greeting_wav, recording_path, 120_000, + call_id_owned, caller_owned, out_tx, ) .await; }); @@ -779,7 +1133,9 @@ impl CallManager { Some(call_id.to_string()) } - // --- Internal helpers --- + // ----------------------------------------------------------------------- + // Internal helpers + // ----------------------------------------------------------------------- fn resolve_first_device(&self, config: &AppConfig, registrar: &Registrar) -> Option { for device in &config.devices { @@ -787,13 +1143,11 @@ impl CallManager { return Some(addr); } } - None // No device registered — caller goes to voicemail. + None } } -/// Find a voicemail greeting WAV file. fn find_greeting_wav() -> Option { - // Check common locations for a pre-generated greeting. let candidates = [ ".nogit/voicemail/default/greeting.wav", ".nogit/voicemail/greeting.wav", @@ -803,83 +1157,5 @@ fn find_greeting_wav() -> Option { return Some(path.to_string()); } } - None // No greeting found — voicemail will just play the beep. -} - -/// Rewrite SDP for provider→device direction (use LAN IP). -fn rewrite_sdp_for_device(msg: &mut SipMessage, lan_ip: &str, rtp_port: u16) { - if msg.has_sdp_body() { - let (new_body, _original) = rewrite_sdp(&msg.body, lan_ip, rtp_port); - msg.body = new_body; - msg.update_content_length(); - } -} - -/// Rewrite SDP for device→provider direction (use public IP). -fn rewrite_sdp_for_provider(msg: &mut SipMessage, pub_ip: &str, rtp_port: u16) { - if msg.has_sdp_body() { - let (new_body, _original) = rewrite_sdp(&msg.body, pub_ip, rtp_port); - msg.body = new_body; - msg.update_content_length(); - } -} - -/// Bidirectional RTP relay loop. -/// Receives packets on the relay socket and forwards based on source address. -async fn rtp_relay_loop( - socket: Arc, - device_addr: SocketAddr, - provider_addr: SocketAddr, -) { - let mut buf = vec![0u8; 65535]; - let device_ip = device_addr.ip().to_string(); - let provider_ip = provider_addr.ip().to_string(); - - // Track learned media endpoints (may differ from signaling addresses). - let mut learned_device: Option = None; - let mut learned_provider: Option = None; - - loop { - match socket.recv_from(&mut buf).await { - Ok((n, from)) => { - let data = &buf[..n]; - let from_ip = from.ip().to_string(); - - if from_ip == device_ip || learned_device.map(|d| d == from).unwrap_or(false) { - // From device → forward to provider. - if learned_device.is_none() { - learned_device = Some(from); - } - if let Some(target) = learned_provider { - let _ = socket.send_to(data, target).await; - } else { - // Provider media not yet learned; try signaling address. - let _ = socket.send_to(data, provider_addr).await; - } - } else if from_ip == provider_ip - || learned_provider.map(|p| p == from).unwrap_or(false) - { - // From provider → forward to device. - if learned_provider.is_none() { - learned_provider = Some(from); - } - if let Some(target) = learned_device { - let _ = socket.send_to(data, target).await; - } else { - let _ = socket.send_to(data, device_addr).await; - } - } else { - // Unknown source — try to identify by known device addresses. - // For now, assume it's the device if not from provider IP range. - if learned_device.is_none() { - learned_device = Some(from); - } - } - } - Err(_) => { - // Socket closed or error — exit relay. - break; - } - } - } + None } diff --git a/rust/crates/proxy-engine/src/leg_io.rs b/rust/crates/proxy-engine/src/leg_io.rs new file mode 100644 index 0000000..1b4ac9a --- /dev/null +++ b/rust/crates/proxy-engine/src/leg_io.rs @@ -0,0 +1,80 @@ +//! Leg I/O task spawners. +//! +//! Each SIP leg gets two tasks: +//! - Inbound: recv_from on RTP socket → strip header → send RtpPacket to mixer channel +//! - Outbound: recv encoded RTP from mixer channel → send_to remote media endpoint +//! +//! WebRTC leg I/O is handled inside webrtc_engine.rs (on_track + track.write). + +use crate::mixer::RtpPacket; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::net::UdpSocket; +use tokio::sync::mpsc; + +/// Channel pair for connecting a leg to the mixer. +pub struct LegChannels { + /// Mixer receives decoded packets from this leg. + pub inbound_tx: mpsc::Sender, + pub inbound_rx: mpsc::Receiver, + /// Mixer sends encoded RTP to this leg. + pub outbound_tx: mpsc::Sender>, + pub outbound_rx: mpsc::Receiver>, +} + +/// Create a channel pair for a leg. +pub fn create_leg_channels() -> LegChannels { + let (inbound_tx, inbound_rx) = mpsc::channel::(64); + let (outbound_tx, outbound_rx) = mpsc::channel::>(8); + LegChannels { + inbound_tx, + inbound_rx, + outbound_tx, + outbound_rx, + } +} + +/// Spawn the inbound I/O task for a SIP leg. +/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer. +/// Returns the JoinHandle (exits when the inbound_tx channel is dropped). +pub fn spawn_sip_inbound( + rtp_socket: Arc, + inbound_tx: mpsc::Sender, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut buf = vec![0u8; 1500]; + loop { + match rtp_socket.recv_from(&mut buf).await { + Ok((n, _from)) => { + if n < 12 { + continue; // Too small for RTP header. + } + let pt = buf[1] & 0x7F; + let payload = buf[12..n].to_vec(); + if payload.is_empty() { + continue; + } + if inbound_tx.send(RtpPacket { payload, payload_type: pt }).await.is_err() { + break; // Channel closed — leg removed. + } + } + Err(_) => break, // Socket error. + } + } + }) +} + +/// Spawn the outbound I/O task for a SIP leg. +/// Reads encoded RTP packets from the mixer and sends them to the remote media endpoint. +/// Returns the JoinHandle (exits when the outbound_rx channel is closed). +pub fn spawn_sip_outbound( + rtp_socket: Arc, + remote_media: SocketAddr, + mut outbound_rx: mpsc::Receiver>, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + while let Some(rtp_data) = outbound_rx.recv().await { + let _ = rtp_socket.send_to(&rtp_data, remote_media).await; + } + }) +} diff --git a/rust/crates/proxy-engine/src/main.rs b/rust/crates/proxy-engine/src/main.rs index 048e139..cee40c8 100644 --- a/rust/crates/proxy-engine/src/main.rs +++ b/rust/crates/proxy-engine/src/main.rs @@ -12,6 +12,8 @@ mod call_manager; mod config; mod dtmf; mod ipc; +mod leg_io; +mod mixer; mod provider; mod recorder; mod registrar; @@ -131,11 +133,13 @@ async fn handle_command( "hangup" => handle_hangup(engine, out_tx, &cmd).await, "make_call" => handle_make_call(engine, out_tx, &cmd).await, "get_status" => handle_get_status(engine, out_tx, &cmd).await, + "add_leg" => handle_add_leg(engine, out_tx, &cmd).await, + "remove_leg" => handle_remove_leg(engine, out_tx, &cmd).await, // WebRTC commands — lock webrtc only (no engine contention). "webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, &cmd).await, "webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, &cmd).await, "webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await, - // webrtc_link needs both: engine (for RTP socket) and webrtc (for session). + // webrtc_link needs both: engine (for mixer channels) and webrtc (for session). "webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await, _ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)), } @@ -259,14 +263,11 @@ async fn handle_sip_packet( } // 3. Route to existing call by SIP Call-ID. - // Check if this Call-ID belongs to an active call (avoids borrow conflict). if eng.call_mgr.has_call(msg.call_id()) { let config_ref = eng.config.as_ref().unwrap().clone(); - // Temporarily take registrar to avoid overlapping borrows. - let registrar_dummy = Registrar::new(eng.out_tx.clone()); if eng .call_mgr - .route_sip_message(&msg, from_addr, socket, &config_ref, ®istrar_dummy) + .route_sip_message(&msg, from_addr, socket, &config_ref) .await { return; @@ -578,8 +579,8 @@ async fn handle_webrtc_ice(webrtc: Arc>, out_tx: &OutTx, cmd } } -/// Handle `webrtc_link` — link a WebRTC session to a SIP call for audio bridging. -/// Briefly locks engine to get the RTP socket, then locks webrtc to set up the bridge. +/// Handle `webrtc_link` — link a WebRTC session to a call's mixer for audio bridging. +/// Creates channels, adds WebRTC leg to the call, wires the WebRTC engine. /// Locks are never held simultaneously — no deadlock possible. async fn handle_webrtc_link( engine: Arc>, @@ -595,44 +596,67 @@ async fn handle_webrtc_link( Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } }; - let provider_addr = match cmd.params.get("provider_media_addr").and_then(|v| v.as_str()) { - Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing provider_media_addr"); return; } - }; - let provider_port = match cmd.params.get("provider_media_port").and_then(|v| v.as_u64()) { - Some(p) => p as u16, - None => { respond_err(out_tx, &cmd.id, "missing provider_media_port"); return; } - }; - let sip_pt = cmd.params.get("sip_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8; - let provider_media: SocketAddr = match format!("{provider_addr}:{provider_port}").parse() { - Ok(a) => a, - Err(e) => { respond_err(out_tx, &cmd.id, &format!("bad address: {e}")); return; } - }; + // Create channels for the WebRTC leg. + let channels = crate::leg_io::create_leg_channels(); - // Briefly lock engine to get the B2BUA call's RTP socket. - let rtp_socket = { + // Briefly lock engine to add the WebRTC leg to the call's mixer. + { let eng = engine.lock().await; - eng.call_mgr.get_b2bua_rtp_socket(&call_id) - }; // engine lock released here + let call = match eng.call_mgr.calls.get(&call_id) { + Some(c) => c, + None => { + respond_err(out_tx, &cmd.id, &format!("call {call_id} not found")); + return; + } + }; + // Add to mixer via channel. + call.add_leg_to_mixer( + &session_id, + codec_lib::PT_OPUS, + channels.inbound_rx, + channels.outbound_tx, + ) + .await; + } // engine lock released - let rtp_socket = match rtp_socket { - Some(s) => s, - None => { - respond_err(out_tx, &cmd.id, &format!("call {call_id} not found or no RTP socket")); - return; - } - }; - - let bridge_info = crate::webrtc_engine::SipBridgeInfo { - provider_media, - sip_pt, - rtp_socket, - }; - - // Lock webrtc to set up the audio bridge. + // Lock webrtc to wire the channels. let mut wrtc = webrtc.lock().await; - if wrtc.link_to_sip(&session_id, &call_id, bridge_info).await { + if wrtc + .link_to_mixer(&session_id, &call_id, channels.inbound_tx, channels.outbound_rx) + .await + { + // Also store the WebRTC leg info in the call. + drop(wrtc); // Release webrtc lock before re-acquiring engine. + { + let mut eng = engine.lock().await; + if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) { + call.legs.insert( + session_id.clone(), + crate::call::LegInfo { + id: session_id.clone(), + kind: crate::call::LegKind::WebRtc, + state: crate::call::LegState::Connected, + codec_pt: codec_lib::PT_OPUS, + sip_leg: None, + sip_call_id: None, + webrtc_session_id: Some(session_id.clone()), + rtp_socket: None, + rtp_port: 0, + remote_media: None, + signaling_addr: None, + }, + ); + } + } + + emit_event(out_tx, "leg_added", serde_json::json!({ + "call_id": call_id, + "leg_id": session_id, + "kind": "webrtc", + "state": "connected", + })); + respond_ok(out_tx, &cmd.id, serde_json::json!({ "session_id": session_id, "call_id": call_id, @@ -643,6 +667,98 @@ async fn handle_webrtc_link( } } +/// Handle `add_leg` — add a new SIP leg to an existing call. +async fn handle_add_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { + let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + }; + let number = match cmd.params.get("number").and_then(|v| v.as_str()) { + Some(n) => n.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing number"); return; } + }; + let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str()); + + let mut eng = engine.lock().await; + let config_ref = match &eng.config { + Some(c) => c.clone(), + None => { respond_err(out_tx, &cmd.id, "not configured"); return; } + }; + + // Resolve provider. + let provider_config = if let Some(pid) = provider_id { + config_ref.providers.iter().find(|p| p.id == pid).cloned() + } else { + config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider) + }; + + let provider_config = match provider_config { + Some(p) => p, + None => { respond_err(out_tx, &cmd.id, "no provider available"); return; } + }; + + // Get registered AOR. + let registered_aor = if let Some(ps_arc) = eng.provider_mgr.find_by_address( + &provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()) + ).await { + let ps = ps_arc.lock().await; + ps.registered_aor.clone() + } else { + format!("sip:{}@{}", provider_config.username, provider_config.domain) + }; + + let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address( + &provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()) + ).await { + let ps = ps_arc.lock().await; + ps.public_ip.clone() + } else { + None + }; + + let socket = match &eng.transport { + Some(t) => t.socket(), + None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + }; + + let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng; + let rtp_pool = rtp_pool.as_mut().unwrap(); + + let leg_id = call_mgr.add_external_leg( + &call_id, &number, &provider_config, &config_ref, + rtp_pool, &socket, public_ip.as_deref(), ®istered_aor, + ).await; + + match leg_id { + Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })), + None => respond_err(out_tx, &cmd.id, "failed to add leg"), + } +} + +/// Handle `remove_leg` — remove a leg from a call. +async fn handle_remove_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { + let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + }; + let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; } + }; + + let mut eng = engine.lock().await; + let socket = match &eng.transport { + Some(t) => t.socket(), + None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + }; + + if eng.call_mgr.remove_leg(&call_id, &leg_id, &socket).await { + respond_ok(out_tx, &cmd.id, serde_json::json!({})); + } else { + respond_err(out_tx, &cmd.id, &format!("call/leg not found")); + } +} + /// Handle `webrtc_close` — close a WebRTC session. /// Uses only the WebRTC lock. async fn handle_webrtc_close(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { diff --git a/rust/crates/proxy-engine/src/mixer.rs b/rust/crates/proxy-engine/src/mixer.rs new file mode 100644 index 0000000..878356d --- /dev/null +++ b/rust/crates/proxy-engine/src/mixer.rs @@ -0,0 +1,232 @@ +//! Audio mixer — mix-minus engine for multiparty calls. +//! +//! Each Call spawns one mixer task. Legs communicate with the mixer via +//! tokio mpsc channels — no shared mutable state, no lock contention. +//! +//! The mixer runs a 20ms tick loop: +//! 1. Drain inbound channels, decode to PCM, resample to 16kHz +//! 2. Compute total mix (sum of all legs' PCM as i32) +//! 3. For each leg: mix-minus = total - own, resample to leg codec rate, encode, send + +use crate::ipc::{emit_event, OutTx}; +use crate::rtp::{build_rtp_header, rtp_clock_increment}; +use codec_lib::{codec_sample_rate, TranscodeState}; +use std::collections::HashMap; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio::time::{self, Duration, MissedTickBehavior}; + +/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample. +const MIX_RATE: u32 = 16000; +/// Samples per 20ms frame at the mixing rate. +const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020 + +/// A raw RTP payload received from a leg (no RTP header). +pub struct RtpPacket { + pub payload: Vec, + pub payload_type: u8, +} + +/// Commands sent to the mixer task via a control channel. +pub enum MixerCommand { + /// Add a new leg to the mix. + AddLeg { + leg_id: String, + codec_pt: u8, + inbound_rx: mpsc::Receiver, + outbound_tx: mpsc::Sender>, + }, + /// Remove a leg from the mix (channels are dropped, I/O tasks exit). + RemoveLeg { leg_id: String }, + /// Shut down the mixer. + Shutdown, +} + +/// Internal per-leg state inside the mixer. +struct MixerLegSlot { + codec_pt: u8, + transcoder: TranscodeState, + inbound_rx: mpsc::Receiver, + outbound_tx: mpsc::Sender>, + /// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus. + last_pcm_frame: Vec, + /// Number of consecutive ticks with no inbound packet. + silent_ticks: u32, + // RTP output state. + rtp_seq: u16, + rtp_ts: u32, + rtp_ssrc: u32, +} + +/// Spawn the mixer task for a call. Returns the command sender and task handle. +pub fn spawn_mixer( + call_id: String, + out_tx: OutTx, +) -> (mpsc::Sender, JoinHandle<()>) { + let (cmd_tx, cmd_rx) = mpsc::channel::(32); + + let handle = tokio::spawn(async move { + mixer_loop(call_id, cmd_rx, out_tx).await; + }); + + (cmd_tx, handle) +} + +/// The 20ms mixing loop. +async fn mixer_loop( + call_id: String, + mut cmd_rx: mpsc::Receiver, + out_tx: OutTx, +) { + let mut legs: HashMap = HashMap::new(); + let mut interval = time::interval(Duration::from_millis(20)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + interval.tick().await; + + // 1. Process control commands (non-blocking). + loop { + match cmd_rx.try_recv() { + Ok(MixerCommand::AddLeg { + leg_id, + codec_pt, + inbound_rx, + outbound_tx, + }) => { + let transcoder = match TranscodeState::new() { + Ok(t) => t, + Err(e) => { + emit_event( + &out_tx, + "mixer_error", + serde_json::json!({ + "call_id": call_id, + "leg_id": leg_id, + "error": format!("codec init: {e}"), + }), + ); + continue; + } + }; + legs.insert( + leg_id, + MixerLegSlot { + codec_pt, + transcoder, + inbound_rx, + outbound_tx, + last_pcm_frame: vec![0i16; MIX_FRAME_SIZE], + silent_ticks: 0, + rtp_seq: 0, + rtp_ts: 0, + rtp_ssrc: rand::random(), + }, + ); + } + Ok(MixerCommand::RemoveLeg { leg_id }) => { + legs.remove(&leg_id); + // Channels drop → I/O tasks exit cleanly. + } + Ok(MixerCommand::Shutdown) => return, + Err(mpsc::error::TryRecvError::Empty) => break, + Err(mpsc::error::TryRecvError::Disconnected) => return, + } + } + + if legs.is_empty() { + continue; + } + + // 2. Drain inbound packets, decode to 16kHz PCM. + let leg_ids: Vec = legs.keys().cloned().collect(); + for lid in &leg_ids { + let slot = legs.get_mut(lid).unwrap(); + + // Drain channel, keep only the latest packet (simple jitter handling). + let mut latest: Option = None; + loop { + match slot.inbound_rx.try_recv() { + Ok(pkt) => latest = Some(pkt), + Err(_) => break, + } + } + + if let Some(pkt) = latest { + slot.silent_ticks = 0; + match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) { + Ok((pcm, rate)) => { + // Resample to mixing rate if needed. + let pcm_mix = if rate == MIX_RATE { + pcm + } else { + slot.transcoder + .resample(&pcm, rate, MIX_RATE) + .unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE]) + }; + // Pad or truncate to exactly MIX_FRAME_SIZE. + let mut frame = pcm_mix; + frame.resize(MIX_FRAME_SIZE, 0); + slot.last_pcm_frame = frame; + } + Err(_) => { + // Decode failed — use silence. + slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE]; + } + } + } else { + slot.silent_ticks += 1; + // After 150 ticks (3 seconds) of silence, zero out to avoid stale audio. + if slot.silent_ticks > 150 { + slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE]; + } + } + } + + // 3. Compute total mix (sum of all legs as i32 to avoid overflow). + let mut total_mix = vec![0i32; MIX_FRAME_SIZE]; + for slot in legs.values() { + for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) { + total_mix[i] += s as i32; + } + } + + // 4. For each leg: mix-minus, resample, encode, send. + for slot in legs.values_mut() { + // Mix-minus: total minus this leg's own contribution. + let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE); + for i in 0..MIX_FRAME_SIZE { + let sample = + (total_mix[i] - slot.last_pcm_frame[i] as i32).clamp(-32768, 32767) as i16; + mix_minus.push(sample); + } + + // Resample from 16kHz to the leg's codec native rate. + let target_rate = codec_sample_rate(slot.codec_pt); + let resampled = if target_rate == MIX_RATE { + mix_minus + } else { + slot.transcoder + .resample(&mix_minus, MIX_RATE, target_rate) + .unwrap_or_default() + }; + + // Encode to the leg's codec. + let encoded = match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) { + Ok(e) if !e.is_empty() => e, + _ => continue, + }; + + // Build RTP packet with header. + let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc); + let mut rtp = header.to_vec(); + rtp.extend_from_slice(&encoded); + + slot.rtp_seq = slot.rtp_seq.wrapping_add(1); + slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); + + // Non-blocking send — drop frame if channel is full. + let _ = slot.outbound_tx.try_send(rtp); + } + } +} diff --git a/rust/crates/proxy-engine/src/webrtc_engine.rs b/rust/crates/proxy-engine/src/webrtc_engine.rs index 467b1c3..40558f4 100644 --- a/rust/crates/proxy-engine/src/webrtc_engine.rs +++ b/rust/crates/proxy-engine/src/webrtc_engine.rs @@ -1,16 +1,17 @@ -//! WebRTC engine — manages browser PeerConnections with SIP audio bridging. +//! WebRTC engine — manages browser PeerConnections. //! -//! Browser Opus audio → Rust PeerConnection → transcode via codec-lib → SIP RTP -//! SIP RTP → transcode via codec-lib → Rust PeerConnection → Browser Opus +//! Audio bridging is now channel-based: +//! - Browser Opus audio → on_track → mixer inbound channel +//! - Mixer outbound channel → Opus RTP → TrackLocalStaticRTP → browser +//! +//! The mixer handles all transcoding. The WebRTC engine just shuttles raw Opus. use crate::ipc::{emit_event, OutTx}; -use crate::rtp::{build_rtp_header, rtp_clock_increment}; -use codec_lib::{TranscodeState, PT_G722, PT_OPUS}; +use crate::mixer::RtpPacket; +use codec_lib::PT_OPUS; use std::collections::HashMap; -use std::net::SocketAddr; use std::sync::Arc; -use tokio::net::UdpSocket; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; use webrtc::api::media_engine::MediaEngine; use webrtc::api::APIBuilder; use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit; @@ -22,26 +23,14 @@ use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability; use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP; use webrtc::track::track_local::{TrackLocal, TrackLocalWriter}; -/// SIP-side bridge info for a WebRTC session. -#[derive(Clone)] -pub struct SipBridgeInfo { - /// Provider's media endpoint (RTP destination). - pub provider_media: SocketAddr, - /// Provider's codec payload type (e.g. 9 for G.722). - pub sip_pt: u8, - /// The allocated RTP socket for bidirectional audio with the provider. - /// This is the socket whose port was advertised in SDP, so the provider - /// sends RTP here and expects RTP from this port. - pub rtp_socket: Arc, -} - /// A managed WebRTC session. struct WebRtcSession { pc: Arc, local_track: Arc, call_id: Option, - /// SIP bridge — set when the session is linked to a call. - sip_bridge: Arc>>, + /// Channel sender for forwarding browser Opus audio to the mixer. + /// Set when the session is linked to a call via link_to_mixer(). + mixer_tx: Arc>>>, } /// Manages all WebRTC sessions. @@ -58,7 +47,7 @@ impl WebRtcEngine { } } - /// Handle a WebRTC offer from a browser. + /// Handle a WebRTC offer from a browser — create PeerConnection, return SDP answer. pub async fn handle_offer( &mut self, session_id: &str, @@ -101,8 +90,9 @@ impl WebRtcEngine { .await .map_err(|e| format!("add track: {e}"))?; - // Shared SIP bridge info (populated when linked to a call). - let sip_bridge: Arc>> = Arc::new(Mutex::new(None)); + // Shared mixer channel sender (populated when linked to a call). + let mixer_tx: Arc>>> = + Arc::new(Mutex::new(None)); // ICE candidate handler. let out_tx_ice = self.out_tx.clone(); @@ -153,14 +143,14 @@ impl WebRtcEngine { })); // Track handler — receives Opus audio from the browser. - // When SIP bridge is set, transcodes and forwards to provider. + // Forwards raw Opus payload to the mixer channel (when linked). let out_tx_track = self.out_tx.clone(); let sid_track = session_id.to_string(); - let sip_bridge_for_track = sip_bridge.clone(); + let mixer_tx_for_track = mixer_tx.clone(); pc.on_track(Box::new(move |track, _receiver, _transceiver| { let out_tx = out_tx_track.clone(); let sid = sid_track.clone(); - let bridge = sip_bridge_for_track.clone(); + let mixer_tx = mixer_tx_for_track.clone(); Box::pin(async move { let codec_info = track.codec(); emit_event( @@ -173,8 +163,8 @@ impl WebRtcEngine { }), ); - // Spawn the browser→SIP audio forwarding task. - tokio::spawn(browser_to_sip_loop(track, bridge, out_tx, sid)); + // Spawn browser→mixer forwarding task. + tokio::spawn(browser_to_mixer_loop(track, mixer_tx, out_tx, sid)); }) })); @@ -201,43 +191,41 @@ impl WebRtcEngine { pc, local_track, call_id: None, - sip_bridge, + mixer_tx, }, ); Ok(answer_sdp) } - /// Link a WebRTC session to a SIP call — sets up bidirectional audio bridge. - /// - Browser→SIP: already running via on_track handler, will start forwarding - /// once bridge info is set. - /// - SIP→Browser: spawned here, reads from the RTP socket and sends to browser. - pub async fn link_to_sip( + /// Link a WebRTC session to a call's mixer via channels. + /// - `inbound_tx`: browser audio goes TO the mixer through this channel + /// - `outbound_rx`: mixed audio comes FROM the mixer through this channel + pub async fn link_to_mixer( &mut self, session_id: &str, call_id: &str, - bridge_info: SipBridgeInfo, + inbound_tx: mpsc::Sender, + outbound_rx: mpsc::Receiver>, ) -> bool { - if let Some(session) = self.sessions.get_mut(session_id) { - session.call_id = Some(call_id.to_string()); + let session = match self.sessions.get_mut(session_id) { + Some(s) => s, + None => return false, + }; - // Spawn SIP → browser audio loop (provider RTP → transcode → Opus → WebRTC track). - let local_track = session.local_track.clone(); - let rtp_socket = bridge_info.rtp_socket.clone(); - let sip_pt = bridge_info.sip_pt; - let out_tx = self.out_tx.clone(); - let sid = session_id.to_string(); - tokio::spawn(sip_to_browser_loop( - rtp_socket, local_track, sip_pt, out_tx, sid, - )); + session.call_id = Some(call_id.to_string()); - // Set bridge info — this unblocks the browser→SIP loop (already running). - let mut bridge = session.sip_bridge.lock().await; - *bridge = Some(bridge_info); - true - } else { - false + // Set the mixer sender so the on_track loop starts forwarding. + { + let mut tx = session.mixer_tx.lock().await; + *tx = Some(inbound_tx); } + + // Spawn mixer→browser outbound task. + let local_track = session.local_track.clone(); + tokio::spawn(mixer_to_browser_loop(outbound_rx, local_track)); + + true } pub async fn add_ice_candidate( @@ -272,90 +260,48 @@ impl WebRtcEngine { } Ok(()) } - - pub fn has_session(&self, session_id: &str) -> bool { - self.sessions.contains_key(session_id) - } } -/// Browser → SIP audio forwarding loop. -/// Reads Opus RTP from the browser, transcodes to the SIP codec, sends to provider. -async fn browser_to_sip_loop( +/// Browser → Mixer audio forwarding loop. +/// Reads Opus RTP from the browser track, sends raw Opus payload to the mixer channel. +async fn browser_to_mixer_loop( track: Arc, - sip_bridge: Arc>>, + mixer_tx: Arc>>>, out_tx: OutTx, session_id: String, ) { - // Create a persistent codec state for this direction. - let mut transcoder = match TranscodeState::new() { - Ok(t) => t, - Err(e) => { - emit_event( - &out_tx, - "webrtc_error", - serde_json::json!({ "session_id": session_id, "error": format!("codec init: {e}") }), - ); - return; - } - }; - let mut buf = vec![0u8; 1500]; let mut count = 0u64; - let mut to_sip_seq: u16 = 0; - let mut to_sip_ts: u32 = 0; - let to_sip_ssrc: u32 = rand::random(); loop { match track.read(&mut buf).await { Ok((rtp_packet, _attributes)) => { count += 1; - // Get the SIP bridge info (may not be set yet if call isn't linked). - let bridge = sip_bridge.lock().await; - let bridge_info = match bridge.as_ref() { - Some(b) => b.clone(), - None => continue, // Not linked to a SIP call yet — drop the packet. - }; - drop(bridge); // Release lock before doing I/O. - - // Extract Opus payload from the RTP packet (skip 12-byte header). let payload = &rtp_packet.payload; if payload.is_empty() { continue; } - // Transcode Opus → SIP codec (e.g. G.722). - let sip_payload = match transcoder.transcode( - payload, - PT_OPUS, - bridge_info.sip_pt, - Some("to_sip"), - ) { - Ok(p) if !p.is_empty() => p, - _ => continue, - }; - - // Build SIP RTP packet. - let header = build_rtp_header(bridge_info.sip_pt, to_sip_seq, to_sip_ts, to_sip_ssrc); - let mut sip_rtp = header.to_vec(); - sip_rtp.extend_from_slice(&sip_payload); - - to_sip_seq = to_sip_seq.wrapping_add(1); - to_sip_ts = to_sip_ts.wrapping_add(rtp_clock_increment(bridge_info.sip_pt)); - - // Send to provider via the RTP socket (correct source port matching our SDP). - let _ = bridge_info - .rtp_socket - .send_to(&sip_rtp, bridge_info.provider_media) - .await; + // Send raw Opus payload to mixer (if linked). + let tx = mixer_tx.lock().await; + if let Some(ref tx) = *tx { + let _ = tx + .send(RtpPacket { + payload: payload.to_vec(), + payload_type: PT_OPUS, + }) + .await; + } + drop(tx); if count == 1 || count == 50 || count % 500 == 0 { emit_event( &out_tx, - "webrtc_audio_tx", + "webrtc_audio_rx", serde_json::json!({ "session_id": session_id, - "direction": "browser_to_sip", + "direction": "browser_to_mixer", "packet_count": count, }), ); @@ -366,85 +312,13 @@ async fn browser_to_sip_loop( } } -/// SIP → Browser audio forwarding loop. -/// Reads RTP from the provider (via the allocated RTP socket), transcodes to Opus, -/// and writes to the WebRTC local track for delivery to the browser. -async fn sip_to_browser_loop( - rtp_socket: Arc, +/// Mixer → Browser audio forwarding loop. +/// Reads Opus-encoded RTP packets from the mixer and writes to the WebRTC track. +async fn mixer_to_browser_loop( + mut outbound_rx: mpsc::Receiver>, local_track: Arc, - sip_pt: u8, - out_tx: OutTx, - session_id: String, ) { - let mut transcoder = match TranscodeState::new() { - Ok(t) => t, - Err(e) => { - emit_event( - &out_tx, - "webrtc_error", - serde_json::json!({ - "session_id": session_id, - "error": format!("sip_to_browser codec init: {e}"), - }), - ); - return; - } - }; - - let mut buf = vec![0u8; 1500]; - let mut count = 0u64; - let mut seq: u16 = 0; - let mut ts: u32 = 0; - let ssrc: u32 = rand::random(); - - loop { - match rtp_socket.recv_from(&mut buf).await { - Ok((n, _from)) => { - if n < 12 { - continue; // Too small for RTP header. - } - count += 1; - - // Extract payload (skip 12-byte RTP header). - let payload = &buf[12..n]; - if payload.is_empty() { - continue; - } - - // Transcode SIP codec → Opus. - let opus_payload = match transcoder.transcode( - payload, - sip_pt, - PT_OPUS, - Some("sip_to_browser"), - ) { - Ok(p) if !p.is_empty() => p, - _ => continue, - }; - - // Build Opus RTP packet. - let header = build_rtp_header(PT_OPUS, seq, ts, ssrc); - let mut packet = header.to_vec(); - packet.extend_from_slice(&opus_payload); - - seq = seq.wrapping_add(1); - ts = ts.wrapping_add(960); // Opus: 48000 Hz × 20ms = 960 samples - - let _ = local_track.write(&packet).await; - - if count == 1 || count == 50 || count % 500 == 0 { - emit_event( - &out_tx, - "webrtc_audio_rx", - serde_json::json!({ - "session_id": session_id, - "direction": "sip_to_browser", - "packet_count": count, - }), - ); - } - } - Err(_) => break, // Socket closed. - } + while let Some(rtp_data) = outbound_rx.recv().await { + let _ = local_track.write(&rtp_data).await; } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index dc3db2a..07d68dd 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.13.0', + version: '1.14.0', description: 'undefined' } diff --git a/ts/frontend.ts b/ts/frontend.ts index e4bf19b..2a1b3bb 100644 --- a/ts/frontend.ts +++ b/ts/frontend.ts @@ -128,14 +128,14 @@ async function handleRequest( } } - // API: add leg to call. + // API: add leg to call (device — not yet implemented, needs device-to-call routing). if (url.pathname.startsWith('/api/call/') && url.pathname.endsWith('/addleg') && method === 'POST') { try { const callId = url.pathname.split('/')[3]; const body = await readJsonBody(req); if (!body?.deviceId) return sendJson(res, { ok: false, error: 'missing deviceId' }, 400); - const ok = callManager?.addDeviceToCall(callId, body.deviceId) ?? false; - return sendJson(res, { ok }); + // TODO: implement device leg addition (needs SIP INVITE to device). + return sendJson(res, { ok: false, error: 'not yet implemented' }, 501); } catch (e: any) { return sendJson(res, { ok: false, error: e.message }, 400); } @@ -147,8 +147,9 @@ async function handleRequest( const callId = url.pathname.split('/')[3]; const body = await readJsonBody(req); if (!body?.number) return sendJson(res, { ok: false, error: 'missing number' }, 400); - const ok = callManager?.addExternalToCall(callId, body.number, body.providerId) ?? false; - return sendJson(res, { ok }); + const { addLeg: addLegFn } = await import('./proxybridge.ts'); + const legId = await addLegFn(callId, body.number, body.providerId); + return sendJson(res, { ok: !!legId, legId }); } catch (e: any) { return sendJson(res, { ok: false, error: e.message }, 400); } @@ -160,22 +161,22 @@ async function handleRequest( const callId = url.pathname.split('/')[3]; const body = await readJsonBody(req); if (!body?.legId) return sendJson(res, { ok: false, error: 'missing legId' }, 400); - const ok = callManager?.removeLegFromCall(callId, body.legId) ?? false; + const { removeLeg: removeLegFn } = await import('./proxybridge.ts'); + const ok = await removeLegFn(callId, body.legId); return sendJson(res, { ok }); } catch (e: any) { return sendJson(res, { ok: false, error: e.message }, 400); } } - // API: transfer leg. + // API: transfer leg (not yet implemented). if (url.pathname === '/api/transfer' && method === 'POST') { try { const body = await readJsonBody(req); if (!body?.sourceCallId || !body?.legId || !body?.targetCallId) { return sendJson(res, { ok: false, error: 'missing sourceCallId, legId, or targetCallId' }, 400); } - const ok = callManager?.transferLeg(body.sourceCallId, body.legId, body.targetCallId) ?? false; - return sendJson(res, { ok }); + return sendJson(res, { ok: false, error: 'not yet implemented' }, 501); } catch (e: any) { return sendJson(res, { ok: false, error: e.message }, 400); } diff --git a/ts/proxybridge.ts b/ts/proxybridge.ts index c363f61..fafcb85 100644 --- a/ts/proxybridge.ts +++ b/ts/proxybridge.ts @@ -238,6 +238,38 @@ export async function webrtcLink(sessionId: string, callId: string, providerMedi } } +/** + * Add an external SIP leg to an existing call (multiparty). + */ +export async function addLeg(callId: string, number: string, providerId?: string): Promise { + if (!bridge || !initialized) return null; + try { + const result = await bridge.sendCommand('add_leg', { + call_id: callId, + number, + provider_id: providerId, + } as any); + return (result as any)?.leg_id || null; + } catch (e: any) { + logFn?.(`[proxy-engine] add_leg error: ${e?.message || e}`); + return null; + } +} + +/** + * Remove a leg from a call. + */ +export async function removeLeg(callId: string, legId: string): Promise { + if (!bridge || !initialized) return false; + try { + await bridge.sendCommand('remove_leg', { call_id: callId, leg_id: legId } as any); + return true; + } catch (e: any) { + logFn?.(`[proxy-engine] remove_leg error: ${e?.message || e}`); + return false; + } +} + /** * Close a WebRTC session. */ diff --git a/ts/sipproxy.ts b/ts/sipproxy.ts index 16bfc0b..2ee994c 100644 --- a/ts/sipproxy.ts +++ b/ts/sipproxy.ts @@ -39,6 +39,8 @@ import { webrtcIce, webrtcLink, webrtcClose, + addLeg, + removeLeg, } from './proxybridge.ts'; import type { IIncomingCallEvent, @@ -359,6 +361,19 @@ async function startProxyEngine(): Promise { log(`[sip] unhandled ${data.method_or_status} Call-ID=${data.call_id?.slice(0, 20)} from=${data.from_addr}:${data.from_port}`); }); + // Leg events (multiparty). + onProxyEvent('leg_added', (data: any) => { + log(`[leg] added: call=${data.call_id} leg=${data.leg_id} kind=${data.kind} state=${data.state}`); + }); + + onProxyEvent('leg_removed', (data: any) => { + log(`[leg] removed: call=${data.call_id} leg=${data.leg_id}`); + }); + + onProxyEvent('leg_state_changed', (data: any) => { + log(`[leg] state: call=${data.call_id} leg=${data.leg_id} → ${data.state}`); + }); + // WebRTC events from Rust — forward ICE candidates to browser via WebSocket. onProxyEvent('webrtc_ice_candidate', (data: any) => { // Find the browser's WebSocket by session ID and send the ICE candidate. diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index dc3db2a..07d68dd 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.13.0', + version: '1.14.0', description: 'undefined' }