4 Commits

15 changed files with 2172 additions and 684 deletions

View File

@@ -1,5 +1,22 @@
# Changelog
## 2026-04-10 - 1.14.0 - feat(proxy-engine)
add multiparty call mixing with dynamic SIP and WebRTC leg management
- replace passthrough call handling with a mixer-backed call model that tracks multiple legs and exposes leg status in call state output
- add mixer and leg I/O infrastructure to bridge SIP RTP and WebRTC audio through channel-based mix-minus processing
- introduce add_leg and remove_leg proxy commands and wire frontend bridge APIs to manage external call legs
- emit leg lifecycle events for observability and mark unimplemented device-leg and transfer HTTP endpoints with 501 responses
## 2026-04-10 - 1.13.0 - feat(proxy-engine,webrtc)
add B2BUA SIP leg handling and WebRTC call bridging for outbound calls
- introduce a new SipLeg module to manage outbound provider dialogs, including INVITE lifecycle, digest auth retries, ACK handling, media endpoint tracking, and termination
- store outbound dashboard calls as B2BUA calls in the call manager and emit provider media details on call_answered for bridge setup
- separate SIP and WebRTC engine locking to avoid contention and deadlocks while linking sessions to call RTP sockets
- add bidirectional RTP bridging between provider SIP media and browser WebRTC audio using the allocated RTP socket
- wire browser webrtc-accept events in the frontend and sipproxy so session-to-call linking can occur when media and acceptance arrive in either order
## 2026-04-10 - 1.12.0 - feat(proxy-engine)
add Rust-based outbound calling, WebRTC bridging, and voicemail handling

Binary file not shown.

View File

@@ -1,6 +1,6 @@
{
"name": "siprouter",
"version": "1.12.0",
"version": "1.14.0",
"private": true,
"type": "module",
"scripts": {

View File

@@ -1,12 +1,19 @@
//! Call hub — owns legs and bridges media.
//! Call hub — owns N legs and a mixer task.
//!
//! Each Call has a unique ID and tracks its state, direction, and associated
//! SIP Call-IDs for message routing.
//! Every call has a central mixer that provides mix-minus audio to all
//! participants. Legs can be added and removed dynamically mid-call.
use crate::mixer::{MixerCommand, RtpPacket};
use crate::sip_leg::SipLeg;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
pub type LegId = String;
/// Call state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -15,8 +22,6 @@ pub enum CallState {
Ringing,
Connected,
Voicemail,
Ivr,
Terminating,
Terminated,
}
@@ -27,8 +32,6 @@ impl CallState {
Self::Ringing => "ringing",
Self::Connected => "connected",
Self::Voicemail => "voicemail",
Self::Ivr => "ivr",
Self::Terminating => "terminating",
Self::Terminated => "terminated",
}
}
@@ -49,43 +52,172 @@ impl CallDirection {
}
}
/// A passthrough call — both sides share the same SIP Call-ID.
/// The proxy rewrites SDP/Contact/Request-URI and relays RTP.
pub struct PassthroughCall {
/// The type of a call leg.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegKind {
SipProvider,
SipDevice,
WebRtc,
Media, // voicemail playback, IVR, recording
}
impl LegKind {
pub fn as_str(&self) -> &'static str {
match self {
Self::SipProvider => "sip-provider",
Self::SipDevice => "sip-device",
Self::WebRtc => "webrtc",
Self::Media => "media",
}
}
}
/// Per-leg state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegState {
Inviting,
Ringing,
Connected,
Terminated,
}
impl LegState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Inviting => "inviting",
Self::Ringing => "ringing",
Self::Connected => "connected",
Self::Terminated => "terminated",
}
}
}
/// Information about a single leg in a call.
pub struct LegInfo {
pub id: LegId,
pub kind: LegKind,
pub state: LegState,
pub codec_pt: u8,
/// For SIP legs: the SIP dialog manager (handles 407 auth, BYE, etc).
pub sip_leg: Option<SipLeg>,
/// For SIP legs: the SIP Call-ID for message routing.
pub sip_call_id: Option<String>,
/// For WebRTC legs: the session ID in WebRtcEngine.
pub webrtc_session_id: Option<String>,
/// The RTP socket allocated for this leg.
pub rtp_socket: Option<Arc<UdpSocket>>,
/// The RTP port number.
pub rtp_port: u16,
/// The remote media endpoint (learned from SDP or address learning).
pub remote_media: Option<SocketAddr>,
/// SIP signaling address (provider or device).
pub signaling_addr: Option<SocketAddr>,
}
/// A multiparty call with N legs and a central mixer.
pub struct Call {
pub id: String,
pub sip_call_id: String,
pub state: CallState,
pub direction: CallDirection,
pub created_at: Instant,
// Call metadata.
// Metadata.
pub caller_number: Option<String>,
pub callee_number: Option<String>,
pub provider_id: String,
// Provider side.
pub provider_addr: SocketAddr,
pub provider_media: Option<SocketAddr>,
/// All legs in this call, keyed by leg ID.
pub legs: HashMap<LegId, LegInfo>,
// Device side.
pub device_addr: SocketAddr,
pub device_media: Option<SocketAddr>,
/// Channel to send commands to the mixer task.
pub mixer_cmd_tx: mpsc::Sender<MixerCommand>,
// RTP relay.
pub rtp_port: u16,
pub rtp_socket: Arc<UdpSocket>,
// Packet counters.
pub pkt_from_device: u64,
pub pkt_from_provider: u64,
/// Handle to the mixer task (aborted on call teardown).
mixer_task: Option<JoinHandle<()>>,
}
impl PassthroughCall {
impl Call {
pub fn new(
id: String,
direction: CallDirection,
provider_id: String,
mixer_cmd_tx: mpsc::Sender<MixerCommand>,
mixer_task: JoinHandle<()>,
) -> Self {
Self {
id,
state: CallState::SettingUp,
direction,
created_at: Instant::now(),
caller_number: None,
callee_number: None,
provider_id,
legs: HashMap::new(),
mixer_cmd_tx,
mixer_task: Some(mixer_task),
}
}
/// Add a leg to the mixer. Sends the AddLeg command with channel endpoints.
pub async fn add_leg_to_mixer(
&self,
leg_id: &str,
codec_pt: u8,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
) {
let _ = self
.mixer_cmd_tx
.send(MixerCommand::AddLeg {
leg_id: leg_id.to_string(),
codec_pt,
inbound_rx,
outbound_tx,
})
.await;
}
/// Remove a leg from the mixer.
pub async fn remove_leg_from_mixer(&self, leg_id: &str) {
let _ = self
.mixer_cmd_tx
.send(MixerCommand::RemoveLeg {
leg_id: leg_id.to_string(),
})
.await;
}
pub fn duration_secs(&self) -> u64 {
self.created_at.elapsed().as_secs()
}
/// Shut down the mixer and abort its task.
pub async fn shutdown_mixer(&mut self) {
let _ = self.mixer_cmd_tx.send(MixerCommand::Shutdown).await;
if let Some(handle) = self.mixer_task.take() {
handle.abort();
}
}
/// Produce a JSON status snapshot for the dashboard.
pub fn to_status_json(&self) -> serde_json::Value {
let legs: Vec<serde_json::Value> = self
.legs
.values()
.filter(|l| l.state != LegState::Terminated)
.map(|l| {
serde_json::json!({
"id": l.id,
"type": l.kind.as_str(),
"state": l.state.as_str(),
"codec": sip_proto::helpers::codec_name(l.codec_pt),
"rtpPort": l.rtp_port,
"remoteMedia": l.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
})
})
.collect();
serde_json::json!({
"id": self.id,
"state": self.state.as_str(),
@@ -93,11 +225,8 @@ impl PassthroughCall {
"callerNumber": self.caller_number,
"calleeNumber": self.callee_number,
"providerUsed": self.provider_id,
"createdAt": self.created_at.elapsed().as_millis(),
"duration": self.duration_secs(),
"rtpPort": self.rtp_port,
"pktFromDevice": self.pkt_from_device,
"pktFromProvider": self.pkt_from_provider,
"legs": legs,
})
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,80 @@
//! Leg I/O task spawners.
//!
//! Each SIP leg gets two tasks:
//! - Inbound: recv_from on RTP socket → strip header → send RtpPacket to mixer channel
//! - Outbound: recv encoded RTP from mixer channel → send_to remote media endpoint
//!
//! WebRTC leg I/O is handled inside webrtc_engine.rs (on_track + track.write).
use crate::mixer::RtpPacket;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
/// Channel pair for connecting a leg to the mixer.
pub struct LegChannels {
/// Mixer receives decoded packets from this leg.
pub inbound_tx: mpsc::Sender<RtpPacket>,
pub inbound_rx: mpsc::Receiver<RtpPacket>,
/// Mixer sends encoded RTP to this leg.
pub outbound_tx: mpsc::Sender<Vec<u8>>,
pub outbound_rx: mpsc::Receiver<Vec<u8>>,
}
/// Create a channel pair for a leg.
pub fn create_leg_channels() -> LegChannels {
let (inbound_tx, inbound_rx) = mpsc::channel::<RtpPacket>(64);
let (outbound_tx, outbound_rx) = mpsc::channel::<Vec<u8>>(8);
LegChannels {
inbound_tx,
inbound_rx,
outbound_tx,
outbound_rx,
}
}
/// Spawn the inbound I/O task for a SIP leg.
/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer.
/// Returns the JoinHandle (exits when the inbound_tx channel is dropped).
pub fn spawn_sip_inbound(
rtp_socket: Arc<UdpSocket>,
inbound_tx: mpsc::Sender<RtpPacket>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut buf = vec![0u8; 1500];
loop {
match rtp_socket.recv_from(&mut buf).await {
Ok((n, _from)) => {
if n < 12 {
continue; // Too small for RTP header.
}
let pt = buf[1] & 0x7F;
let payload = buf[12..n].to_vec();
if payload.is_empty() {
continue;
}
if inbound_tx.send(RtpPacket { payload, payload_type: pt }).await.is_err() {
break; // Channel closed — leg removed.
}
}
Err(_) => break, // Socket error.
}
}
})
}
/// Spawn the outbound I/O task for a SIP leg.
/// Reads encoded RTP packets from the mixer and sends them to the remote media endpoint.
/// Returns the JoinHandle (exits when the outbound_rx channel is closed).
pub fn spawn_sip_outbound(
rtp_socket: Arc<UdpSocket>,
remote_media: SocketAddr,
mut outbound_rx: mpsc::Receiver<Vec<u8>>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(rtp_data) = outbound_rx.recv().await {
let _ = rtp_socket.send_to(&rtp_data, remote_media).await;
}
})
}

View File

@@ -12,10 +12,13 @@ mod call_manager;
mod config;
mod dtmf;
mod ipc;
mod leg_io;
mod mixer;
mod provider;
mod recorder;
mod registrar;
mod rtp;
mod sip_leg;
mod sip_transport;
mod voicemail;
mod webrtc_engine;
@@ -35,14 +38,15 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, Mutex};
/// Shared mutable state for the proxy engine.
/// Shared mutable state for the proxy engine (SIP side).
/// WebRTC is intentionally kept in a separate lock to avoid contention
/// between SIP packet handlers and WebRTC command handlers.
struct ProxyEngine {
config: Option<AppConfig>,
transport: Option<SipTransport>,
provider_mgr: ProviderManager,
registrar: Registrar,
call_mgr: CallManager,
webrtc: WebRtcEngine,
rtp_pool: Option<RtpPortPool>,
out_tx: OutTx,
}
@@ -55,7 +59,6 @@ impl ProxyEngine {
provider_mgr: ProviderManager::new(out_tx.clone()),
registrar: Registrar::new(out_tx.clone()),
call_mgr: CallManager::new(out_tx.clone()),
webrtc: WebRtcEngine::new(out_tx.clone()),
rtp_pool: None,
out_tx,
}
@@ -83,9 +86,12 @@ async fn main() {
// Emit ready event.
emit_event(&out_tx, "ready", serde_json::json!({}));
// Shared engine state.
// Shared engine state (SIP side).
let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone())));
// WebRTC engine — separate lock to avoid deadlock with SIP handlers.
let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
// Read commands from stdin.
let stdin = tokio::io::stdin();
let reader = BufReader::new(stdin);
@@ -105,25 +111,36 @@ async fn main() {
};
let engine = engine.clone();
let webrtc = webrtc.clone();
let out_tx = out_tx.clone();
// Handle commands — some are async, so we spawn.
tokio::spawn(async move {
handle_command(engine, &out_tx, cmd).await;
handle_command(engine, webrtc, &out_tx, cmd).await;
});
}
}
async fn handle_command(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: Command) {
async fn handle_command(
engine: Arc<Mutex<ProxyEngine>>,
webrtc: Arc<Mutex<WebRtcEngine>>,
out_tx: &OutTx,
cmd: Command,
) {
match cmd.method.as_str() {
// SIP commands — lock engine only.
"configure" => handle_configure(engine, out_tx, &cmd).await,
"hangup" => handle_hangup(engine, out_tx, &cmd).await,
"make_call" => handle_make_call(engine, out_tx, &cmd).await,
"get_status" => handle_get_status(engine, out_tx, &cmd).await,
"webrtc_offer" => handle_webrtc_offer(engine, out_tx, &cmd).await,
"webrtc_ice" => handle_webrtc_ice(engine, out_tx, &cmd).await,
"webrtc_link" => handle_webrtc_link(engine, out_tx, &cmd).await,
"webrtc_close" => handle_webrtc_close(engine, out_tx, &cmd).await,
"add_leg" => handle_add_leg(engine, out_tx, &cmd).await,
"remove_leg" => handle_remove_leg(engine, out_tx, &cmd).await,
// WebRTC commands — lock webrtc only (no engine contention).
"webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, &cmd).await,
"webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, &cmd).await,
"webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await,
// webrtc_link needs both: engine (for mixer channels) and webrtc (for session).
"webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await,
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
}
}
@@ -246,14 +263,11 @@ async fn handle_sip_packet(
}
// 3. Route to existing call by SIP Call-ID.
// Check if this Call-ID belongs to an active call (avoids borrow conflict).
if eng.call_mgr.has_call(msg.call_id()) {
let config_ref = eng.config.as_ref().unwrap().clone();
// Temporarily take registrar to avoid overlapping borrows.
let registrar_dummy = Registrar::new(eng.out_tx.clone());
if eng
.call_mgr
.route_sip_message(&msg, from_addr, socket, &config_ref, &registrar_dummy)
.route_sip_message(&msg, from_addr, socket, &config_ref)
.await
{
return;
@@ -524,7 +538,8 @@ async fn handle_hangup(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Co
}
/// Handle `webrtc_offer` — browser sends SDP offer, we create PeerConnection and return answer.
async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
/// Uses only the WebRTC lock — no contention with SIP handlers.
async fn handle_webrtc_offer(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
@@ -534,8 +549,8 @@ async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
None => { respond_err(out_tx, &cmd.id, "missing sdp"); return; }
};
let mut eng = engine.lock().await;
match eng.webrtc.handle_offer(&session_id, &offer_sdp).await {
let mut wrtc = webrtc.lock().await;
match wrtc.handle_offer(&session_id, &offer_sdp).await {
Ok(answer_sdp) => {
respond_ok(out_tx, &cmd.id, serde_json::json!({
"session_id": session_id,
@@ -547,7 +562,8 @@ async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
}
/// Handle `webrtc_ice` — forward ICE candidate from browser to Rust PeerConnection.
async fn handle_webrtc_ice(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
/// Uses only the WebRTC lock.
async fn handle_webrtc_ice(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
@@ -556,15 +572,22 @@ async fn handle_webrtc_ice(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd:
let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str());
let sdp_mline_index = cmd.params.get("sdp_mline_index").and_then(|v| v.as_u64()).map(|v| v as u16);
let eng = engine.lock().await;
match eng.webrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await {
let wrtc = webrtc.lock().await;
match wrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await {
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
Err(e) => respond_err(out_tx, &cmd.id, &e),
}
}
/// Handle `webrtc_link` — link a WebRTC session to a SIP call for audio bridging.
async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
/// Handle `webrtc_link` — link a WebRTC session to a call's mixer for audio bridging.
/// Creates channels, adds WebRTC leg to the call, wires the WebRTC engine.
/// Locks are never held simultaneously — no deadlock possible.
async fn handle_webrtc_link(
engine: Arc<Mutex<ProxyEngine>>,
webrtc: Arc<Mutex<WebRtcEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
@@ -573,34 +596,67 @@ async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
};
let provider_addr = match cmd.params.get("provider_media_addr").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing provider_media_addr"); return; }
};
let provider_port = match cmd.params.get("provider_media_port").and_then(|v| v.as_u64()) {
Some(p) => p as u16,
None => { respond_err(out_tx, &cmd.id, "missing provider_media_port"); return; }
};
let sip_pt = cmd.params.get("sip_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8;
let provider_media: SocketAddr = match format!("{provider_addr}:{provider_port}").parse() {
Ok(a) => a,
Err(e) => { respond_err(out_tx, &cmd.id, &format!("bad address: {e}")); return; }
};
// Create channels for the WebRTC leg.
let channels = crate::leg_io::create_leg_channels();
let mut eng = engine.lock().await;
let sip_socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
};
// Briefly lock engine to add the WebRTC leg to the call's mixer.
{
let eng = engine.lock().await;
let call = match eng.call_mgr.calls.get(&call_id) {
Some(c) => c,
None => {
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
return;
}
};
// Add to mixer via channel.
call.add_leg_to_mixer(
&session_id,
codec_lib::PT_OPUS,
channels.inbound_rx,
channels.outbound_tx,
)
.await;
} // engine lock released
let bridge_info = crate::webrtc_engine::SipBridgeInfo {
provider_media,
sip_pt,
sip_socket,
};
// Lock webrtc to wire the channels.
let mut wrtc = webrtc.lock().await;
if wrtc
.link_to_mixer(&session_id, &call_id, channels.inbound_tx, channels.outbound_rx)
.await
{
// Also store the WebRTC leg info in the call.
drop(wrtc); // Release webrtc lock before re-acquiring engine.
{
let mut eng = engine.lock().await;
if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
call.legs.insert(
session_id.clone(),
crate::call::LegInfo {
id: session_id.clone(),
kind: crate::call::LegKind::WebRtc,
state: crate::call::LegState::Connected,
codec_pt: codec_lib::PT_OPUS,
sip_leg: None,
sip_call_id: None,
webrtc_session_id: Some(session_id.clone()),
rtp_socket: None,
rtp_port: 0,
remote_media: None,
signaling_addr: None,
},
);
}
}
emit_event(out_tx, "leg_added", serde_json::json!({
"call_id": call_id,
"leg_id": session_id,
"kind": "webrtc",
"state": "connected",
}));
if eng.webrtc.link_to_sip(&session_id, &call_id, bridge_info).await {
respond_ok(out_tx, &cmd.id, serde_json::json!({
"session_id": session_id,
"call_id": call_id,
@@ -611,15 +667,108 @@ async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd
}
}
/// Handle `add_leg` — add a new SIP leg to an existing call.
async fn handle_add_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
};
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
};
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
let mut eng = engine.lock().await;
let config_ref = match &eng.config {
Some(c) => c.clone(),
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
};
// Resolve provider.
let provider_config = if let Some(pid) = provider_id {
config_ref.providers.iter().find(|p| p.id == pid).cloned()
} else {
config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider)
};
let provider_config = match provider_config {
Some(p) => p,
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
};
// Get registered AOR.
let registered_aor = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
).await {
let ps = ps_arc.lock().await;
ps.registered_aor.clone()
} else {
format!("sip:{}@{}", provider_config.username, provider_config.domain)
};
let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
).await {
let ps = ps_arc.lock().await;
ps.public_ip.clone()
} else {
None
};
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
};
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
let rtp_pool = rtp_pool.as_mut().unwrap();
let leg_id = call_mgr.add_external_leg(
&call_id, &number, &provider_config, &config_ref,
rtp_pool, &socket, public_ip.as_deref(), &registered_aor,
).await;
match leg_id {
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })),
None => respond_err(out_tx, &cmd.id, "failed to add leg"),
}
}
/// Handle `remove_leg` — remove a leg from a call.
async fn handle_remove_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
};
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
};
let mut eng = engine.lock().await;
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
};
if eng.call_mgr.remove_leg(&call_id, &leg_id, &socket).await {
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
} else {
respond_err(out_tx, &cmd.id, &format!("call/leg not found"));
}
}
/// Handle `webrtc_close` — close a WebRTC session.
async fn handle_webrtc_close(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
/// Uses only the WebRTC lock.
async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
};
let mut eng = engine.lock().await;
match eng.webrtc.close_session(&session_id).await {
let mut wrtc = webrtc.lock().await;
match wrtc.close_session(&session_id).await {
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
Err(e) => respond_err(out_tx, &cmd.id, &e),
}

View File

@@ -0,0 +1,232 @@
//! Audio mixer — mix-minus engine for multiparty calls.
//!
//! Each Call spawns one mixer task. Legs communicate with the mixer via
//! tokio mpsc channels — no shared mutable state, no lock contention.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
//! 2. Compute total mix (sum of all legs' PCM as i32)
//! 3. For each leg: mix-minus = total - own, resample to leg codec rate, encode, send
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, TranscodeState};
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample.
const MIX_RATE: u32 = 16000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
pub payload: Vec<u8>,
pub payload_type: u8,
}
/// Commands sent to the mixer task via a control channel.
pub enum MixerCommand {
/// Add a new leg to the mix.
AddLeg {
leg_id: String,
codec_pt: u8,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
},
/// Remove a leg from the mix (channels are dropped, I/O tasks exit).
RemoveLeg { leg_id: String },
/// Shut down the mixer.
Shutdown,
}
/// Internal per-leg state inside the mixer.
struct MixerLegSlot {
codec_pt: u8,
transcoder: TranscodeState,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
/// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus.
last_pcm_frame: Vec<i16>,
/// Number of consecutive ticks with no inbound packet.
silent_ticks: u32,
// RTP output state.
rtp_seq: u16,
rtp_ts: u32,
rtp_ssrc: u32,
}
/// Spawn the mixer task for a call. Returns the command sender and task handle.
pub fn spawn_mixer(
call_id: String,
out_tx: OutTx,
) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
let handle = tokio::spawn(async move {
mixer_loop(call_id, cmd_rx, out_tx).await;
});
(cmd_tx, handle)
}
/// The 20ms mixing loop.
async fn mixer_loop(
call_id: String,
mut cmd_rx: mpsc::Receiver<MixerCommand>,
out_tx: OutTx,
) {
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
let mut interval = time::interval(Duration::from_millis(20));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
// 1. Process control commands (non-blocking).
loop {
match cmd_rx.try_recv() {
Ok(MixerCommand::AddLeg {
leg_id,
codec_pt,
inbound_rx,
outbound_tx,
}) => {
let transcoder = match TranscodeState::new() {
Ok(t) => t,
Err(e) => {
emit_event(
&out_tx,
"mixer_error",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"error": format!("codec init: {e}"),
}),
);
continue;
}
};
legs.insert(
leg_id,
MixerLegSlot {
codec_pt,
transcoder,
inbound_rx,
outbound_tx,
last_pcm_frame: vec![0i16; MIX_FRAME_SIZE],
silent_ticks: 0,
rtp_seq: 0,
rtp_ts: 0,
rtp_ssrc: rand::random(),
},
);
}
Ok(MixerCommand::RemoveLeg { leg_id }) => {
legs.remove(&leg_id);
// Channels drop → I/O tasks exit cleanly.
}
Ok(MixerCommand::Shutdown) => return,
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => return,
}
}
if legs.is_empty() {
continue;
}
// 2. Drain inbound packets, decode to 16kHz PCM.
let leg_ids: Vec<String> = legs.keys().cloned().collect();
for lid in &leg_ids {
let slot = legs.get_mut(lid).unwrap();
// Drain channel, keep only the latest packet (simple jitter handling).
let mut latest: Option<RtpPacket> = None;
loop {
match slot.inbound_rx.try_recv() {
Ok(pkt) => latest = Some(pkt),
Err(_) => break,
}
}
if let Some(pkt) = latest {
slot.silent_ticks = 0;
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to mixing rate if needed.
let pcm_mix = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE])
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = pcm_mix;
frame.resize(MIX_FRAME_SIZE, 0);
slot.last_pcm_frame = frame;
}
Err(_) => {
// Decode failed — use silence.
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
}
}
} else {
slot.silent_ticks += 1;
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
if slot.silent_ticks > 150 {
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
}
}
}
// 3. Compute total mix (sum of all legs as i32 to avoid overflow).
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
for slot in legs.values() {
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
total_mix[i] += s as i32;
}
}
// 4. For each leg: mix-minus, resample, encode, send.
for slot in legs.values_mut() {
// Mix-minus: total minus this leg's own contribution.
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as i32).clamp(-32768, 32767) as i16;
mix_minus.push(sample);
}
// Resample from 16kHz to the leg's codec native rate.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
mix_minus
} else {
slot.transcoder
.resample(&mix_minus, MIX_RATE, target_rate)
.unwrap_or_default()
};
// Encode to the leg's codec.
let encoded = match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
// Build RTP packet with header.
let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc);
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
// Non-blocking send — drop frame if channel is full.
let _ = slot.outbound_tx.try_send(rtp);
}
}
}

View File

@@ -0,0 +1,475 @@
//! SipLeg — manages one side of a B2BUA call.
//!
//! Handles the full INVITE lifecycle:
//! - Send INVITE with SDP
//! - Handle 407 Proxy Authentication (digest auth retry)
//! - Handle 200 OK (ACK, learn media endpoint)
//! - Handle BYE/CANCEL (teardown)
//! - Track SIP dialog state (early → confirmed → terminated)
//!
//! Ported from ts/call/sip-leg.ts.
use sip_proto::dialog::{DialogState, SipDialog};
use sip_proto::helpers::{
build_sdp, compute_digest_auth, generate_branch, generate_tag, parse_digest_challenge,
parse_sdp_endpoint, SdpOptions,
};
use sip_proto::message::{RequestOptions, SipMessage};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
/// State of a SIP leg.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegState {
Inviting,
Ringing,
Connected,
Terminating,
Terminated,
}
/// Configuration for creating a SIP leg.
pub struct SipLegConfig {
/// Proxy LAN IP (for Via, Contact, SDP).
pub lan_ip: String,
/// Proxy LAN port.
pub lan_port: u16,
/// Public IP (for provider-facing legs).
pub public_ip: Option<String>,
/// SIP target endpoint (provider outbound proxy or device address).
pub sip_target: SocketAddr,
/// Provider credentials (for 407 auth).
pub username: Option<String>,
pub password: Option<String>,
pub registered_aor: Option<String>,
/// Codec payload types to offer.
pub codecs: Vec<u8>,
/// Our RTP port for SDP.
pub rtp_port: u16,
}
/// A SIP leg with full dialog management.
pub struct SipLeg {
pub id: String,
pub state: LegState,
pub config: SipLegConfig,
pub dialog: Option<SipDialog>,
/// The INVITE we sent (needed for CANCEL and 407 ACK).
invite: Option<SipMessage>,
/// Original unauthenticated INVITE (for re-ACKing retransmitted 407s).
orig_invite: Option<SipMessage>,
/// Whether we've attempted digest auth.
auth_attempted: bool,
/// Remote media endpoint (learned from SDP in 200 OK).
pub remote_media: Option<SocketAddr>,
}
impl SipLeg {
pub fn new(id: String, config: SipLegConfig) -> Self {
Self {
id,
state: LegState::Inviting,
config,
dialog: None,
invite: None,
orig_invite: None,
auth_attempted: false,
remote_media: None,
}
}
/// Build and send an INVITE to establish this leg.
pub async fn send_invite(
&mut self,
from_uri: &str,
to_uri: &str,
sip_call_id: &str,
socket: &UdpSocket,
) {
let ip = self
.config
.public_ip
.as_deref()
.unwrap_or(&self.config.lan_ip);
let sdp = build_sdp(&SdpOptions {
ip,
port: self.config.rtp_port,
payload_types: &self.config.codecs,
..Default::default()
});
let invite = SipMessage::create_request(
"INVITE",
to_uri,
RequestOptions {
via_host: ip.to_string(),
via_port: self.config.lan_port,
via_transport: None,
via_branch: Some(generate_branch()),
from_uri: from_uri.to_string(),
from_display_name: None,
from_tag: Some(generate_tag()),
to_uri: to_uri.to_string(),
to_display_name: None,
to_tag: None,
call_id: Some(sip_call_id.to_string()),
cseq: Some(1),
contact: Some(format!("<sip:{ip}:{}>", self.config.lan_port)),
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
]),
},
);
self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port));
self.invite = Some(invite.clone());
self.state = LegState::Inviting;
let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await;
}
/// Handle an incoming SIP message routed to this leg.
/// Returns an optional reply to send (e.g. ACK, auth retry INVITE).
pub fn handle_message(&mut self, msg: &SipMessage) -> SipLegAction {
if msg.is_response() {
self.handle_response(msg)
} else {
self.handle_request(msg)
}
}
fn handle_response(&mut self, msg: &SipMessage) -> SipLegAction {
let code = msg.status_code().unwrap_or(0);
let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase();
if cseq_method != "INVITE" {
return SipLegAction::None;
}
// Handle retransmitted 407 for the original unauthenticated INVITE.
if self.auth_attempted {
if let Some(dialog) = &self.dialog {
let response_cseq: u32 = msg
.get_header("CSeq")
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if response_cseq < dialog.local_cseq && code >= 400 {
// ACK the retransmitted error response.
if let Some(orig) = &self.orig_invite {
let ack = build_non_2xx_ack(orig, msg);
return SipLegAction::Send(ack.serialize());
}
return SipLegAction::None;
}
}
}
// Handle 407 Proxy Authentication Required.
if code == 407 {
return self.handle_auth_challenge(msg);
}
// Update dialog state.
if let Some(dialog) = &mut self.dialog {
dialog.process_response(msg);
}
if code == 180 || code == 183 {
self.state = LegState::Ringing;
SipLegAction::StateChange(LegState::Ringing)
} else if code >= 200 && code < 300 {
// ACK the 200 OK.
let ack_buf = if let Some(dialog) = &self.dialog {
let ack = dialog.create_ack();
Some(ack.serialize())
} else {
None
};
// If already connected (200 retransmit), just re-ACK.
if self.state == LegState::Connected {
return match ack_buf {
Some(buf) => SipLegAction::Send(buf),
None => SipLegAction::None,
};
}
// Learn media endpoint from SDP.
if msg.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
self.remote_media = Some(addr);
}
}
}
self.state = LegState::Connected;
match ack_buf {
Some(buf) => SipLegAction::ConnectedWithAck(buf),
None => SipLegAction::StateChange(LegState::Connected),
}
} else if code >= 300 {
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
SipLegAction::Terminated(format!("rejected_{code}"))
} else {
SipLegAction::None // 1xx provisional
}
}
fn handle_auth_challenge(&mut self, msg: &SipMessage) -> SipLegAction {
if self.auth_attempted {
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
return SipLegAction::Terminated("auth_rejected".to_string());
}
self.auth_attempted = true;
let challenge_header = match msg.get_header("Proxy-Authenticate") {
Some(h) => h,
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_challenge".to_string());
}
};
let challenge = match parse_digest_challenge(challenge_header) {
Some(c) => c,
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_bad_challenge".to_string());
}
};
let password = match &self.config.password {
Some(p) => p.clone(),
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_password".to_string());
}
};
let aor = match &self.config.registered_aor {
Some(a) => a.clone(),
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_aor".to_string());
}
};
let username = aor
.trim_start_matches("sip:")
.trim_start_matches("sips:")
.split('@')
.next()
.unwrap_or("")
.to_string();
let dest_uri = self
.invite
.as_ref()
.and_then(|i| i.request_uri())
.unwrap_or("")
.to_string();
let auth_value = compute_digest_auth(
&username,
&password,
&challenge.realm,
&challenge.nonce,
"INVITE",
&dest_uri,
challenge.algorithm.as_deref(),
challenge.opaque.as_deref(),
);
// ACK the 407.
let mut ack_buf = None;
if let Some(invite) = &self.invite {
let ack = build_non_2xx_ack(invite, msg);
ack_buf = Some(ack.serialize());
}
// Save original INVITE for retransmission handling.
self.orig_invite = self.invite.clone();
// Build authenticated INVITE with same From tag, CSeq=2.
let ip = self
.config
.public_ip
.as_deref()
.unwrap_or(&self.config.lan_ip);
let from_tag = self
.dialog
.as_ref()
.map(|d| d.local_tag.clone())
.unwrap_or_else(generate_tag);
let sdp = build_sdp(&SdpOptions {
ip,
port: self.config.rtp_port,
payload_types: &self.config.codecs,
..Default::default()
});
let call_id = self
.dialog
.as_ref()
.map(|d| d.call_id.clone())
.unwrap_or_default();
let invite_auth = SipMessage::create_request(
"INVITE",
&dest_uri,
RequestOptions {
via_host: ip.to_string(),
via_port: self.config.lan_port,
via_transport: None,
via_branch: Some(generate_branch()),
from_uri: aor,
from_display_name: None,
from_tag: Some(from_tag),
to_uri: dest_uri.clone(),
to_display_name: None,
to_tag: None,
call_id: Some(call_id),
cseq: Some(2),
contact: Some(format!("<sip:{ip}:{}>", self.config.lan_port)),
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("Proxy-Authorization".to_string(), auth_value),
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
]),
},
);
self.invite = Some(invite_auth.clone());
if let Some(dialog) = &mut self.dialog {
dialog.local_cseq = 2;
}
// Return both the ACK for the 407 and the new authenticated INVITE.
let invite_buf = invite_auth.serialize();
SipLegAction::AuthRetry {
ack_407: ack_buf,
invite_with_auth: invite_buf,
}
}
fn handle_request(&mut self, msg: &SipMessage) -> SipLegAction {
let method = msg.method().unwrap_or("");
if method == "BYE" {
let ok = SipMessage::create_response(200, "OK", msg, None);
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
return SipLegAction::SendAndTerminate(ok.serialize(), "bye".to_string());
}
if method == "INFO" {
let ok = SipMessage::create_response(200, "OK", msg, None);
return SipLegAction::Send(ok.serialize());
}
SipLegAction::None
}
/// Build a BYE or CANCEL to tear down this leg.
pub fn build_hangup(&mut self) -> Option<Vec<u8>> {
let dialog = self.dialog.as_mut()?;
let msg = if dialog.state == DialogState::Confirmed {
dialog.create_request("BYE", None, None, None)
} else if dialog.state == DialogState::Early {
if let Some(invite) = &self.invite {
dialog.create_cancel(invite)
} else {
return None;
}
} else {
return None;
};
self.state = LegState::Terminating;
dialog.terminate();
Some(msg.serialize())
}
/// Get the SIP Call-ID for routing.
pub fn sip_call_id(&self) -> Option<&str> {
self.dialog.as_ref().map(|d| d.call_id.as_str())
}
}
/// Actions produced by the SipLeg message handler.
pub enum SipLegAction {
/// No action needed.
None,
/// Send a SIP message (ACK, 200 OK to INFO, etc.).
Send(Vec<u8>),
/// Leg state changed.
StateChange(LegState),
/// Connected — send this ACK.
ConnectedWithAck(Vec<u8>),
/// Terminated with a reason.
Terminated(String),
/// Send 200 OK and terminate.
SendAndTerminate(Vec<u8>, String),
/// 407 auth retry — send ACK for 407, then send new INVITE with auth.
AuthRetry {
ack_407: Option<Vec<u8>>,
invite_with_auth: Vec<u8>,
},
}
/// Build an ACK for a non-2xx response (same transaction as the INVITE).
fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage {
let via = original_invite.get_header("Via").unwrap_or("").to_string();
let from = original_invite
.get_header("From")
.unwrap_or("")
.to_string();
let to = response.get_header("To").unwrap_or("").to_string();
let call_id = original_invite.call_id().to_string();
let cseq_num: u32 = original_invite
.get_header("CSeq")
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let ruri = original_invite
.request_uri()
.unwrap_or("sip:unknown")
.to_string();
SipMessage::new(
format!("ACK {ruri} SIP/2.0"),
vec![
("Via".to_string(), via),
("From".to_string(), from),
("To".to_string(), to),
("Call-ID".to_string(), call_id),
("CSeq".to_string(), format!("{cseq_num} ACK")),
("Max-Forwards".to_string(), "70".to_string()),
("Content-Length".to_string(), "0".to_string()),
],
String::new(),
)
}

View File

@@ -1,16 +1,17 @@
//! WebRTC engine — manages browser PeerConnections with SIP audio bridging.
//! WebRTC engine — manages browser PeerConnections.
//!
//! Browser Opus audio → Rust PeerConnection → transcode via codec-lib → SIP RTP
//! SIP RTP → transcode via codec-lib → Rust PeerConnection → Browser Opus
//! Audio bridging is now channel-based:
//! - Browser Opus audio → on_track → mixer inbound channel
//! - Mixer outbound channel → Opus RTP → TrackLocalStaticRTP → browser
//!
//! The mixer handles all transcoding. The WebRTC engine just shuttles raw Opus.
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{TranscodeState, PT_G722, PT_OPUS};
use crate::mixer::RtpPacket;
use codec_lib::PT_OPUS;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
@@ -22,24 +23,14 @@ use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
/// SIP-side bridge info for a WebRTC session.
#[derive(Clone)]
pub struct SipBridgeInfo {
/// Provider's media endpoint (RTP destination).
pub provider_media: SocketAddr,
/// Provider's codec payload type (e.g. 9 for G.722).
pub sip_pt: u8,
/// The SIP UDP socket for sending RTP to the provider.
pub sip_socket: Arc<UdpSocket>,
}
/// A managed WebRTC session.
struct WebRtcSession {
pc: Arc<RTCPeerConnection>,
local_track: Arc<TrackLocalStaticRTP>,
call_id: Option<String>,
/// SIP bridge — set when the session is linked to a call.
sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>>,
/// Channel sender for forwarding browser Opus audio to the mixer.
/// Set when the session is linked to a call via link_to_mixer().
mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>>,
}
/// Manages all WebRTC sessions.
@@ -56,7 +47,7 @@ impl WebRtcEngine {
}
}
/// Handle a WebRTC offer from a browser.
/// Handle a WebRTC offer from a browser — create PeerConnection, return SDP answer.
pub async fn handle_offer(
&mut self,
session_id: &str,
@@ -99,8 +90,9 @@ impl WebRtcEngine {
.await
.map_err(|e| format!("add track: {e}"))?;
// Shared SIP bridge info (populated when linked to a call).
let sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>> = Arc::new(Mutex::new(None));
// Shared mixer channel sender (populated when linked to a call).
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> =
Arc::new(Mutex::new(None));
// ICE candidate handler.
let out_tx_ice = self.out_tx.clone();
@@ -151,14 +143,14 @@ impl WebRtcEngine {
}));
// Track handler — receives Opus audio from the browser.
// When SIP bridge is set, transcodes and forwards to provider.
// Forwards raw Opus payload to the mixer channel (when linked).
let out_tx_track = self.out_tx.clone();
let sid_track = session_id.to_string();
let sip_bridge_for_track = sip_bridge.clone();
let mixer_tx_for_track = mixer_tx.clone();
pc.on_track(Box::new(move |track, _receiver, _transceiver| {
let out_tx = out_tx_track.clone();
let sid = sid_track.clone();
let bridge = sip_bridge_for_track.clone();
let mixer_tx = mixer_tx_for_track.clone();
Box::pin(async move {
let codec_info = track.codec();
emit_event(
@@ -171,8 +163,8 @@ impl WebRtcEngine {
}),
);
// Spawn the browser→SIP audio forwarding task.
tokio::spawn(browser_to_sip_loop(track, bridge, out_tx, sid));
// Spawn browser→mixer forwarding task.
tokio::spawn(browser_to_mixer_loop(track, mixer_tx, out_tx, sid));
})
}));
@@ -199,67 +191,41 @@ impl WebRtcEngine {
pc,
local_track,
call_id: None,
sip_bridge,
mixer_tx,
},
);
Ok(answer_sdp)
}
/// Link a WebRTC session to a SIP call — sets up the audio bridge.
pub async fn link_to_sip(
/// Link a WebRTC session to a call's mixer via channels.
/// - `inbound_tx`: browser audio goes TO the mixer through this channel
/// - `outbound_rx`: mixed audio comes FROM the mixer through this channel
pub async fn link_to_mixer(
&mut self,
session_id: &str,
call_id: &str,
bridge_info: SipBridgeInfo,
inbound_tx: mpsc::Sender<RtpPacket>,
outbound_rx: mpsc::Receiver<Vec<u8>>,
) -> bool {
if let Some(session) = self.sessions.get_mut(session_id) {
session.call_id = Some(call_id.to_string());
let mut bridge = session.sip_bridge.lock().await;
*bridge = Some(bridge_info);
true
} else {
false
}
}
let session = match self.sessions.get_mut(session_id) {
Some(s) => s,
None => return false,
};
/// Send transcoded audio from the SIP side to the browser.
/// Called by the RTP relay when it receives a packet from the provider.
pub async fn forward_sip_to_browser(
&self,
session_id: &str,
sip_rtp_payload: &[u8],
sip_pt: u8,
) -> Result<(), String> {
let session = self
.sessions
.get(session_id)
.ok_or_else(|| format!("session {session_id} not found"))?;
session.call_id = Some(call_id.to_string());
// Transcode SIP codec → Opus.
// We create a temporary TranscodeState per packet for simplicity.
// TODO: Use a per-session persistent state for proper codec continuity.
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec: {e}"))?;
let opus_payload = transcoder
.transcode(sip_rtp_payload, sip_pt, PT_OPUS, Some("to_browser"))
.map_err(|e| format!("transcode: {e}"))?;
if opus_payload.is_empty() {
return Ok(());
// Set the mixer sender so the on_track loop starts forwarding.
{
let mut tx = session.mixer_tx.lock().await;
*tx = Some(inbound_tx);
}
// Build RTP header for Opus.
// TODO: Track seq/ts/ssrc per session for proper continuity.
let header = build_rtp_header(PT_OPUS, 0, 0, 0);
let mut packet = header.to_vec();
packet.extend_from_slice(&opus_payload);
// Spawn mixer→browser outbound task.
let local_track = session.local_track.clone();
tokio::spawn(mixer_to_browser_loop(outbound_rx, local_track));
session
.local_track
.write(&packet)
.await
.map(|_| ())
.map_err(|e| format!("write: {e}"))
true
}
pub async fn add_ice_candidate(
@@ -294,90 +260,48 @@ impl WebRtcEngine {
}
Ok(())
}
pub fn has_session(&self, session_id: &str) -> bool {
self.sessions.contains_key(session_id)
}
}
/// Browser → SIP audio forwarding loop.
/// Reads Opus RTP from the browser, transcodes to the SIP codec, sends to provider.
async fn browser_to_sip_loop(
/// Browser → Mixer audio forwarding loop.
/// Reads Opus RTP from the browser track, sends raw Opus payload to the mixer channel.
async fn browser_to_mixer_loop(
track: Arc<webrtc::track::track_remote::TrackRemote>,
sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>>,
mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>>,
out_tx: OutTx,
session_id: String,
) {
// Create a persistent codec state for this direction.
let mut transcoder = match TranscodeState::new() {
Ok(t) => t,
Err(e) => {
emit_event(
&out_tx,
"webrtc_error",
serde_json::json!({ "session_id": session_id, "error": format!("codec init: {e}") }),
);
return;
}
};
let mut buf = vec![0u8; 1500];
let mut count = 0u64;
let mut to_sip_seq: u16 = 0;
let mut to_sip_ts: u32 = 0;
let to_sip_ssrc: u32 = rand::random();
loop {
match track.read(&mut buf).await {
Ok((rtp_packet, _attributes)) => {
count += 1;
// Get the SIP bridge info (may not be set yet if call isn't linked).
let bridge = sip_bridge.lock().await;
let bridge_info = match bridge.as_ref() {
Some(b) => b.clone(),
None => continue, // Not linked to a SIP call yet — drop the packet.
};
drop(bridge); // Release lock before doing I/O.
// Extract Opus payload from the RTP packet (skip 12-byte header).
let payload = &rtp_packet.payload;
if payload.is_empty() {
continue;
}
// Transcode Opus → SIP codec (e.g. G.722).
let sip_payload = match transcoder.transcode(
payload,
PT_OPUS,
bridge_info.sip_pt,
Some("to_sip"),
) {
Ok(p) if !p.is_empty() => p,
_ => continue,
};
// Build SIP RTP packet.
let header = build_rtp_header(bridge_info.sip_pt, to_sip_seq, to_sip_ts, to_sip_ssrc);
let mut sip_rtp = header.to_vec();
sip_rtp.extend_from_slice(&sip_payload);
to_sip_seq = to_sip_seq.wrapping_add(1);
to_sip_ts = to_sip_ts.wrapping_add(rtp_clock_increment(bridge_info.sip_pt));
// Send to provider.
let _ = bridge_info
.sip_socket
.send_to(&sip_rtp, bridge_info.provider_media)
.await;
// Send raw Opus payload to mixer (if linked).
let tx = mixer_tx.lock().await;
if let Some(ref tx) = *tx {
let _ = tx
.send(RtpPacket {
payload: payload.to_vec(),
payload_type: PT_OPUS,
})
.await;
}
drop(tx);
if count == 1 || count == 50 || count % 500 == 0 {
emit_event(
&out_tx,
"webrtc_audio_tx",
"webrtc_audio_rx",
serde_json::json!({
"session_id": session_id,
"direction": "browser_to_sip",
"direction": "browser_to_mixer",
"packet_count": count,
}),
);
@@ -387,3 +311,14 @@ async fn browser_to_sip_loop(
}
}
}
/// Mixer → Browser audio forwarding loop.
/// Reads Opus-encoded RTP packets from the mixer and writes to the WebRTC track.
async fn mixer_to_browser_loop(
mut outbound_rx: mpsc::Receiver<Vec<u8>>,
local_track: Arc<TrackLocalStaticRTP>,
) {
while let Some(rtp_data) = outbound_rx.recv().await {
let _ = local_track.write(&rtp_data).await;
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.12.0',
version: '1.14.0',
description: 'undefined'
}

View File

@@ -128,14 +128,14 @@ async function handleRequest(
}
}
// API: add leg to call.
// API: add leg to call (device — not yet implemented, needs device-to-call routing).
if (url.pathname.startsWith('/api/call/') && url.pathname.endsWith('/addleg') && method === 'POST') {
try {
const callId = url.pathname.split('/')[3];
const body = await readJsonBody(req);
if (!body?.deviceId) return sendJson(res, { ok: false, error: 'missing deviceId' }, 400);
const ok = callManager?.addDeviceToCall(callId, body.deviceId) ?? false;
return sendJson(res, { ok });
// TODO: implement device leg addition (needs SIP INVITE to device).
return sendJson(res, { ok: false, error: 'not yet implemented' }, 501);
} catch (e: any) {
return sendJson(res, { ok: false, error: e.message }, 400);
}
@@ -147,8 +147,9 @@ async function handleRequest(
const callId = url.pathname.split('/')[3];
const body = await readJsonBody(req);
if (!body?.number) return sendJson(res, { ok: false, error: 'missing number' }, 400);
const ok = callManager?.addExternalToCall(callId, body.number, body.providerId) ?? false;
return sendJson(res, { ok });
const { addLeg: addLegFn } = await import('./proxybridge.ts');
const legId = await addLegFn(callId, body.number, body.providerId);
return sendJson(res, { ok: !!legId, legId });
} catch (e: any) {
return sendJson(res, { ok: false, error: e.message }, 400);
}
@@ -160,22 +161,22 @@ async function handleRequest(
const callId = url.pathname.split('/')[3];
const body = await readJsonBody(req);
if (!body?.legId) return sendJson(res, { ok: false, error: 'missing legId' }, 400);
const ok = callManager?.removeLegFromCall(callId, body.legId) ?? false;
const { removeLeg: removeLegFn } = await import('./proxybridge.ts');
const ok = await removeLegFn(callId, body.legId);
return sendJson(res, { ok });
} catch (e: any) {
return sendJson(res, { ok: false, error: e.message }, 400);
}
}
// API: transfer leg.
// API: transfer leg (not yet implemented).
if (url.pathname === '/api/transfer' && method === 'POST') {
try {
const body = await readJsonBody(req);
if (!body?.sourceCallId || !body?.legId || !body?.targetCallId) {
return sendJson(res, { ok: false, error: 'missing sourceCallId, legId, or targetCallId' }, 400);
}
const ok = callManager?.transferLeg(body.sourceCallId, body.legId, body.targetCallId) ?? false;
return sendJson(res, { ok });
return sendJson(res, { ok: false, error: 'not yet implemented' }, 501);
} catch (e: any) {
return sendJson(res, { ok: false, error: e.message }, 400);
}
@@ -339,11 +340,13 @@ export function initWebUi(
onHangupCall: (callId: string) => boolean,
onConfigSaved?: () => void,
callManager?: CallManager,
voiceboxManager?: VoiceboxManager,
/** WebRTC signaling handlers — forwarded to Rust proxy-engine. */
onWebRtcOffer?: (sessionId: string, sdp: string, ws: WebSocket) => Promise<void>,
onWebRtcIce?: (sessionId: string, candidate: any) => Promise<void>,
onWebRtcClose?: (sessionId: string) => Promise<void>,
voiceboxManager?: VoiceboxManager,
/** Called when browser sends webrtc-accept (callId + sessionId linking). */
onWebRtcAccept?: (callId: string, sessionId: string) => void,
): void {
const WEB_PORT = 3060;
@@ -382,6 +385,7 @@ export function initWebUi(
if (msg.type === 'webrtc-offer' && msg.sessionId) {
// Forward to Rust proxy-engine for WebRTC handling.
if (onWebRtcOffer) {
log(`[webrtc-ws] offer msg keys: ${Object.keys(msg).join(',')}, sdp type: ${typeof msg.sdp}, sdp len: ${msg.sdp?.length || 0}`);
onWebRtcOffer(msg.sessionId, msg.sdp, socket as any).catch((e: any) =>
log(`[webrtc] offer error: ${e.message}`));
}
@@ -394,8 +398,10 @@ export function initWebUi(
onWebRtcClose(msg.sessionId).catch(() => {});
}
} else if (msg.type === 'webrtc-accept' && msg.callId) {
// TODO: Wire to Rust call linking.
log(`[webrtc] accept: call=${msg.callId} session=${msg.sessionId || 'none'}`);
if (onWebRtcAccept && msg.sessionId) {
onWebRtcAccept(msg.callId, msg.sessionId);
}
} else if (msg.type?.startsWith('webrtc-')) {
msg._remoteIp = remoteIp;
handleWebRtcSignaling(socket as any, msg);

View File

@@ -238,6 +238,38 @@ export async function webrtcLink(sessionId: string, callId: string, providerMedi
}
}
/**
* Add an external SIP leg to an existing call (multiparty).
*/
export async function addLeg(callId: string, number: string, providerId?: string): Promise<string | null> {
if (!bridge || !initialized) return null;
try {
const result = await bridge.sendCommand('add_leg', {
call_id: callId,
number,
provider_id: providerId,
} as any);
return (result as any)?.leg_id || null;
} catch (e: any) {
logFn?.(`[proxy-engine] add_leg error: ${e?.message || e}`);
return null;
}
}
/**
* Remove a leg from a call.
*/
export async function removeLeg(callId: string, legId: string): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
await bridge.sendCommand('remove_leg', { call_id: callId, leg_id: legId } as any);
return true;
} catch (e: any) {
logFn?.(`[proxy-engine] remove_leg error: ${e?.message || e}`);
return false;
}
}
/**
* Close a WebRTC session.
*/

View File

@@ -37,7 +37,10 @@ import {
shutdownProxyEngine,
webrtcOffer,
webrtcIce,
webrtcLink,
webrtcClose,
addLeg,
removeLeg,
} from './proxybridge.ts';
import type {
IIncomingCallEvent,
@@ -118,6 +121,12 @@ const activeCalls = new Map<string, IActiveCall>();
const callHistory: ICallHistoryEntry[] = [];
const MAX_HISTORY = 100;
// WebRTC session ↔ call linking state.
// Both pieces (session accept + call media info) can arrive in any order.
const webrtcSessionToCall = new Map<string, string>(); // sessionId → callId
const webrtcCallToSession = new Map<string, string>(); // callId → sessionId
const pendingCallMedia = new Map<string, { addr: string; port: number; sipPt: number }>(); // callId → provider media info
// Initialize provider statuses from config (all start as unregistered).
for (const p of appConfig.providers) {
providerStatuses.set(p.id, {
@@ -271,6 +280,17 @@ async function startProxyEngine(): Promise<void> {
state: 'setting-up',
startedAt: Date.now(),
});
// Notify all browser devices — they can connect via WebRTC to listen/talk.
const browserIds = getAllBrowserDeviceIds();
for (const bid of browserIds) {
sendToBrowserDevice(bid, {
type: 'webrtc-incoming',
callId: data.call_id,
from: data.number,
deviceId: bid,
});
}
});
onProxyEvent('call_ringing', (data: { call_id: string }) => {
@@ -278,12 +298,33 @@ async function startProxyEngine(): Promise<void> {
if (call) call.state = 'ringing';
});
onProxyEvent('call_answered', (data: { call_id: string }) => {
onProxyEvent('call_answered', (data: { call_id: string; provider_media_addr?: string; provider_media_port?: number; sip_pt?: number }) => {
const call = activeCalls.get(data.call_id);
if (call) {
call.state = 'connected';
log(`[call] ${data.call_id} connected`);
}
// Try to link WebRTC session to this call for audio bridging.
if (data.provider_media_addr && data.provider_media_port) {
const sessionId = webrtcCallToSession.get(data.call_id);
if (sessionId) {
// Both session and media info available — link now.
const sipPt = data.sip_pt ?? 9;
log(`[webrtc] linking session=${sessionId.slice(0, 8)} to call=${data.call_id} media=${data.provider_media_addr}:${data.provider_media_port} pt=${sipPt}`);
webrtcLink(sessionId, data.call_id, data.provider_media_addr, data.provider_media_port, sipPt).then((ok) => {
log(`[webrtc] link result: ${ok}`);
});
} else {
// Session not yet accepted — store media info for when it arrives.
pendingCallMedia.set(data.call_id, {
addr: data.provider_media_addr,
port: data.provider_media_port,
sipPt: data.sip_pt ?? 9,
});
log(`[webrtc] media info cached for call=${data.call_id}, waiting for session accept`);
}
}
});
onProxyEvent('call_ended', (data: ICallEndedEvent) => {
@@ -301,6 +342,18 @@ async function startProxyEngine(): Promise<void> {
});
if (callHistory.length > MAX_HISTORY) callHistory.pop();
activeCalls.delete(data.call_id);
// Notify browser(s) that the call ended.
broadcastWs('webrtc-call-ended', { callId: data.call_id });
// Clean up WebRTC session mappings.
const sessionId = webrtcCallToSession.get(data.call_id);
if (sessionId) {
webrtcCallToSession.delete(data.call_id);
webrtcSessionToCall.delete(sessionId);
webrtcClose(sessionId).catch(() => {});
}
pendingCallMedia.delete(data.call_id);
}
});
@@ -308,6 +361,19 @@ async function startProxyEngine(): Promise<void> {
log(`[sip] unhandled ${data.method_or_status} Call-ID=${data.call_id?.slice(0, 20)} from=${data.from_addr}:${data.from_port}`);
});
// Leg events (multiparty).
onProxyEvent('leg_added', (data: any) => {
log(`[leg] added: call=${data.call_id} leg=${data.leg_id} kind=${data.kind} state=${data.state}`);
});
onProxyEvent('leg_removed', (data: any) => {
log(`[leg] removed: call=${data.call_id} leg=${data.leg_id}`);
});
onProxyEvent('leg_state_changed', (data: any) => {
log(`[leg] state: call=${data.call_id} leg=${data.leg_id}${data.state}`);
});
// WebRTC events from Rust — forward ICE candidates to browser via WebSocket.
onProxyEvent('webrtc_ice_candidate', (data: any) => {
// Find the browser's WebSocket by session ID and send the ICE candidate.
@@ -467,14 +533,22 @@ initWebUi(
}
},
undefined, // callManager — legacy, replaced by Rust proxy-engine
voiceboxManager,
voiceboxManager, // voiceboxManager
// WebRTC signaling → forwarded to Rust proxy-engine.
async (sessionId, sdp, ws) => {
log(`[webrtc] offer from browser session=${sessionId.slice(0, 8)}`);
log(`[webrtc] offer from browser session=${sessionId.slice(0, 8)} sdp_type=${typeof sdp} sdp_len=${sdp?.length || 0}`);
if (!sdp || typeof sdp !== 'string' || sdp.length < 10) {
log(`[webrtc] WARNING: invalid SDP (type=${typeof sdp}), skipping offer`);
return;
}
log(`[webrtc] sending offer to Rust (${sdp.length}b)...`);
const result = await webrtcOffer(sessionId, sdp);
log(`[webrtc] Rust result: ${JSON.stringify(result)?.slice(0, 200)}`);
if (result?.sdp) {
ws.send(JSON.stringify({ type: 'webrtc-answer', sessionId, sdp: result.sdp }));
log(`[webrtc] answer sent to browser session=${sessionId.slice(0, 8)}`);
} else {
log(`[webrtc] ERROR: no answer SDP from Rust`);
}
},
async (sessionId, candidate) => {
@@ -483,6 +557,26 @@ initWebUi(
async (sessionId) => {
await webrtcClose(sessionId);
},
// onWebRtcAccept — browser has accepted a call, linking session to call.
(callId: string, sessionId: string) => {
log(`[webrtc] accept: callId=${callId} sessionId=${sessionId.slice(0, 8)}`);
// Store bidirectional mapping.
webrtcSessionToCall.set(sessionId, callId);
webrtcCallToSession.set(callId, sessionId);
// Check if we already have media info for this call (provider answered first).
const media = pendingCallMedia.get(callId);
if (media) {
pendingCallMedia.delete(callId);
log(`[webrtc] linking session=${sessionId.slice(0, 8)} to call=${callId} media=${media.addr}:${media.port} pt=${media.sipPt}`);
webrtcLink(sessionId, callId, media.addr, media.port, media.sipPt).then((ok) => {
log(`[webrtc] link result: ${ok}`);
});
} else {
log(`[webrtc] session ${sessionId.slice(0, 8)} accepted, waiting for call_answered media info`);
}
},
);
// ---------------------------------------------------------------------------

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.12.0',
version: '1.14.0',
description: 'undefined'
}