feat(proxy-engine,webrtc): add B2BUA SIP leg handling and WebRTC call bridging for outbound calls

This commit is contained in:
2026-04-10 12:19:20 +00:00
parent 82f2742db5
commit 9e5aa35fee
9 changed files with 869 additions and 144 deletions

View File

@@ -1,5 +1,14 @@
# Changelog
## 2026-04-10 - 1.13.0 - feat(proxy-engine,webrtc)
add B2BUA SIP leg handling and WebRTC call bridging for outbound calls
- introduce a new SipLeg module to manage outbound provider dialogs, including INVITE lifecycle, digest auth retries, ACK handling, media endpoint tracking, and termination
- store outbound dashboard calls as B2BUA calls in the call manager and emit provider media details on call_answered for bridge setup
- separate SIP and WebRTC engine locking to avoid contention and deadlocks while linking sessions to call RTP sockets
- add bidirectional RTP bridging between provider SIP media and browser WebRTC audio using the allocated RTP socket
- wire browser webrtc-accept events in the frontend and sipproxy so session-to-call linking can occur when media and acceptance arrive in either order
## 2026-04-10 - 1.12.0 - feat(proxy-engine)
add Rust-based outbound calling, WebRTC bridging, and voicemail handling

View File

@@ -15,6 +15,7 @@ use crate::dtmf::DtmfDetector;
use crate::ipc::{emit_event, OutTx};
use crate::registrar::Registrar;
use crate::rtp::RtpPortPool;
use crate::sip_leg::{LegState, SipLeg, SipLegAction, SipLegConfig};
use sip_proto::helpers::parse_sdp_endpoint;
use sip_proto::message::SipMessage;
use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri};
@@ -24,9 +25,23 @@ use std::sync::Arc;
use std::time::Instant;
use tokio::net::UdpSocket;
/// A B2BUA call with a SipLeg for the provider side.
/// The other side is either a WebRTC session or another SipLeg.
pub struct B2buaCall {
pub id: String,
pub provider_leg: SipLeg,
pub webrtc_session_id: Option<String>,
pub number: String,
pub created_at: std::time::Instant,
/// RTP socket allocated for the provider leg (used for WebRTC audio bridging).
pub rtp_socket: Option<Arc<UdpSocket>>,
}
pub struct CallManager {
/// Active passthrough calls, keyed by SIP Call-ID.
calls: HashMap<String, PassthroughCall>,
/// Active B2BUA calls, keyed by SIP Call-ID of the provider leg.
b2bua_calls: HashMap<String, B2buaCall>,
/// Call ID counter.
next_call_num: u64,
/// Output channel for events.
@@ -37,6 +52,7 @@ impl CallManager {
pub fn new(out_tx: OutTx) -> Self {
Self {
calls: HashMap::new(),
b2bua_calls: HashMap::new(),
next_call_num: 0,
out_tx,
}
@@ -68,7 +84,12 @@ impl CallManager {
) -> bool {
let sip_call_id = msg.call_id().to_string();
// Check if this Call-ID belongs to an active call.
// Check B2BUA calls first (provider legs with dialog management).
if self.b2bua_calls.contains_key(&sip_call_id) {
return self.route_b2bua_message(&sip_call_id, msg, from_addr, socket).await;
}
// Check passthrough calls.
if !self.calls.contains_key(&sip_call_id) {
return false;
}
@@ -494,14 +515,89 @@ impl CallManager {
/// Check if a SIP Call-ID belongs to any active call.
pub fn has_call(&self, sip_call_id: &str) -> bool {
self.calls.contains_key(sip_call_id)
self.calls.contains_key(sip_call_id) || self.b2bua_calls.contains_key(sip_call_id)
}
// --- Dashboard outbound call (B2BUA) ---
/// Get the RTP socket for a B2BUA call (by our internal call ID).
/// Used by webrtc_link to set up the audio bridge.
pub fn get_b2bua_rtp_socket(&self, call_id: &str) -> Option<Arc<UdpSocket>> {
for b2bua in self.b2bua_calls.values() {
if b2bua.id == call_id {
return b2bua.rtp_socket.clone();
}
}
None
}
/// Initiate an outbound call from the dashboard.
/// Builds an INVITE from scratch and sends it to the provider.
/// The browser connects separately via WebRTC and gets linked to this call.
// --- B2BUA outbound call ---
/// Route a SIP message to a B2BUA call's provider leg.
async fn route_b2bua_message(
&mut self,
sip_call_id: &str,
msg: &SipMessage,
from_addr: SocketAddr,
socket: &UdpSocket,
) -> bool {
let b2bua = match self.b2bua_calls.get_mut(sip_call_id) {
Some(c) => c,
None => return false,
};
let call_id = b2bua.id.clone();
let action = b2bua.provider_leg.handle_message(msg);
match action {
SipLegAction::None => {}
SipLegAction::Send(buf) => {
let _ = socket.send_to(&buf, b2bua.provider_leg.config.sip_target).await;
}
SipLegAction::StateChange(LegState::Ringing) => {
emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
}
SipLegAction::ConnectedWithAck(ack_buf) => {
let _ = socket.send_to(&ack_buf, b2bua.provider_leg.config.sip_target).await;
let remote = b2bua.provider_leg.remote_media;
let sip_pt = b2bua.provider_leg.config.codecs.first().copied().unwrap_or(9);
emit_event(&self.out_tx, "call_answered", serde_json::json!({
"call_id": call_id,
"provider_media_addr": remote.map(|a| a.ip().to_string()),
"provider_media_port": remote.map(|a| a.port()),
"sip_pt": sip_pt,
}));
}
SipLegAction::Terminated(reason) => {
let duration = b2bua.created_at.elapsed().as_secs();
emit_event(&self.out_tx, "call_ended", serde_json::json!({
"call_id": call_id, "reason": reason, "duration": duration,
}));
self.b2bua_calls.remove(sip_call_id);
return true;
}
SipLegAction::SendAndTerminate(buf, reason) => {
let _ = socket.send_to(&buf, from_addr).await;
let duration = b2bua.created_at.elapsed().as_secs();
emit_event(&self.out_tx, "call_ended", serde_json::json!({
"call_id": call_id, "reason": reason, "duration": duration,
}));
self.b2bua_calls.remove(sip_call_id);
return true;
}
SipLegAction::AuthRetry { ack_407, invite_with_auth } => {
let target = b2bua.provider_leg.config.sip_target;
if let Some(ack) = ack_407 {
let _ = socket.send_to(&ack, target).await;
}
let _ = socket.send_to(&invite_with_auth, target).await;
}
_ => {}
}
true
}
/// Initiate an outbound call from the dashboard using B2BUA mode.
/// Creates a SipLeg for the provider side with proper dialog + auth handling.
pub async fn make_outbound_call(
&mut self,
number: &str,
@@ -515,7 +611,6 @@ impl CallManager {
let call_id = self.next_call_id();
let lan_ip = &config.proxy.lan_ip;
let lan_port = config.proxy.lan_port;
let pub_ip = public_ip.unwrap_or(lan_ip.as_str());
let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() {
Some(a) => a,
@@ -528,70 +623,38 @@ impl CallManager {
None => return None,
};
// Build the SIP Call-ID for this new dialog.
// Build the SIP Call-ID for the provider dialog.
let sip_call_id = sip_proto::helpers::generate_call_id(None);
// Build SDP offer.
let sdp = sip_proto::helpers::build_sdp(&sip_proto::helpers::SdpOptions {
ip: pub_ip,
port: rtp_alloc.port,
payload_types: &provider_config.codecs,
..Default::default()
});
// Build INVITE.
let to_uri = format!("sip:{number}@{}", provider_config.domain);
let invite = SipMessage::create_request(
"INVITE",
&to_uri,
sip_proto::message::RequestOptions {
via_host: pub_ip.to_string(),
via_port: lan_port,
via_transport: None,
via_branch: Some(sip_proto::helpers::generate_branch()),
from_uri: registered_aor.to_string(),
from_display_name: None,
from_tag: Some(sip_proto::helpers::generate_tag()),
to_uri: to_uri.clone(),
to_display_name: None,
to_tag: None,
call_id: Some(sip_call_id.clone()),
cseq: Some(1),
contact: Some(format!("<sip:{pub_ip}:{lan_port}>")),
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
("Allow".to_string(), "INVITE, ACK, OPTIONS, CANCEL, BYE, INFO".to_string()),
]),
},
);
// Send INVITE to provider.
let _ = socket.send_to(&invite.serialize(), provider_dest).await;
// Create call entry — device_addr is a dummy (WebRTC will be linked later).
let dummy_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let call = PassthroughCall {
id: call_id.clone(),
sip_call_id: sip_call_id.clone(),
state: CallState::SettingUp,
direction: CallDirection::Outbound,
created_at: Instant::now(),
caller_number: Some(registered_aor.to_string()),
callee_number: Some(number.to_string()),
provider_id: provider_config.id.clone(),
provider_addr: provider_dest,
provider_media: None,
device_addr: dummy_addr,
device_media: None,
// Create a SipLeg with provider credentials for auth handling.
let leg_config = SipLegConfig {
lan_ip: lan_ip.clone(),
lan_port,
public_ip: public_ip.map(|s| s.to_string()),
sip_target: provider_dest,
username: Some(provider_config.username.clone()),
password: Some(provider_config.password.clone()),
registered_aor: Some(registered_aor.to_string()),
codecs: provider_config.codecs.clone(),
rtp_port: rtp_alloc.port,
rtp_socket: rtp_alloc.socket.clone(),
pkt_from_device: 0,
pkt_from_provider: 0,
};
self.calls.insert(sip_call_id, call);
let mut leg = SipLeg::new(format!("{call_id}-prov"), leg_config);
// Send the INVITE.
let to_uri = format!("sip:{number}@{}", provider_config.domain);
leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await;
// Store as B2BUA call.
let b2bua = B2buaCall {
id: call_id.clone(),
provider_leg: leg,
webrtc_session_id: None,
number: number.to_string(),
created_at: std::time::Instant::now(),
rtp_socket: Some(rtp_alloc.socket.clone()),
};
self.b2bua_calls.insert(sip_call_id, b2bua);
Some(call_id)
}

View File

@@ -16,6 +16,7 @@ mod provider;
mod recorder;
mod registrar;
mod rtp;
mod sip_leg;
mod sip_transport;
mod voicemail;
mod webrtc_engine;
@@ -35,14 +36,15 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, Mutex};
/// Shared mutable state for the proxy engine.
/// Shared mutable state for the proxy engine (SIP side).
/// WebRTC is intentionally kept in a separate lock to avoid contention
/// between SIP packet handlers and WebRTC command handlers.
struct ProxyEngine {
config: Option<AppConfig>,
transport: Option<SipTransport>,
provider_mgr: ProviderManager,
registrar: Registrar,
call_mgr: CallManager,
webrtc: WebRtcEngine,
rtp_pool: Option<RtpPortPool>,
out_tx: OutTx,
}
@@ -55,7 +57,6 @@ impl ProxyEngine {
provider_mgr: ProviderManager::new(out_tx.clone()),
registrar: Registrar::new(out_tx.clone()),
call_mgr: CallManager::new(out_tx.clone()),
webrtc: WebRtcEngine::new(out_tx.clone()),
rtp_pool: None,
out_tx,
}
@@ -83,9 +84,12 @@ async fn main() {
// Emit ready event.
emit_event(&out_tx, "ready", serde_json::json!({}));
// Shared engine state.
// Shared engine state (SIP side).
let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone())));
// WebRTC engine — separate lock to avoid deadlock with SIP handlers.
let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
// Read commands from stdin.
let stdin = tokio::io::stdin();
let reader = BufReader::new(stdin);
@@ -105,25 +109,34 @@ async fn main() {
};
let engine = engine.clone();
let webrtc = webrtc.clone();
let out_tx = out_tx.clone();
// Handle commands — some are async, so we spawn.
tokio::spawn(async move {
handle_command(engine, &out_tx, cmd).await;
handle_command(engine, webrtc, &out_tx, cmd).await;
});
}
}
async fn handle_command(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: Command) {
async fn handle_command(
engine: Arc<Mutex<ProxyEngine>>,
webrtc: Arc<Mutex<WebRtcEngine>>,
out_tx: &OutTx,
cmd: Command,
) {
match cmd.method.as_str() {
// SIP commands — lock engine only.
"configure" => handle_configure(engine, out_tx, &cmd).await,
"hangup" => handle_hangup(engine, out_tx, &cmd).await,
"make_call" => handle_make_call(engine, out_tx, &cmd).await,
"get_status" => handle_get_status(engine, out_tx, &cmd).await,
"webrtc_offer" => handle_webrtc_offer(engine, out_tx, &cmd).await,
"webrtc_ice" => handle_webrtc_ice(engine, out_tx, &cmd).await,
"webrtc_link" => handle_webrtc_link(engine, out_tx, &cmd).await,
"webrtc_close" => handle_webrtc_close(engine, out_tx, &cmd).await,
// WebRTC commands — lock webrtc only (no engine contention).
"webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, &cmd).await,
"webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, &cmd).await,
"webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await,
// webrtc_link needs both: engine (for RTP socket) and webrtc (for session).
"webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await,
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
}
}
@@ -524,7 +537,8 @@ async fn handle_hangup(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Co
}
/// Handle `webrtc_offer` — browser sends SDP offer, we create PeerConnection and return answer.
async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
/// Uses only the WebRTC lock — no contention with SIP handlers.
async fn handle_webrtc_offer(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
@@ -534,8 +548,8 @@ async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
None => { respond_err(out_tx, &cmd.id, "missing sdp"); return; }
};
let mut eng = engine.lock().await;
match eng.webrtc.handle_offer(&session_id, &offer_sdp).await {
let mut wrtc = webrtc.lock().await;
match wrtc.handle_offer(&session_id, &offer_sdp).await {
Ok(answer_sdp) => {
respond_ok(out_tx, &cmd.id, serde_json::json!({
"session_id": session_id,
@@ -547,7 +561,8 @@ async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
}
/// Handle `webrtc_ice` — forward ICE candidate from browser to Rust PeerConnection.
async fn handle_webrtc_ice(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
/// Uses only the WebRTC lock.
async fn handle_webrtc_ice(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
@@ -556,15 +571,22 @@ async fn handle_webrtc_ice(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd:
let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str());
let sdp_mline_index = cmd.params.get("sdp_mline_index").and_then(|v| v.as_u64()).map(|v| v as u16);
let eng = engine.lock().await;
match eng.webrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await {
let wrtc = webrtc.lock().await;
match wrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await {
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
Err(e) => respond_err(out_tx, &cmd.id, &e),
}
}
/// Handle `webrtc_link` — link a WebRTC session to a SIP call for audio bridging.
async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
/// Briefly locks engine to get the RTP socket, then locks webrtc to set up the bridge.
/// Locks are never held simultaneously — no deadlock possible.
async fn handle_webrtc_link(
engine: Arc<Mutex<ProxyEngine>>,
webrtc: Arc<Mutex<WebRtcEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
@@ -588,19 +610,29 @@ async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd
Err(e) => { respond_err(out_tx, &cmd.id, &format!("bad address: {e}")); return; }
};
let mut eng = engine.lock().await;
let sip_socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
// Briefly lock engine to get the B2BUA call's RTP socket.
let rtp_socket = {
let eng = engine.lock().await;
eng.call_mgr.get_b2bua_rtp_socket(&call_id)
}; // engine lock released here
let rtp_socket = match rtp_socket {
Some(s) => s,
None => {
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found or no RTP socket"));
return;
}
};
let bridge_info = crate::webrtc_engine::SipBridgeInfo {
provider_media,
sip_pt,
sip_socket,
rtp_socket,
};
if eng.webrtc.link_to_sip(&session_id, &call_id, bridge_info).await {
// Lock webrtc to set up the audio bridge.
let mut wrtc = webrtc.lock().await;
if wrtc.link_to_sip(&session_id, &call_id, bridge_info).await {
respond_ok(out_tx, &cmd.id, serde_json::json!({
"session_id": session_id,
"call_id": call_id,
@@ -612,14 +644,15 @@ async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd
}
/// Handle `webrtc_close` — close a WebRTC session.
async fn handle_webrtc_close(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
/// Uses only the WebRTC lock.
async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
};
let mut eng = engine.lock().await;
match eng.webrtc.close_session(&session_id).await {
let mut wrtc = webrtc.lock().await;
match wrtc.close_session(&session_id).await {
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
Err(e) => respond_err(out_tx, &cmd.id, &e),
}

View File

@@ -0,0 +1,475 @@
//! SipLeg — manages one side of a B2BUA call.
//!
//! Handles the full INVITE lifecycle:
//! - Send INVITE with SDP
//! - Handle 407 Proxy Authentication (digest auth retry)
//! - Handle 200 OK (ACK, learn media endpoint)
//! - Handle BYE/CANCEL (teardown)
//! - Track SIP dialog state (early → confirmed → terminated)
//!
//! Ported from ts/call/sip-leg.ts.
use sip_proto::dialog::{DialogState, SipDialog};
use sip_proto::helpers::{
build_sdp, compute_digest_auth, generate_branch, generate_tag, parse_digest_challenge,
parse_sdp_endpoint, SdpOptions,
};
use sip_proto::message::{RequestOptions, SipMessage};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
/// State of a SIP leg.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegState {
Inviting,
Ringing,
Connected,
Terminating,
Terminated,
}
/// Configuration for creating a SIP leg.
pub struct SipLegConfig {
/// Proxy LAN IP (for Via, Contact, SDP).
pub lan_ip: String,
/// Proxy LAN port.
pub lan_port: u16,
/// Public IP (for provider-facing legs).
pub public_ip: Option<String>,
/// SIP target endpoint (provider outbound proxy or device address).
pub sip_target: SocketAddr,
/// Provider credentials (for 407 auth).
pub username: Option<String>,
pub password: Option<String>,
pub registered_aor: Option<String>,
/// Codec payload types to offer.
pub codecs: Vec<u8>,
/// Our RTP port for SDP.
pub rtp_port: u16,
}
/// A SIP leg with full dialog management.
pub struct SipLeg {
pub id: String,
pub state: LegState,
pub config: SipLegConfig,
pub dialog: Option<SipDialog>,
/// The INVITE we sent (needed for CANCEL and 407 ACK).
invite: Option<SipMessage>,
/// Original unauthenticated INVITE (for re-ACKing retransmitted 407s).
orig_invite: Option<SipMessage>,
/// Whether we've attempted digest auth.
auth_attempted: bool,
/// Remote media endpoint (learned from SDP in 200 OK).
pub remote_media: Option<SocketAddr>,
}
impl SipLeg {
pub fn new(id: String, config: SipLegConfig) -> Self {
Self {
id,
state: LegState::Inviting,
config,
dialog: None,
invite: None,
orig_invite: None,
auth_attempted: false,
remote_media: None,
}
}
/// Build and send an INVITE to establish this leg.
pub async fn send_invite(
&mut self,
from_uri: &str,
to_uri: &str,
sip_call_id: &str,
socket: &UdpSocket,
) {
let ip = self
.config
.public_ip
.as_deref()
.unwrap_or(&self.config.lan_ip);
let sdp = build_sdp(&SdpOptions {
ip,
port: self.config.rtp_port,
payload_types: &self.config.codecs,
..Default::default()
});
let invite = SipMessage::create_request(
"INVITE",
to_uri,
RequestOptions {
via_host: ip.to_string(),
via_port: self.config.lan_port,
via_transport: None,
via_branch: Some(generate_branch()),
from_uri: from_uri.to_string(),
from_display_name: None,
from_tag: Some(generate_tag()),
to_uri: to_uri.to_string(),
to_display_name: None,
to_tag: None,
call_id: Some(sip_call_id.to_string()),
cseq: Some(1),
contact: Some(format!("<sip:{ip}:{}>", self.config.lan_port)),
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
]),
},
);
self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port));
self.invite = Some(invite.clone());
self.state = LegState::Inviting;
let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await;
}
/// Handle an incoming SIP message routed to this leg.
/// Returns an optional reply to send (e.g. ACK, auth retry INVITE).
pub fn handle_message(&mut self, msg: &SipMessage) -> SipLegAction {
if msg.is_response() {
self.handle_response(msg)
} else {
self.handle_request(msg)
}
}
fn handle_response(&mut self, msg: &SipMessage) -> SipLegAction {
let code = msg.status_code().unwrap_or(0);
let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase();
if cseq_method != "INVITE" {
return SipLegAction::None;
}
// Handle retransmitted 407 for the original unauthenticated INVITE.
if self.auth_attempted {
if let Some(dialog) = &self.dialog {
let response_cseq: u32 = msg
.get_header("CSeq")
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if response_cseq < dialog.local_cseq && code >= 400 {
// ACK the retransmitted error response.
if let Some(orig) = &self.orig_invite {
let ack = build_non_2xx_ack(orig, msg);
return SipLegAction::Send(ack.serialize());
}
return SipLegAction::None;
}
}
}
// Handle 407 Proxy Authentication Required.
if code == 407 {
return self.handle_auth_challenge(msg);
}
// Update dialog state.
if let Some(dialog) = &mut self.dialog {
dialog.process_response(msg);
}
if code == 180 || code == 183 {
self.state = LegState::Ringing;
SipLegAction::StateChange(LegState::Ringing)
} else if code >= 200 && code < 300 {
// ACK the 200 OK.
let ack_buf = if let Some(dialog) = &self.dialog {
let ack = dialog.create_ack();
Some(ack.serialize())
} else {
None
};
// If already connected (200 retransmit), just re-ACK.
if self.state == LegState::Connected {
return match ack_buf {
Some(buf) => SipLegAction::Send(buf),
None => SipLegAction::None,
};
}
// Learn media endpoint from SDP.
if msg.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
self.remote_media = Some(addr);
}
}
}
self.state = LegState::Connected;
match ack_buf {
Some(buf) => SipLegAction::ConnectedWithAck(buf),
None => SipLegAction::StateChange(LegState::Connected),
}
} else if code >= 300 {
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
SipLegAction::Terminated(format!("rejected_{code}"))
} else {
SipLegAction::None // 1xx provisional
}
}
fn handle_auth_challenge(&mut self, msg: &SipMessage) -> SipLegAction {
if self.auth_attempted {
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
return SipLegAction::Terminated("auth_rejected".to_string());
}
self.auth_attempted = true;
let challenge_header = match msg.get_header("Proxy-Authenticate") {
Some(h) => h,
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_challenge".to_string());
}
};
let challenge = match parse_digest_challenge(challenge_header) {
Some(c) => c,
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_bad_challenge".to_string());
}
};
let password = match &self.config.password {
Some(p) => p.clone(),
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_password".to_string());
}
};
let aor = match &self.config.registered_aor {
Some(a) => a.clone(),
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_aor".to_string());
}
};
let username = aor
.trim_start_matches("sip:")
.trim_start_matches("sips:")
.split('@')
.next()
.unwrap_or("")
.to_string();
let dest_uri = self
.invite
.as_ref()
.and_then(|i| i.request_uri())
.unwrap_or("")
.to_string();
let auth_value = compute_digest_auth(
&username,
&password,
&challenge.realm,
&challenge.nonce,
"INVITE",
&dest_uri,
challenge.algorithm.as_deref(),
challenge.opaque.as_deref(),
);
// ACK the 407.
let mut ack_buf = None;
if let Some(invite) = &self.invite {
let ack = build_non_2xx_ack(invite, msg);
ack_buf = Some(ack.serialize());
}
// Save original INVITE for retransmission handling.
self.orig_invite = self.invite.clone();
// Build authenticated INVITE with same From tag, CSeq=2.
let ip = self
.config
.public_ip
.as_deref()
.unwrap_or(&self.config.lan_ip);
let from_tag = self
.dialog
.as_ref()
.map(|d| d.local_tag.clone())
.unwrap_or_else(generate_tag);
let sdp = build_sdp(&SdpOptions {
ip,
port: self.config.rtp_port,
payload_types: &self.config.codecs,
..Default::default()
});
let call_id = self
.dialog
.as_ref()
.map(|d| d.call_id.clone())
.unwrap_or_default();
let invite_auth = SipMessage::create_request(
"INVITE",
&dest_uri,
RequestOptions {
via_host: ip.to_string(),
via_port: self.config.lan_port,
via_transport: None,
via_branch: Some(generate_branch()),
from_uri: aor,
from_display_name: None,
from_tag: Some(from_tag),
to_uri: dest_uri.clone(),
to_display_name: None,
to_tag: None,
call_id: Some(call_id),
cseq: Some(2),
contact: Some(format!("<sip:{ip}:{}>", self.config.lan_port)),
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("Proxy-Authorization".to_string(), auth_value),
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
]),
},
);
self.invite = Some(invite_auth.clone());
if let Some(dialog) = &mut self.dialog {
dialog.local_cseq = 2;
}
// Return both the ACK for the 407 and the new authenticated INVITE.
let invite_buf = invite_auth.serialize();
SipLegAction::AuthRetry {
ack_407: ack_buf,
invite_with_auth: invite_buf,
}
}
fn handle_request(&mut self, msg: &SipMessage) -> SipLegAction {
let method = msg.method().unwrap_or("");
if method == "BYE" {
let ok = SipMessage::create_response(200, "OK", msg, None);
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
return SipLegAction::SendAndTerminate(ok.serialize(), "bye".to_string());
}
if method == "INFO" {
let ok = SipMessage::create_response(200, "OK", msg, None);
return SipLegAction::Send(ok.serialize());
}
SipLegAction::None
}
/// Build a BYE or CANCEL to tear down this leg.
pub fn build_hangup(&mut self) -> Option<Vec<u8>> {
let dialog = self.dialog.as_mut()?;
let msg = if dialog.state == DialogState::Confirmed {
dialog.create_request("BYE", None, None, None)
} else if dialog.state == DialogState::Early {
if let Some(invite) = &self.invite {
dialog.create_cancel(invite)
} else {
return None;
}
} else {
return None;
};
self.state = LegState::Terminating;
dialog.terminate();
Some(msg.serialize())
}
/// Get the SIP Call-ID for routing.
pub fn sip_call_id(&self) -> Option<&str> {
self.dialog.as_ref().map(|d| d.call_id.as_str())
}
}
/// Actions produced by the SipLeg message handler.
pub enum SipLegAction {
/// No action needed.
None,
/// Send a SIP message (ACK, 200 OK to INFO, etc.).
Send(Vec<u8>),
/// Leg state changed.
StateChange(LegState),
/// Connected — send this ACK.
ConnectedWithAck(Vec<u8>),
/// Terminated with a reason.
Terminated(String),
/// Send 200 OK and terminate.
SendAndTerminate(Vec<u8>, String),
/// 407 auth retry — send ACK for 407, then send new INVITE with auth.
AuthRetry {
ack_407: Option<Vec<u8>>,
invite_with_auth: Vec<u8>,
},
}
/// Build an ACK for a non-2xx response (same transaction as the INVITE).
fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage {
let via = original_invite.get_header("Via").unwrap_or("").to_string();
let from = original_invite
.get_header("From")
.unwrap_or("")
.to_string();
let to = response.get_header("To").unwrap_or("").to_string();
let call_id = original_invite.call_id().to_string();
let cseq_num: u32 = original_invite
.get_header("CSeq")
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let ruri = original_invite
.request_uri()
.unwrap_or("sip:unknown")
.to_string();
SipMessage::new(
format!("ACK {ruri} SIP/2.0"),
vec![
("Via".to_string(), via),
("From".to_string(), from),
("To".to_string(), to),
("Call-ID".to_string(), call_id),
("CSeq".to_string(), format!("{cseq_num} ACK")),
("Max-Forwards".to_string(), "70".to_string()),
("Content-Length".to_string(), "0".to_string()),
],
String::new(),
)
}

View File

@@ -29,8 +29,10 @@ pub struct SipBridgeInfo {
pub provider_media: SocketAddr,
/// Provider's codec payload type (e.g. 9 for G.722).
pub sip_pt: u8,
/// The SIP UDP socket for sending RTP to the provider.
pub sip_socket: Arc<UdpSocket>,
/// The allocated RTP socket for bidirectional audio with the provider.
/// This is the socket whose port was advertised in SDP, so the provider
/// sends RTP here and expects RTP from this port.
pub rtp_socket: Arc<UdpSocket>,
}
/// A managed WebRTC session.
@@ -206,7 +208,10 @@ impl WebRtcEngine {
Ok(answer_sdp)
}
/// Link a WebRTC session to a SIP call — sets up the audio bridge.
/// Link a WebRTC session to a SIP call — sets up bidirectional audio bridge.
/// - Browser→SIP: already running via on_track handler, will start forwarding
/// once bridge info is set.
/// - SIP→Browser: spawned here, reads from the RTP socket and sends to browser.
pub async fn link_to_sip(
&mut self,
session_id: &str,
@@ -215,6 +220,18 @@ impl WebRtcEngine {
) -> bool {
if let Some(session) = self.sessions.get_mut(session_id) {
session.call_id = Some(call_id.to_string());
// Spawn SIP → browser audio loop (provider RTP → transcode → Opus → WebRTC track).
let local_track = session.local_track.clone();
let rtp_socket = bridge_info.rtp_socket.clone();
let sip_pt = bridge_info.sip_pt;
let out_tx = self.out_tx.clone();
let sid = session_id.to_string();
tokio::spawn(sip_to_browser_loop(
rtp_socket, local_track, sip_pt, out_tx, sid,
));
// Set bridge info — this unblocks the browser→SIP loop (already running).
let mut bridge = session.sip_bridge.lock().await;
*bridge = Some(bridge_info);
true
@@ -223,45 +240,6 @@ impl WebRtcEngine {
}
}
/// Send transcoded audio from the SIP side to the browser.
/// Called by the RTP relay when it receives a packet from the provider.
pub async fn forward_sip_to_browser(
&self,
session_id: &str,
sip_rtp_payload: &[u8],
sip_pt: u8,
) -> Result<(), String> {
let session = self
.sessions
.get(session_id)
.ok_or_else(|| format!("session {session_id} not found"))?;
// Transcode SIP codec → Opus.
// We create a temporary TranscodeState per packet for simplicity.
// TODO: Use a per-session persistent state for proper codec continuity.
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec: {e}"))?;
let opus_payload = transcoder
.transcode(sip_rtp_payload, sip_pt, PT_OPUS, Some("to_browser"))
.map_err(|e| format!("transcode: {e}"))?;
if opus_payload.is_empty() {
return Ok(());
}
// Build RTP header for Opus.
// TODO: Track seq/ts/ssrc per session for proper continuity.
let header = build_rtp_header(PT_OPUS, 0, 0, 0);
let mut packet = header.to_vec();
packet.extend_from_slice(&opus_payload);
session
.local_track
.write(&packet)
.await
.map(|_| ())
.map_err(|e| format!("write: {e}"))
}
pub async fn add_ice_candidate(
&self,
session_id: &str,
@@ -365,9 +343,9 @@ async fn browser_to_sip_loop(
to_sip_seq = to_sip_seq.wrapping_add(1);
to_sip_ts = to_sip_ts.wrapping_add(rtp_clock_increment(bridge_info.sip_pt));
// Send to provider.
// Send to provider via the RTP socket (correct source port matching our SDP).
let _ = bridge_info
.sip_socket
.rtp_socket
.send_to(&sip_rtp, bridge_info.provider_media)
.await;
@@ -387,3 +365,86 @@ async fn browser_to_sip_loop(
}
}
}
/// SIP → Browser audio forwarding loop.
/// Reads RTP from the provider (via the allocated RTP socket), transcodes to Opus,
/// and writes to the WebRTC local track for delivery to the browser.
async fn sip_to_browser_loop(
rtp_socket: Arc<UdpSocket>,
local_track: Arc<TrackLocalStaticRTP>,
sip_pt: u8,
out_tx: OutTx,
session_id: String,
) {
let mut transcoder = match TranscodeState::new() {
Ok(t) => t,
Err(e) => {
emit_event(
&out_tx,
"webrtc_error",
serde_json::json!({
"session_id": session_id,
"error": format!("sip_to_browser codec init: {e}"),
}),
);
return;
}
};
let mut buf = vec![0u8; 1500];
let mut count = 0u64;
let mut seq: u16 = 0;
let mut ts: u32 = 0;
let ssrc: u32 = rand::random();
loop {
match rtp_socket.recv_from(&mut buf).await {
Ok((n, _from)) => {
if n < 12 {
continue; // Too small for RTP header.
}
count += 1;
// Extract payload (skip 12-byte RTP header).
let payload = &buf[12..n];
if payload.is_empty() {
continue;
}
// Transcode SIP codec → Opus.
let opus_payload = match transcoder.transcode(
payload,
sip_pt,
PT_OPUS,
Some("sip_to_browser"),
) {
Ok(p) if !p.is_empty() => p,
_ => continue,
};
// Build Opus RTP packet.
let header = build_rtp_header(PT_OPUS, seq, ts, ssrc);
let mut packet = header.to_vec();
packet.extend_from_slice(&opus_payload);
seq = seq.wrapping_add(1);
ts = ts.wrapping_add(960); // Opus: 48000 Hz × 20ms = 960 samples
let _ = local_track.write(&packet).await;
if count == 1 || count == 50 || count % 500 == 0 {
emit_event(
&out_tx,
"webrtc_audio_rx",
serde_json::json!({
"session_id": session_id,
"direction": "sip_to_browser",
"packet_count": count,
}),
);
}
}
Err(_) => break, // Socket closed.
}
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.12.0',
version: '1.13.0',
description: 'undefined'
}

View File

@@ -339,11 +339,13 @@ export function initWebUi(
onHangupCall: (callId: string) => boolean,
onConfigSaved?: () => void,
callManager?: CallManager,
voiceboxManager?: VoiceboxManager,
/** WebRTC signaling handlers — forwarded to Rust proxy-engine. */
onWebRtcOffer?: (sessionId: string, sdp: string, ws: WebSocket) => Promise<void>,
onWebRtcIce?: (sessionId: string, candidate: any) => Promise<void>,
onWebRtcClose?: (sessionId: string) => Promise<void>,
voiceboxManager?: VoiceboxManager,
/** Called when browser sends webrtc-accept (callId + sessionId linking). */
onWebRtcAccept?: (callId: string, sessionId: string) => void,
): void {
const WEB_PORT = 3060;
@@ -382,6 +384,7 @@ export function initWebUi(
if (msg.type === 'webrtc-offer' && msg.sessionId) {
// Forward to Rust proxy-engine for WebRTC handling.
if (onWebRtcOffer) {
log(`[webrtc-ws] offer msg keys: ${Object.keys(msg).join(',')}, sdp type: ${typeof msg.sdp}, sdp len: ${msg.sdp?.length || 0}`);
onWebRtcOffer(msg.sessionId, msg.sdp, socket as any).catch((e: any) =>
log(`[webrtc] offer error: ${e.message}`));
}
@@ -394,8 +397,10 @@ export function initWebUi(
onWebRtcClose(msg.sessionId).catch(() => {});
}
} else if (msg.type === 'webrtc-accept' && msg.callId) {
// TODO: Wire to Rust call linking.
log(`[webrtc] accept: call=${msg.callId} session=${msg.sessionId || 'none'}`);
if (onWebRtcAccept && msg.sessionId) {
onWebRtcAccept(msg.callId, msg.sessionId);
}
} else if (msg.type?.startsWith('webrtc-')) {
msg._remoteIp = remoteIp;
handleWebRtcSignaling(socket as any, msg);

View File

@@ -37,6 +37,7 @@ import {
shutdownProxyEngine,
webrtcOffer,
webrtcIce,
webrtcLink,
webrtcClose,
} from './proxybridge.ts';
import type {
@@ -118,6 +119,12 @@ const activeCalls = new Map<string, IActiveCall>();
const callHistory: ICallHistoryEntry[] = [];
const MAX_HISTORY = 100;
// WebRTC session ↔ call linking state.
// Both pieces (session accept + call media info) can arrive in any order.
const webrtcSessionToCall = new Map<string, string>(); // sessionId → callId
const webrtcCallToSession = new Map<string, string>(); // callId → sessionId
const pendingCallMedia = new Map<string, { addr: string; port: number; sipPt: number }>(); // callId → provider media info
// Initialize provider statuses from config (all start as unregistered).
for (const p of appConfig.providers) {
providerStatuses.set(p.id, {
@@ -271,6 +278,17 @@ async function startProxyEngine(): Promise<void> {
state: 'setting-up',
startedAt: Date.now(),
});
// Notify all browser devices — they can connect via WebRTC to listen/talk.
const browserIds = getAllBrowserDeviceIds();
for (const bid of browserIds) {
sendToBrowserDevice(bid, {
type: 'webrtc-incoming',
callId: data.call_id,
from: data.number,
deviceId: bid,
});
}
});
onProxyEvent('call_ringing', (data: { call_id: string }) => {
@@ -278,12 +296,33 @@ async function startProxyEngine(): Promise<void> {
if (call) call.state = 'ringing';
});
onProxyEvent('call_answered', (data: { call_id: string }) => {
onProxyEvent('call_answered', (data: { call_id: string; provider_media_addr?: string; provider_media_port?: number; sip_pt?: number }) => {
const call = activeCalls.get(data.call_id);
if (call) {
call.state = 'connected';
log(`[call] ${data.call_id} connected`);
}
// Try to link WebRTC session to this call for audio bridging.
if (data.provider_media_addr && data.provider_media_port) {
const sessionId = webrtcCallToSession.get(data.call_id);
if (sessionId) {
// Both session and media info available — link now.
const sipPt = data.sip_pt ?? 9;
log(`[webrtc] linking session=${sessionId.slice(0, 8)} to call=${data.call_id} media=${data.provider_media_addr}:${data.provider_media_port} pt=${sipPt}`);
webrtcLink(sessionId, data.call_id, data.provider_media_addr, data.provider_media_port, sipPt).then((ok) => {
log(`[webrtc] link result: ${ok}`);
});
} else {
// Session not yet accepted — store media info for when it arrives.
pendingCallMedia.set(data.call_id, {
addr: data.provider_media_addr,
port: data.provider_media_port,
sipPt: data.sip_pt ?? 9,
});
log(`[webrtc] media info cached for call=${data.call_id}, waiting for session accept`);
}
}
});
onProxyEvent('call_ended', (data: ICallEndedEvent) => {
@@ -301,6 +340,18 @@ async function startProxyEngine(): Promise<void> {
});
if (callHistory.length > MAX_HISTORY) callHistory.pop();
activeCalls.delete(data.call_id);
// Notify browser(s) that the call ended.
broadcastWs('webrtc-call-ended', { callId: data.call_id });
// Clean up WebRTC session mappings.
const sessionId = webrtcCallToSession.get(data.call_id);
if (sessionId) {
webrtcCallToSession.delete(data.call_id);
webrtcSessionToCall.delete(sessionId);
webrtcClose(sessionId).catch(() => {});
}
pendingCallMedia.delete(data.call_id);
}
});
@@ -467,14 +518,22 @@ initWebUi(
}
},
undefined, // callManager — legacy, replaced by Rust proxy-engine
voiceboxManager,
voiceboxManager, // voiceboxManager
// WebRTC signaling → forwarded to Rust proxy-engine.
async (sessionId, sdp, ws) => {
log(`[webrtc] offer from browser session=${sessionId.slice(0, 8)}`);
log(`[webrtc] offer from browser session=${sessionId.slice(0, 8)} sdp_type=${typeof sdp} sdp_len=${sdp?.length || 0}`);
if (!sdp || typeof sdp !== 'string' || sdp.length < 10) {
log(`[webrtc] WARNING: invalid SDP (type=${typeof sdp}), skipping offer`);
return;
}
log(`[webrtc] sending offer to Rust (${sdp.length}b)...`);
const result = await webrtcOffer(sessionId, sdp);
log(`[webrtc] Rust result: ${JSON.stringify(result)?.slice(0, 200)}`);
if (result?.sdp) {
ws.send(JSON.stringify({ type: 'webrtc-answer', sessionId, sdp: result.sdp }));
log(`[webrtc] answer sent to browser session=${sessionId.slice(0, 8)}`);
} else {
log(`[webrtc] ERROR: no answer SDP from Rust`);
}
},
async (sessionId, candidate) => {
@@ -483,6 +542,26 @@ initWebUi(
async (sessionId) => {
await webrtcClose(sessionId);
},
// onWebRtcAccept — browser has accepted a call, linking session to call.
(callId: string, sessionId: string) => {
log(`[webrtc] accept: callId=${callId} sessionId=${sessionId.slice(0, 8)}`);
// Store bidirectional mapping.
webrtcSessionToCall.set(sessionId, callId);
webrtcCallToSession.set(callId, sessionId);
// Check if we already have media info for this call (provider answered first).
const media = pendingCallMedia.get(callId);
if (media) {
pendingCallMedia.delete(callId);
log(`[webrtc] linking session=${sessionId.slice(0, 8)} to call=${callId} media=${media.addr}:${media.port} pt=${media.sipPt}`);
webrtcLink(sessionId, callId, media.addr, media.port, media.sipPt).then((ok) => {
log(`[webrtc] link result: ${ok}`);
});
} else {
log(`[webrtc] session ${sessionId.slice(0, 8)} accepted, waiting for call_answered media info`);
}
},
);
// ---------------------------------------------------------------------------

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.12.0',
version: '1.13.0',
description: 'undefined'
}