diff --git a/changelog.md b/changelog.md index a883680..621c155 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-04-10 - 1.13.0 - feat(proxy-engine,webrtc) +add B2BUA SIP leg handling and WebRTC call bridging for outbound calls + +- introduce a new SipLeg module to manage outbound provider dialogs, including INVITE lifecycle, digest auth retries, ACK handling, media endpoint tracking, and termination +- store outbound dashboard calls as B2BUA calls in the call manager and emit provider media details on call_answered for bridge setup +- separate SIP and WebRTC engine locking to avoid contention and deadlocks while linking sessions to call RTP sockets +- add bidirectional RTP bridging between provider SIP media and browser WebRTC audio using the allocated RTP socket +- wire browser webrtc-accept events in the frontend and sipproxy so session-to-call linking can occur when media and acceptance arrive in either order + ## 2026-04-10 - 1.12.0 - feat(proxy-engine) add Rust-based outbound calling, WebRTC bridging, and voicemail handling diff --git a/rust/crates/proxy-engine/src/call_manager.rs b/rust/crates/proxy-engine/src/call_manager.rs index ba44045..906978a 100644 --- a/rust/crates/proxy-engine/src/call_manager.rs +++ b/rust/crates/proxy-engine/src/call_manager.rs @@ -15,6 +15,7 @@ use crate::dtmf::DtmfDetector; use crate::ipc::{emit_event, OutTx}; use crate::registrar::Registrar; use crate::rtp::RtpPortPool; +use crate::sip_leg::{LegState, SipLeg, SipLegAction, SipLegConfig}; use sip_proto::helpers::parse_sdp_endpoint; use sip_proto::message::SipMessage; use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri}; @@ -24,9 +25,23 @@ use std::sync::Arc; use std::time::Instant; use tokio::net::UdpSocket; +/// A B2BUA call with a SipLeg for the provider side. +/// The other side is either a WebRTC session or another SipLeg. +pub struct B2buaCall { + pub id: String, + pub provider_leg: SipLeg, + pub webrtc_session_id: Option, + pub number: String, + pub created_at: std::time::Instant, + /// RTP socket allocated for the provider leg (used for WebRTC audio bridging). + pub rtp_socket: Option>, +} + pub struct CallManager { /// Active passthrough calls, keyed by SIP Call-ID. calls: HashMap, + /// Active B2BUA calls, keyed by SIP Call-ID of the provider leg. + b2bua_calls: HashMap, /// Call ID counter. next_call_num: u64, /// Output channel for events. @@ -37,6 +52,7 @@ impl CallManager { pub fn new(out_tx: OutTx) -> Self { Self { calls: HashMap::new(), + b2bua_calls: HashMap::new(), next_call_num: 0, out_tx, } @@ -68,7 +84,12 @@ impl CallManager { ) -> bool { let sip_call_id = msg.call_id().to_string(); - // Check if this Call-ID belongs to an active call. + // Check B2BUA calls first (provider legs with dialog management). + if self.b2bua_calls.contains_key(&sip_call_id) { + return self.route_b2bua_message(&sip_call_id, msg, from_addr, socket).await; + } + + // Check passthrough calls. if !self.calls.contains_key(&sip_call_id) { return false; } @@ -494,14 +515,89 @@ impl CallManager { /// Check if a SIP Call-ID belongs to any active call. pub fn has_call(&self, sip_call_id: &str) -> bool { - self.calls.contains_key(sip_call_id) + self.calls.contains_key(sip_call_id) || self.b2bua_calls.contains_key(sip_call_id) } - // --- Dashboard outbound call (B2BUA) --- + /// Get the RTP socket for a B2BUA call (by our internal call ID). + /// Used by webrtc_link to set up the audio bridge. + pub fn get_b2bua_rtp_socket(&self, call_id: &str) -> Option> { + for b2bua in self.b2bua_calls.values() { + if b2bua.id == call_id { + return b2bua.rtp_socket.clone(); + } + } + None + } - /// Initiate an outbound call from the dashboard. - /// Builds an INVITE from scratch and sends it to the provider. - /// The browser connects separately via WebRTC and gets linked to this call. + // --- B2BUA outbound call --- + + /// Route a SIP message to a B2BUA call's provider leg. + async fn route_b2bua_message( + &mut self, + sip_call_id: &str, + msg: &SipMessage, + from_addr: SocketAddr, + socket: &UdpSocket, + ) -> bool { + let b2bua = match self.b2bua_calls.get_mut(sip_call_id) { + Some(c) => c, + None => return false, + }; + + let call_id = b2bua.id.clone(); + let action = b2bua.provider_leg.handle_message(msg); + + match action { + SipLegAction::None => {} + SipLegAction::Send(buf) => { + let _ = socket.send_to(&buf, b2bua.provider_leg.config.sip_target).await; + } + SipLegAction::StateChange(LegState::Ringing) => { + emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); + } + SipLegAction::ConnectedWithAck(ack_buf) => { + let _ = socket.send_to(&ack_buf, b2bua.provider_leg.config.sip_target).await; + let remote = b2bua.provider_leg.remote_media; + let sip_pt = b2bua.provider_leg.config.codecs.first().copied().unwrap_or(9); + emit_event(&self.out_tx, "call_answered", serde_json::json!({ + "call_id": call_id, + "provider_media_addr": remote.map(|a| a.ip().to_string()), + "provider_media_port": remote.map(|a| a.port()), + "sip_pt": sip_pt, + })); + } + SipLegAction::Terminated(reason) => { + let duration = b2bua.created_at.elapsed().as_secs(); + emit_event(&self.out_tx, "call_ended", serde_json::json!({ + "call_id": call_id, "reason": reason, "duration": duration, + })); + self.b2bua_calls.remove(sip_call_id); + return true; + } + SipLegAction::SendAndTerminate(buf, reason) => { + let _ = socket.send_to(&buf, from_addr).await; + let duration = b2bua.created_at.elapsed().as_secs(); + emit_event(&self.out_tx, "call_ended", serde_json::json!({ + "call_id": call_id, "reason": reason, "duration": duration, + })); + self.b2bua_calls.remove(sip_call_id); + return true; + } + SipLegAction::AuthRetry { ack_407, invite_with_auth } => { + let target = b2bua.provider_leg.config.sip_target; + if let Some(ack) = ack_407 { + let _ = socket.send_to(&ack, target).await; + } + let _ = socket.send_to(&invite_with_auth, target).await; + } + _ => {} + } + + true + } + + /// Initiate an outbound call from the dashboard using B2BUA mode. + /// Creates a SipLeg for the provider side with proper dialog + auth handling. pub async fn make_outbound_call( &mut self, number: &str, @@ -515,7 +611,6 @@ impl CallManager { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; - let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() { Some(a) => a, @@ -528,70 +623,38 @@ impl CallManager { None => return None, }; - // Build the SIP Call-ID for this new dialog. + // Build the SIP Call-ID for the provider dialog. let sip_call_id = sip_proto::helpers::generate_call_id(None); - // Build SDP offer. - let sdp = sip_proto::helpers::build_sdp(&sip_proto::helpers::SdpOptions { - ip: pub_ip, - port: rtp_alloc.port, - payload_types: &provider_config.codecs, - ..Default::default() - }); - - // Build INVITE. - let to_uri = format!("sip:{number}@{}", provider_config.domain); - let invite = SipMessage::create_request( - "INVITE", - &to_uri, - sip_proto::message::RequestOptions { - via_host: pub_ip.to_string(), - via_port: lan_port, - via_transport: None, - via_branch: Some(sip_proto::helpers::generate_branch()), - from_uri: registered_aor.to_string(), - from_display_name: None, - from_tag: Some(sip_proto::helpers::generate_tag()), - to_uri: to_uri.clone(), - to_display_name: None, - to_tag: None, - call_id: Some(sip_call_id.clone()), - cseq: Some(1), - contact: Some(format!("")), - max_forwards: Some(70), - body: Some(sdp), - content_type: Some("application/sdp".to_string()), - extra_headers: Some(vec![ - ("User-Agent".to_string(), "SipRouter/1.0".to_string()), - ("Allow".to_string(), "INVITE, ACK, OPTIONS, CANCEL, BYE, INFO".to_string()), - ]), - }, - ); - - // Send INVITE to provider. - let _ = socket.send_to(&invite.serialize(), provider_dest).await; - - // Create call entry — device_addr is a dummy (WebRTC will be linked later). - let dummy_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); - let call = PassthroughCall { - id: call_id.clone(), - sip_call_id: sip_call_id.clone(), - state: CallState::SettingUp, - direction: CallDirection::Outbound, - created_at: Instant::now(), - caller_number: Some(registered_aor.to_string()), - callee_number: Some(number.to_string()), - provider_id: provider_config.id.clone(), - provider_addr: provider_dest, - provider_media: None, - device_addr: dummy_addr, - device_media: None, + // Create a SipLeg with provider credentials for auth handling. + let leg_config = SipLegConfig { + lan_ip: lan_ip.clone(), + lan_port, + public_ip: public_ip.map(|s| s.to_string()), + sip_target: provider_dest, + username: Some(provider_config.username.clone()), + password: Some(provider_config.password.clone()), + registered_aor: Some(registered_aor.to_string()), + codecs: provider_config.codecs.clone(), rtp_port: rtp_alloc.port, - rtp_socket: rtp_alloc.socket.clone(), - pkt_from_device: 0, - pkt_from_provider: 0, }; - self.calls.insert(sip_call_id, call); + + let mut leg = SipLeg::new(format!("{call_id}-prov"), leg_config); + + // Send the INVITE. + let to_uri = format!("sip:{number}@{}", provider_config.domain); + leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await; + + // Store as B2BUA call. + let b2bua = B2buaCall { + id: call_id.clone(), + provider_leg: leg, + webrtc_session_id: None, + number: number.to_string(), + created_at: std::time::Instant::now(), + rtp_socket: Some(rtp_alloc.socket.clone()), + }; + self.b2bua_calls.insert(sip_call_id, b2bua); Some(call_id) } diff --git a/rust/crates/proxy-engine/src/main.rs b/rust/crates/proxy-engine/src/main.rs index ba41bb2..048e139 100644 --- a/rust/crates/proxy-engine/src/main.rs +++ b/rust/crates/proxy-engine/src/main.rs @@ -16,6 +16,7 @@ mod provider; mod recorder; mod registrar; mod rtp; +mod sip_leg; mod sip_transport; mod voicemail; mod webrtc_engine; @@ -35,14 +36,15 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::UdpSocket; use tokio::sync::{mpsc, Mutex}; -/// Shared mutable state for the proxy engine. +/// Shared mutable state for the proxy engine (SIP side). +/// WebRTC is intentionally kept in a separate lock to avoid contention +/// between SIP packet handlers and WebRTC command handlers. struct ProxyEngine { config: Option, transport: Option, provider_mgr: ProviderManager, registrar: Registrar, call_mgr: CallManager, - webrtc: WebRtcEngine, rtp_pool: Option, out_tx: OutTx, } @@ -55,7 +57,6 @@ impl ProxyEngine { provider_mgr: ProviderManager::new(out_tx.clone()), registrar: Registrar::new(out_tx.clone()), call_mgr: CallManager::new(out_tx.clone()), - webrtc: WebRtcEngine::new(out_tx.clone()), rtp_pool: None, out_tx, } @@ -83,9 +84,12 @@ async fn main() { // Emit ready event. emit_event(&out_tx, "ready", serde_json::json!({})); - // Shared engine state. + // Shared engine state (SIP side). let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone()))); + // WebRTC engine — separate lock to avoid deadlock with SIP handlers. + let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone()))); + // Read commands from stdin. let stdin = tokio::io::stdin(); let reader = BufReader::new(stdin); @@ -105,25 +109,34 @@ async fn main() { }; let engine = engine.clone(); + let webrtc = webrtc.clone(); let out_tx = out_tx.clone(); // Handle commands — some are async, so we spawn. tokio::spawn(async move { - handle_command(engine, &out_tx, cmd).await; + handle_command(engine, webrtc, &out_tx, cmd).await; }); } } -async fn handle_command(engine: Arc>, out_tx: &OutTx, cmd: Command) { +async fn handle_command( + engine: Arc>, + webrtc: Arc>, + out_tx: &OutTx, + cmd: Command, +) { match cmd.method.as_str() { + // SIP commands — lock engine only. "configure" => handle_configure(engine, out_tx, &cmd).await, "hangup" => handle_hangup(engine, out_tx, &cmd).await, "make_call" => handle_make_call(engine, out_tx, &cmd).await, "get_status" => handle_get_status(engine, out_tx, &cmd).await, - "webrtc_offer" => handle_webrtc_offer(engine, out_tx, &cmd).await, - "webrtc_ice" => handle_webrtc_ice(engine, out_tx, &cmd).await, - "webrtc_link" => handle_webrtc_link(engine, out_tx, &cmd).await, - "webrtc_close" => handle_webrtc_close(engine, out_tx, &cmd).await, + // WebRTC commands — lock webrtc only (no engine contention). + "webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, &cmd).await, + "webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, &cmd).await, + "webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await, + // webrtc_link needs both: engine (for RTP socket) and webrtc (for session). + "webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await, _ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)), } } @@ -524,7 +537,8 @@ async fn handle_hangup(engine: Arc>, out_tx: &OutTx, cmd: &Co } /// Handle `webrtc_offer` — browser sends SDP offer, we create PeerConnection and return answer. -async fn handle_webrtc_offer(engine: Arc>, out_tx: &OutTx, cmd: &Command) { +/// Uses only the WebRTC lock — no contention with SIP handlers. +async fn handle_webrtc_offer(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } @@ -534,8 +548,8 @@ async fn handle_webrtc_offer(engine: Arc>, out_tx: &OutTx, cm None => { respond_err(out_tx, &cmd.id, "missing sdp"); return; } }; - let mut eng = engine.lock().await; - match eng.webrtc.handle_offer(&session_id, &offer_sdp).await { + let mut wrtc = webrtc.lock().await; + match wrtc.handle_offer(&session_id, &offer_sdp).await { Ok(answer_sdp) => { respond_ok(out_tx, &cmd.id, serde_json::json!({ "session_id": session_id, @@ -547,7 +561,8 @@ async fn handle_webrtc_offer(engine: Arc>, out_tx: &OutTx, cm } /// Handle `webrtc_ice` — forward ICE candidate from browser to Rust PeerConnection. -async fn handle_webrtc_ice(engine: Arc>, out_tx: &OutTx, cmd: &Command) { +/// Uses only the WebRTC lock. +async fn handle_webrtc_ice(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } @@ -556,15 +571,22 @@ async fn handle_webrtc_ice(engine: Arc>, out_tx: &OutTx, cmd: let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str()); let sdp_mline_index = cmd.params.get("sdp_mline_index").and_then(|v| v.as_u64()).map(|v| v as u16); - let eng = engine.lock().await; - match eng.webrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await { + let wrtc = webrtc.lock().await; + match wrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await { Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})), Err(e) => respond_err(out_tx, &cmd.id, &e), } } /// Handle `webrtc_link` — link a WebRTC session to a SIP call for audio bridging. -async fn handle_webrtc_link(engine: Arc>, out_tx: &OutTx, cmd: &Command) { +/// Briefly locks engine to get the RTP socket, then locks webrtc to set up the bridge. +/// Locks are never held simultaneously — no deadlock possible. +async fn handle_webrtc_link( + engine: Arc>, + webrtc: Arc>, + out_tx: &OutTx, + cmd: &Command, +) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } @@ -588,19 +610,29 @@ async fn handle_webrtc_link(engine: Arc>, out_tx: &OutTx, cmd Err(e) => { respond_err(out_tx, &cmd.id, &format!("bad address: {e}")); return; } }; - let mut eng = engine.lock().await; - let sip_socket = match &eng.transport { - Some(t) => t.socket(), - None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + // Briefly lock engine to get the B2BUA call's RTP socket. + let rtp_socket = { + let eng = engine.lock().await; + eng.call_mgr.get_b2bua_rtp_socket(&call_id) + }; // engine lock released here + + let rtp_socket = match rtp_socket { + Some(s) => s, + None => { + respond_err(out_tx, &cmd.id, &format!("call {call_id} not found or no RTP socket")); + return; + } }; let bridge_info = crate::webrtc_engine::SipBridgeInfo { provider_media, sip_pt, - sip_socket, + rtp_socket, }; - if eng.webrtc.link_to_sip(&session_id, &call_id, bridge_info).await { + // Lock webrtc to set up the audio bridge. + let mut wrtc = webrtc.lock().await; + if wrtc.link_to_sip(&session_id, &call_id, bridge_info).await { respond_ok(out_tx, &cmd.id, serde_json::json!({ "session_id": session_id, "call_id": call_id, @@ -612,14 +644,15 @@ async fn handle_webrtc_link(engine: Arc>, out_tx: &OutTx, cmd } /// Handle `webrtc_close` — close a WebRTC session. -async fn handle_webrtc_close(engine: Arc>, out_tx: &OutTx, cmd: &Command) { +/// Uses only the WebRTC lock. +async fn handle_webrtc_close(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } }; - let mut eng = engine.lock().await; - match eng.webrtc.close_session(&session_id).await { + let mut wrtc = webrtc.lock().await; + match wrtc.close_session(&session_id).await { Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})), Err(e) => respond_err(out_tx, &cmd.id, &e), } diff --git a/rust/crates/proxy-engine/src/sip_leg.rs b/rust/crates/proxy-engine/src/sip_leg.rs new file mode 100644 index 0000000..40f1c4a --- /dev/null +++ b/rust/crates/proxy-engine/src/sip_leg.rs @@ -0,0 +1,475 @@ +//! SipLeg — manages one side of a B2BUA call. +//! +//! Handles the full INVITE lifecycle: +//! - Send INVITE with SDP +//! - Handle 407 Proxy Authentication (digest auth retry) +//! - Handle 200 OK (ACK, learn media endpoint) +//! - Handle BYE/CANCEL (teardown) +//! - Track SIP dialog state (early → confirmed → terminated) +//! +//! Ported from ts/call/sip-leg.ts. + +use sip_proto::dialog::{DialogState, SipDialog}; +use sip_proto::helpers::{ + build_sdp, compute_digest_auth, generate_branch, generate_tag, parse_digest_challenge, + parse_sdp_endpoint, SdpOptions, +}; +use sip_proto::message::{RequestOptions, SipMessage}; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::net::UdpSocket; + +/// State of a SIP leg. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LegState { + Inviting, + Ringing, + Connected, + Terminating, + Terminated, +} + +/// Configuration for creating a SIP leg. +pub struct SipLegConfig { + /// Proxy LAN IP (for Via, Contact, SDP). + pub lan_ip: String, + /// Proxy LAN port. + pub lan_port: u16, + /// Public IP (for provider-facing legs). + pub public_ip: Option, + /// SIP target endpoint (provider outbound proxy or device address). + pub sip_target: SocketAddr, + /// Provider credentials (for 407 auth). + pub username: Option, + pub password: Option, + pub registered_aor: Option, + /// Codec payload types to offer. + pub codecs: Vec, + /// Our RTP port for SDP. + pub rtp_port: u16, +} + +/// A SIP leg with full dialog management. +pub struct SipLeg { + pub id: String, + pub state: LegState, + pub config: SipLegConfig, + pub dialog: Option, + + /// The INVITE we sent (needed for CANCEL and 407 ACK). + invite: Option, + /// Original unauthenticated INVITE (for re-ACKing retransmitted 407s). + orig_invite: Option, + /// Whether we've attempted digest auth. + auth_attempted: bool, + + /// Remote media endpoint (learned from SDP in 200 OK). + pub remote_media: Option, +} + +impl SipLeg { + pub fn new(id: String, config: SipLegConfig) -> Self { + Self { + id, + state: LegState::Inviting, + config, + dialog: None, + invite: None, + orig_invite: None, + auth_attempted: false, + remote_media: None, + } + } + + /// Build and send an INVITE to establish this leg. + pub async fn send_invite( + &mut self, + from_uri: &str, + to_uri: &str, + sip_call_id: &str, + socket: &UdpSocket, + ) { + let ip = self + .config + .public_ip + .as_deref() + .unwrap_or(&self.config.lan_ip); + + let sdp = build_sdp(&SdpOptions { + ip, + port: self.config.rtp_port, + payload_types: &self.config.codecs, + ..Default::default() + }); + + let invite = SipMessage::create_request( + "INVITE", + to_uri, + RequestOptions { + via_host: ip.to_string(), + via_port: self.config.lan_port, + via_transport: None, + via_branch: Some(generate_branch()), + from_uri: from_uri.to_string(), + from_display_name: None, + from_tag: Some(generate_tag()), + to_uri: to_uri.to_string(), + to_display_name: None, + to_tag: None, + call_id: Some(sip_call_id.to_string()), + cseq: Some(1), + contact: Some(format!("", self.config.lan_port)), + max_forwards: Some(70), + body: Some(sdp), + content_type: Some("application/sdp".to_string()), + extra_headers: Some(vec![ + ("User-Agent".to_string(), "SipRouter/1.0".to_string()), + ]), + }, + ); + + self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port)); + self.invite = Some(invite.clone()); + self.state = LegState::Inviting; + + let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await; + } + + /// Handle an incoming SIP message routed to this leg. + /// Returns an optional reply to send (e.g. ACK, auth retry INVITE). + pub fn handle_message(&mut self, msg: &SipMessage) -> SipLegAction { + if msg.is_response() { + self.handle_response(msg) + } else { + self.handle_request(msg) + } + } + + fn handle_response(&mut self, msg: &SipMessage) -> SipLegAction { + let code = msg.status_code().unwrap_or(0); + let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase(); + + if cseq_method != "INVITE" { + return SipLegAction::None; + } + + // Handle retransmitted 407 for the original unauthenticated INVITE. + if self.auth_attempted { + if let Some(dialog) = &self.dialog { + let response_cseq: u32 = msg + .get_header("CSeq") + .and_then(|s| s.split_whitespace().next()) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + if response_cseq < dialog.local_cseq && code >= 400 { + // ACK the retransmitted error response. + if let Some(orig) = &self.orig_invite { + let ack = build_non_2xx_ack(orig, msg); + return SipLegAction::Send(ack.serialize()); + } + return SipLegAction::None; + } + } + } + + // Handle 407 Proxy Authentication Required. + if code == 407 { + return self.handle_auth_challenge(msg); + } + + // Update dialog state. + if let Some(dialog) = &mut self.dialog { + dialog.process_response(msg); + } + + if code == 180 || code == 183 { + self.state = LegState::Ringing; + SipLegAction::StateChange(LegState::Ringing) + } else if code >= 200 && code < 300 { + // ACK the 200 OK. + let ack_buf = if let Some(dialog) = &self.dialog { + let ack = dialog.create_ack(); + Some(ack.serialize()) + } else { + None + }; + + // If already connected (200 retransmit), just re-ACK. + if self.state == LegState::Connected { + return match ack_buf { + Some(buf) => SipLegAction::Send(buf), + None => SipLegAction::None, + }; + } + + // Learn media endpoint from SDP. + if msg.has_sdp_body() { + if let Some(ep) = parse_sdp_endpoint(&msg.body) { + if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { + self.remote_media = Some(addr); + } + } + } + + self.state = LegState::Connected; + + match ack_buf { + Some(buf) => SipLegAction::ConnectedWithAck(buf), + None => SipLegAction::StateChange(LegState::Connected), + } + } else if code >= 300 { + self.state = LegState::Terminated; + if let Some(dialog) = &mut self.dialog { + dialog.terminate(); + } + SipLegAction::Terminated(format!("rejected_{code}")) + } else { + SipLegAction::None // 1xx provisional + } + } + + fn handle_auth_challenge(&mut self, msg: &SipMessage) -> SipLegAction { + if self.auth_attempted { + self.state = LegState::Terminated; + if let Some(dialog) = &mut self.dialog { + dialog.terminate(); + } + return SipLegAction::Terminated("auth_rejected".to_string()); + } + self.auth_attempted = true; + + let challenge_header = match msg.get_header("Proxy-Authenticate") { + Some(h) => h, + None => { + self.state = LegState::Terminated; + return SipLegAction::Terminated("407_no_challenge".to_string()); + } + }; + + let challenge = match parse_digest_challenge(challenge_header) { + Some(c) => c, + None => { + self.state = LegState::Terminated; + return SipLegAction::Terminated("407_bad_challenge".to_string()); + } + }; + + let password = match &self.config.password { + Some(p) => p.clone(), + None => { + self.state = LegState::Terminated; + return SipLegAction::Terminated("407_no_password".to_string()); + } + }; + + let aor = match &self.config.registered_aor { + Some(a) => a.clone(), + None => { + self.state = LegState::Terminated; + return SipLegAction::Terminated("407_no_aor".to_string()); + } + }; + + let username = aor + .trim_start_matches("sip:") + .trim_start_matches("sips:") + .split('@') + .next() + .unwrap_or("") + .to_string(); + + let dest_uri = self + .invite + .as_ref() + .and_then(|i| i.request_uri()) + .unwrap_or("") + .to_string(); + + let auth_value = compute_digest_auth( + &username, + &password, + &challenge.realm, + &challenge.nonce, + "INVITE", + &dest_uri, + challenge.algorithm.as_deref(), + challenge.opaque.as_deref(), + ); + + // ACK the 407. + let mut ack_buf = None; + if let Some(invite) = &self.invite { + let ack = build_non_2xx_ack(invite, msg); + ack_buf = Some(ack.serialize()); + } + + // Save original INVITE for retransmission handling. + self.orig_invite = self.invite.clone(); + + // Build authenticated INVITE with same From tag, CSeq=2. + let ip = self + .config + .public_ip + .as_deref() + .unwrap_or(&self.config.lan_ip); + let from_tag = self + .dialog + .as_ref() + .map(|d| d.local_tag.clone()) + .unwrap_or_else(generate_tag); + + let sdp = build_sdp(&SdpOptions { + ip, + port: self.config.rtp_port, + payload_types: &self.config.codecs, + ..Default::default() + }); + + let call_id = self + .dialog + .as_ref() + .map(|d| d.call_id.clone()) + .unwrap_or_default(); + + let invite_auth = SipMessage::create_request( + "INVITE", + &dest_uri, + RequestOptions { + via_host: ip.to_string(), + via_port: self.config.lan_port, + via_transport: None, + via_branch: Some(generate_branch()), + from_uri: aor, + from_display_name: None, + from_tag: Some(from_tag), + to_uri: dest_uri.clone(), + to_display_name: None, + to_tag: None, + call_id: Some(call_id), + cseq: Some(2), + contact: Some(format!("", self.config.lan_port)), + max_forwards: Some(70), + body: Some(sdp), + content_type: Some("application/sdp".to_string()), + extra_headers: Some(vec![ + ("Proxy-Authorization".to_string(), auth_value), + ("User-Agent".to_string(), "SipRouter/1.0".to_string()), + ]), + }, + ); + + self.invite = Some(invite_auth.clone()); + if let Some(dialog) = &mut self.dialog { + dialog.local_cseq = 2; + } + + // Return both the ACK for the 407 and the new authenticated INVITE. + let invite_buf = invite_auth.serialize(); + SipLegAction::AuthRetry { + ack_407: ack_buf, + invite_with_auth: invite_buf, + } + } + + fn handle_request(&mut self, msg: &SipMessage) -> SipLegAction { + let method = msg.method().unwrap_or(""); + + if method == "BYE" { + let ok = SipMessage::create_response(200, "OK", msg, None); + self.state = LegState::Terminated; + if let Some(dialog) = &mut self.dialog { + dialog.terminate(); + } + return SipLegAction::SendAndTerminate(ok.serialize(), "bye".to_string()); + } + + if method == "INFO" { + let ok = SipMessage::create_response(200, "OK", msg, None); + return SipLegAction::Send(ok.serialize()); + } + + SipLegAction::None + } + + /// Build a BYE or CANCEL to tear down this leg. + pub fn build_hangup(&mut self) -> Option> { + let dialog = self.dialog.as_mut()?; + + let msg = if dialog.state == DialogState::Confirmed { + dialog.create_request("BYE", None, None, None) + } else if dialog.state == DialogState::Early { + if let Some(invite) = &self.invite { + dialog.create_cancel(invite) + } else { + return None; + } + } else { + return None; + }; + + self.state = LegState::Terminating; + dialog.terminate(); + Some(msg.serialize()) + } + + /// Get the SIP Call-ID for routing. + pub fn sip_call_id(&self) -> Option<&str> { + self.dialog.as_ref().map(|d| d.call_id.as_str()) + } +} + +/// Actions produced by the SipLeg message handler. +pub enum SipLegAction { + /// No action needed. + None, + /// Send a SIP message (ACK, 200 OK to INFO, etc.). + Send(Vec), + /// Leg state changed. + StateChange(LegState), + /// Connected — send this ACK. + ConnectedWithAck(Vec), + /// Terminated with a reason. + Terminated(String), + /// Send 200 OK and terminate. + SendAndTerminate(Vec, String), + /// 407 auth retry — send ACK for 407, then send new INVITE with auth. + AuthRetry { + ack_407: Option>, + invite_with_auth: Vec, + }, +} + +/// Build an ACK for a non-2xx response (same transaction as the INVITE). +fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage { + let via = original_invite.get_header("Via").unwrap_or("").to_string(); + let from = original_invite + .get_header("From") + .unwrap_or("") + .to_string(); + let to = response.get_header("To").unwrap_or("").to_string(); + let call_id = original_invite.call_id().to_string(); + let cseq_num: u32 = original_invite + .get_header("CSeq") + .and_then(|s| s.split_whitespace().next()) + .and_then(|s| s.parse().ok()) + .unwrap_or(1); + + let ruri = original_invite + .request_uri() + .unwrap_or("sip:unknown") + .to_string(); + + SipMessage::new( + format!("ACK {ruri} SIP/2.0"), + vec![ + ("Via".to_string(), via), + ("From".to_string(), from), + ("To".to_string(), to), + ("Call-ID".to_string(), call_id), + ("CSeq".to_string(), format!("{cseq_num} ACK")), + ("Max-Forwards".to_string(), "70".to_string()), + ("Content-Length".to_string(), "0".to_string()), + ], + String::new(), + ) +} diff --git a/rust/crates/proxy-engine/src/webrtc_engine.rs b/rust/crates/proxy-engine/src/webrtc_engine.rs index b3852ec..467b1c3 100644 --- a/rust/crates/proxy-engine/src/webrtc_engine.rs +++ b/rust/crates/proxy-engine/src/webrtc_engine.rs @@ -29,8 +29,10 @@ pub struct SipBridgeInfo { pub provider_media: SocketAddr, /// Provider's codec payload type (e.g. 9 for G.722). pub sip_pt: u8, - /// The SIP UDP socket for sending RTP to the provider. - pub sip_socket: Arc, + /// The allocated RTP socket for bidirectional audio with the provider. + /// This is the socket whose port was advertised in SDP, so the provider + /// sends RTP here and expects RTP from this port. + pub rtp_socket: Arc, } /// A managed WebRTC session. @@ -206,7 +208,10 @@ impl WebRtcEngine { Ok(answer_sdp) } - /// Link a WebRTC session to a SIP call — sets up the audio bridge. + /// Link a WebRTC session to a SIP call — sets up bidirectional audio bridge. + /// - Browser→SIP: already running via on_track handler, will start forwarding + /// once bridge info is set. + /// - SIP→Browser: spawned here, reads from the RTP socket and sends to browser. pub async fn link_to_sip( &mut self, session_id: &str, @@ -215,6 +220,18 @@ impl WebRtcEngine { ) -> bool { if let Some(session) = self.sessions.get_mut(session_id) { session.call_id = Some(call_id.to_string()); + + // Spawn SIP → browser audio loop (provider RTP → transcode → Opus → WebRTC track). + let local_track = session.local_track.clone(); + let rtp_socket = bridge_info.rtp_socket.clone(); + let sip_pt = bridge_info.sip_pt; + let out_tx = self.out_tx.clone(); + let sid = session_id.to_string(); + tokio::spawn(sip_to_browser_loop( + rtp_socket, local_track, sip_pt, out_tx, sid, + )); + + // Set bridge info — this unblocks the browser→SIP loop (already running). let mut bridge = session.sip_bridge.lock().await; *bridge = Some(bridge_info); true @@ -223,45 +240,6 @@ impl WebRtcEngine { } } - /// Send transcoded audio from the SIP side to the browser. - /// Called by the RTP relay when it receives a packet from the provider. - pub async fn forward_sip_to_browser( - &self, - session_id: &str, - sip_rtp_payload: &[u8], - sip_pt: u8, - ) -> Result<(), String> { - let session = self - .sessions - .get(session_id) - .ok_or_else(|| format!("session {session_id} not found"))?; - - // Transcode SIP codec → Opus. - // We create a temporary TranscodeState per packet for simplicity. - // TODO: Use a per-session persistent state for proper codec continuity. - let mut transcoder = TranscodeState::new().map_err(|e| format!("codec: {e}"))?; - let opus_payload = transcoder - .transcode(sip_rtp_payload, sip_pt, PT_OPUS, Some("to_browser")) - .map_err(|e| format!("transcode: {e}"))?; - - if opus_payload.is_empty() { - return Ok(()); - } - - // Build RTP header for Opus. - // TODO: Track seq/ts/ssrc per session for proper continuity. - let header = build_rtp_header(PT_OPUS, 0, 0, 0); - let mut packet = header.to_vec(); - packet.extend_from_slice(&opus_payload); - - session - .local_track - .write(&packet) - .await - .map(|_| ()) - .map_err(|e| format!("write: {e}")) - } - pub async fn add_ice_candidate( &self, session_id: &str, @@ -365,9 +343,9 @@ async fn browser_to_sip_loop( to_sip_seq = to_sip_seq.wrapping_add(1); to_sip_ts = to_sip_ts.wrapping_add(rtp_clock_increment(bridge_info.sip_pt)); - // Send to provider. + // Send to provider via the RTP socket (correct source port matching our SDP). let _ = bridge_info - .sip_socket + .rtp_socket .send_to(&sip_rtp, bridge_info.provider_media) .await; @@ -387,3 +365,86 @@ async fn browser_to_sip_loop( } } } + +/// SIP → Browser audio forwarding loop. +/// Reads RTP from the provider (via the allocated RTP socket), transcodes to Opus, +/// and writes to the WebRTC local track for delivery to the browser. +async fn sip_to_browser_loop( + rtp_socket: Arc, + local_track: Arc, + sip_pt: u8, + out_tx: OutTx, + session_id: String, +) { + let mut transcoder = match TranscodeState::new() { + Ok(t) => t, + Err(e) => { + emit_event( + &out_tx, + "webrtc_error", + serde_json::json!({ + "session_id": session_id, + "error": format!("sip_to_browser codec init: {e}"), + }), + ); + return; + } + }; + + let mut buf = vec![0u8; 1500]; + let mut count = 0u64; + let mut seq: u16 = 0; + let mut ts: u32 = 0; + let ssrc: u32 = rand::random(); + + loop { + match rtp_socket.recv_from(&mut buf).await { + Ok((n, _from)) => { + if n < 12 { + continue; // Too small for RTP header. + } + count += 1; + + // Extract payload (skip 12-byte RTP header). + let payload = &buf[12..n]; + if payload.is_empty() { + continue; + } + + // Transcode SIP codec → Opus. + let opus_payload = match transcoder.transcode( + payload, + sip_pt, + PT_OPUS, + Some("sip_to_browser"), + ) { + Ok(p) if !p.is_empty() => p, + _ => continue, + }; + + // Build Opus RTP packet. + let header = build_rtp_header(PT_OPUS, seq, ts, ssrc); + let mut packet = header.to_vec(); + packet.extend_from_slice(&opus_payload); + + seq = seq.wrapping_add(1); + ts = ts.wrapping_add(960); // Opus: 48000 Hz × 20ms = 960 samples + + let _ = local_track.write(&packet).await; + + if count == 1 || count == 50 || count % 500 == 0 { + emit_event( + &out_tx, + "webrtc_audio_rx", + serde_json::json!({ + "session_id": session_id, + "direction": "sip_to_browser", + "packet_count": count, + }), + ); + } + } + Err(_) => break, // Socket closed. + } + } +} diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index cf6150f..dc3db2a 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.12.0', + version: '1.13.0', description: 'undefined' } diff --git a/ts/frontend.ts b/ts/frontend.ts index ea5cf45..e4bf19b 100644 --- a/ts/frontend.ts +++ b/ts/frontend.ts @@ -339,11 +339,13 @@ export function initWebUi( onHangupCall: (callId: string) => boolean, onConfigSaved?: () => void, callManager?: CallManager, + voiceboxManager?: VoiceboxManager, /** WebRTC signaling handlers — forwarded to Rust proxy-engine. */ onWebRtcOffer?: (sessionId: string, sdp: string, ws: WebSocket) => Promise, onWebRtcIce?: (sessionId: string, candidate: any) => Promise, onWebRtcClose?: (sessionId: string) => Promise, - voiceboxManager?: VoiceboxManager, + /** Called when browser sends webrtc-accept (callId + sessionId linking). */ + onWebRtcAccept?: (callId: string, sessionId: string) => void, ): void { const WEB_PORT = 3060; @@ -382,6 +384,7 @@ export function initWebUi( if (msg.type === 'webrtc-offer' && msg.sessionId) { // Forward to Rust proxy-engine for WebRTC handling. if (onWebRtcOffer) { + log(`[webrtc-ws] offer msg keys: ${Object.keys(msg).join(',')}, sdp type: ${typeof msg.sdp}, sdp len: ${msg.sdp?.length || 0}`); onWebRtcOffer(msg.sessionId, msg.sdp, socket as any).catch((e: any) => log(`[webrtc] offer error: ${e.message}`)); } @@ -394,8 +397,10 @@ export function initWebUi( onWebRtcClose(msg.sessionId).catch(() => {}); } } else if (msg.type === 'webrtc-accept' && msg.callId) { - // TODO: Wire to Rust call linking. log(`[webrtc] accept: call=${msg.callId} session=${msg.sessionId || 'none'}`); + if (onWebRtcAccept && msg.sessionId) { + onWebRtcAccept(msg.callId, msg.sessionId); + } } else if (msg.type?.startsWith('webrtc-')) { msg._remoteIp = remoteIp; handleWebRtcSignaling(socket as any, msg); diff --git a/ts/sipproxy.ts b/ts/sipproxy.ts index 4bf7070..16bfc0b 100644 --- a/ts/sipproxy.ts +++ b/ts/sipproxy.ts @@ -37,6 +37,7 @@ import { shutdownProxyEngine, webrtcOffer, webrtcIce, + webrtcLink, webrtcClose, } from './proxybridge.ts'; import type { @@ -118,6 +119,12 @@ const activeCalls = new Map(); const callHistory: ICallHistoryEntry[] = []; const MAX_HISTORY = 100; +// WebRTC session ↔ call linking state. +// Both pieces (session accept + call media info) can arrive in any order. +const webrtcSessionToCall = new Map(); // sessionId → callId +const webrtcCallToSession = new Map(); // callId → sessionId +const pendingCallMedia = new Map(); // callId → provider media info + // Initialize provider statuses from config (all start as unregistered). for (const p of appConfig.providers) { providerStatuses.set(p.id, { @@ -271,6 +278,17 @@ async function startProxyEngine(): Promise { state: 'setting-up', startedAt: Date.now(), }); + + // Notify all browser devices — they can connect via WebRTC to listen/talk. + const browserIds = getAllBrowserDeviceIds(); + for (const bid of browserIds) { + sendToBrowserDevice(bid, { + type: 'webrtc-incoming', + callId: data.call_id, + from: data.number, + deviceId: bid, + }); + } }); onProxyEvent('call_ringing', (data: { call_id: string }) => { @@ -278,12 +296,33 @@ async function startProxyEngine(): Promise { if (call) call.state = 'ringing'; }); - onProxyEvent('call_answered', (data: { call_id: string }) => { + onProxyEvent('call_answered', (data: { call_id: string; provider_media_addr?: string; provider_media_port?: number; sip_pt?: number }) => { const call = activeCalls.get(data.call_id); if (call) { call.state = 'connected'; log(`[call] ${data.call_id} connected`); } + + // Try to link WebRTC session to this call for audio bridging. + if (data.provider_media_addr && data.provider_media_port) { + const sessionId = webrtcCallToSession.get(data.call_id); + if (sessionId) { + // Both session and media info available — link now. + const sipPt = data.sip_pt ?? 9; + log(`[webrtc] linking session=${sessionId.slice(0, 8)} to call=${data.call_id} media=${data.provider_media_addr}:${data.provider_media_port} pt=${sipPt}`); + webrtcLink(sessionId, data.call_id, data.provider_media_addr, data.provider_media_port, sipPt).then((ok) => { + log(`[webrtc] link result: ${ok}`); + }); + } else { + // Session not yet accepted — store media info for when it arrives. + pendingCallMedia.set(data.call_id, { + addr: data.provider_media_addr, + port: data.provider_media_port, + sipPt: data.sip_pt ?? 9, + }); + log(`[webrtc] media info cached for call=${data.call_id}, waiting for session accept`); + } + } }); onProxyEvent('call_ended', (data: ICallEndedEvent) => { @@ -301,6 +340,18 @@ async function startProxyEngine(): Promise { }); if (callHistory.length > MAX_HISTORY) callHistory.pop(); activeCalls.delete(data.call_id); + + // Notify browser(s) that the call ended. + broadcastWs('webrtc-call-ended', { callId: data.call_id }); + + // Clean up WebRTC session mappings. + const sessionId = webrtcCallToSession.get(data.call_id); + if (sessionId) { + webrtcCallToSession.delete(data.call_id); + webrtcSessionToCall.delete(sessionId); + webrtcClose(sessionId).catch(() => {}); + } + pendingCallMedia.delete(data.call_id); } }); @@ -467,14 +518,22 @@ initWebUi( } }, undefined, // callManager — legacy, replaced by Rust proxy-engine - voiceboxManager, + voiceboxManager, // voiceboxManager // WebRTC signaling → forwarded to Rust proxy-engine. async (sessionId, sdp, ws) => { - log(`[webrtc] offer from browser session=${sessionId.slice(0, 8)}`); + log(`[webrtc] offer from browser session=${sessionId.slice(0, 8)} sdp_type=${typeof sdp} sdp_len=${sdp?.length || 0}`); + if (!sdp || typeof sdp !== 'string' || sdp.length < 10) { + log(`[webrtc] WARNING: invalid SDP (type=${typeof sdp}), skipping offer`); + return; + } + log(`[webrtc] sending offer to Rust (${sdp.length}b)...`); const result = await webrtcOffer(sessionId, sdp); + log(`[webrtc] Rust result: ${JSON.stringify(result)?.slice(0, 200)}`); if (result?.sdp) { ws.send(JSON.stringify({ type: 'webrtc-answer', sessionId, sdp: result.sdp })); log(`[webrtc] answer sent to browser session=${sessionId.slice(0, 8)}`); + } else { + log(`[webrtc] ERROR: no answer SDP from Rust`); } }, async (sessionId, candidate) => { @@ -483,6 +542,26 @@ initWebUi( async (sessionId) => { await webrtcClose(sessionId); }, + // onWebRtcAccept — browser has accepted a call, linking session to call. + (callId: string, sessionId: string) => { + log(`[webrtc] accept: callId=${callId} sessionId=${sessionId.slice(0, 8)}`); + + // Store bidirectional mapping. + webrtcSessionToCall.set(sessionId, callId); + webrtcCallToSession.set(callId, sessionId); + + // Check if we already have media info for this call (provider answered first). + const media = pendingCallMedia.get(callId); + if (media) { + pendingCallMedia.delete(callId); + log(`[webrtc] linking session=${sessionId.slice(0, 8)} to call=${callId} media=${media.addr}:${media.port} pt=${media.sipPt}`); + webrtcLink(sessionId, callId, media.addr, media.port, media.sipPt).then((ok) => { + log(`[webrtc] link result: ${ok}`); + }); + } else { + log(`[webrtc] session ${sessionId.slice(0, 8)} accepted, waiting for call_answered media info`); + } + }, ); // --------------------------------------------------------------------------- diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index cf6150f..dc3db2a 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.12.0', + version: '1.13.0', description: 'undefined' }