Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6a130db7c7 | |||
| 93f671f1f9 | |||
| 36eab44e28 | |||
| 9e5aa35fee |
17
changelog.md
17
changelog.md
@@ -1,5 +1,22 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-04-10 - 1.14.0 - feat(proxy-engine)
|
||||
add multiparty call mixing with dynamic SIP and WebRTC leg management
|
||||
|
||||
- replace passthrough call handling with a mixer-backed call model that tracks multiple legs and exposes leg status in call state output
|
||||
- add mixer and leg I/O infrastructure to bridge SIP RTP and WebRTC audio through channel-based mix-minus processing
|
||||
- introduce add_leg and remove_leg proxy commands and wire frontend bridge APIs to manage external call legs
|
||||
- emit leg lifecycle events for observability and mark unimplemented device-leg and transfer HTTP endpoints with 501 responses
|
||||
|
||||
## 2026-04-10 - 1.13.0 - feat(proxy-engine,webrtc)
|
||||
add B2BUA SIP leg handling and WebRTC call bridging for outbound calls
|
||||
|
||||
- introduce a new SipLeg module to manage outbound provider dialogs, including INVITE lifecycle, digest auth retries, ACK handling, media endpoint tracking, and termination
|
||||
- store outbound dashboard calls as B2BUA calls in the call manager and emit provider media details on call_answered for bridge setup
|
||||
- separate SIP and WebRTC engine locking to avoid contention and deadlocks while linking sessions to call RTP sockets
|
||||
- add bidirectional RTP bridging between provider SIP media and browser WebRTC audio using the allocated RTP socket
|
||||
- wire browser webrtc-accept events in the frontend and sipproxy so session-to-call linking can occur when media and acceptance arrive in either order
|
||||
|
||||
## 2026-04-10 - 1.12.0 - feat(proxy-engine)
|
||||
add Rust-based outbound calling, WebRTC bridging, and voicemail handling
|
||||
|
||||
|
||||
BIN
nogit/voicemail/default/msg-1775825168199.wav
Normal file
BIN
nogit/voicemail/default/msg-1775825168199.wav
Normal file
Binary file not shown.
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "siprouter",
|
||||
"version": "1.12.0",
|
||||
"version": "1.14.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
|
||||
@@ -1,12 +1,19 @@
|
||||
//! Call hub — owns legs and bridges media.
|
||||
//! Call hub — owns N legs and a mixer task.
|
||||
//!
|
||||
//! Each Call has a unique ID and tracks its state, direction, and associated
|
||||
//! SIP Call-IDs for message routing.
|
||||
//! Every call has a central mixer that provides mix-minus audio to all
|
||||
//! participants. Legs can be added and removed dynamically mid-call.
|
||||
|
||||
use crate::mixer::{MixerCommand, RtpPacket};
|
||||
use crate::sip_leg::SipLeg;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub type LegId = String;
|
||||
|
||||
/// Call state machine.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
@@ -15,8 +22,6 @@ pub enum CallState {
|
||||
Ringing,
|
||||
Connected,
|
||||
Voicemail,
|
||||
Ivr,
|
||||
Terminating,
|
||||
Terminated,
|
||||
}
|
||||
|
||||
@@ -27,8 +32,6 @@ impl CallState {
|
||||
Self::Ringing => "ringing",
|
||||
Self::Connected => "connected",
|
||||
Self::Voicemail => "voicemail",
|
||||
Self::Ivr => "ivr",
|
||||
Self::Terminating => "terminating",
|
||||
Self::Terminated => "terminated",
|
||||
}
|
||||
}
|
||||
@@ -49,43 +52,172 @@ impl CallDirection {
|
||||
}
|
||||
}
|
||||
|
||||
/// A passthrough call — both sides share the same SIP Call-ID.
|
||||
/// The proxy rewrites SDP/Contact/Request-URI and relays RTP.
|
||||
pub struct PassthroughCall {
|
||||
/// The type of a call leg.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum LegKind {
|
||||
SipProvider,
|
||||
SipDevice,
|
||||
WebRtc,
|
||||
Media, // voicemail playback, IVR, recording
|
||||
}
|
||||
|
||||
impl LegKind {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Self::SipProvider => "sip-provider",
|
||||
Self::SipDevice => "sip-device",
|
||||
Self::WebRtc => "webrtc",
|
||||
Self::Media => "media",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-leg state.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum LegState {
|
||||
Inviting,
|
||||
Ringing,
|
||||
Connected,
|
||||
Terminated,
|
||||
}
|
||||
|
||||
impl LegState {
|
||||
pub fn as_str(&self) -> &'static str {
|
||||
match self {
|
||||
Self::Inviting => "inviting",
|
||||
Self::Ringing => "ringing",
|
||||
Self::Connected => "connected",
|
||||
Self::Terminated => "terminated",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Information about a single leg in a call.
|
||||
pub struct LegInfo {
|
||||
pub id: LegId,
|
||||
pub kind: LegKind,
|
||||
pub state: LegState,
|
||||
pub codec_pt: u8,
|
||||
|
||||
/// For SIP legs: the SIP dialog manager (handles 407 auth, BYE, etc).
|
||||
pub sip_leg: Option<SipLeg>,
|
||||
/// For SIP legs: the SIP Call-ID for message routing.
|
||||
pub sip_call_id: Option<String>,
|
||||
/// For WebRTC legs: the session ID in WebRtcEngine.
|
||||
pub webrtc_session_id: Option<String>,
|
||||
/// The RTP socket allocated for this leg.
|
||||
pub rtp_socket: Option<Arc<UdpSocket>>,
|
||||
/// The RTP port number.
|
||||
pub rtp_port: u16,
|
||||
/// The remote media endpoint (learned from SDP or address learning).
|
||||
pub remote_media: Option<SocketAddr>,
|
||||
/// SIP signaling address (provider or device).
|
||||
pub signaling_addr: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
/// A multiparty call with N legs and a central mixer.
|
||||
pub struct Call {
|
||||
pub id: String,
|
||||
pub sip_call_id: String,
|
||||
pub state: CallState,
|
||||
pub direction: CallDirection,
|
||||
pub created_at: Instant,
|
||||
|
||||
// Call metadata.
|
||||
// Metadata.
|
||||
pub caller_number: Option<String>,
|
||||
pub callee_number: Option<String>,
|
||||
pub provider_id: String,
|
||||
|
||||
// Provider side.
|
||||
pub provider_addr: SocketAddr,
|
||||
pub provider_media: Option<SocketAddr>,
|
||||
/// All legs in this call, keyed by leg ID.
|
||||
pub legs: HashMap<LegId, LegInfo>,
|
||||
|
||||
// Device side.
|
||||
pub device_addr: SocketAddr,
|
||||
pub device_media: Option<SocketAddr>,
|
||||
/// Channel to send commands to the mixer task.
|
||||
pub mixer_cmd_tx: mpsc::Sender<MixerCommand>,
|
||||
|
||||
// RTP relay.
|
||||
pub rtp_port: u16,
|
||||
pub rtp_socket: Arc<UdpSocket>,
|
||||
|
||||
// Packet counters.
|
||||
pub pkt_from_device: u64,
|
||||
pub pkt_from_provider: u64,
|
||||
/// Handle to the mixer task (aborted on call teardown).
|
||||
mixer_task: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl PassthroughCall {
|
||||
impl Call {
|
||||
pub fn new(
|
||||
id: String,
|
||||
direction: CallDirection,
|
||||
provider_id: String,
|
||||
mixer_cmd_tx: mpsc::Sender<MixerCommand>,
|
||||
mixer_task: JoinHandle<()>,
|
||||
) -> Self {
|
||||
Self {
|
||||
id,
|
||||
state: CallState::SettingUp,
|
||||
direction,
|
||||
created_at: Instant::now(),
|
||||
caller_number: None,
|
||||
callee_number: None,
|
||||
provider_id,
|
||||
legs: HashMap::new(),
|
||||
mixer_cmd_tx,
|
||||
mixer_task: Some(mixer_task),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a leg to the mixer. Sends the AddLeg command with channel endpoints.
|
||||
pub async fn add_leg_to_mixer(
|
||||
&self,
|
||||
leg_id: &str,
|
||||
codec_pt: u8,
|
||||
inbound_rx: mpsc::Receiver<RtpPacket>,
|
||||
outbound_tx: mpsc::Sender<Vec<u8>>,
|
||||
) {
|
||||
let _ = self
|
||||
.mixer_cmd_tx
|
||||
.send(MixerCommand::AddLeg {
|
||||
leg_id: leg_id.to_string(),
|
||||
codec_pt,
|
||||
inbound_rx,
|
||||
outbound_tx,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Remove a leg from the mixer.
|
||||
pub async fn remove_leg_from_mixer(&self, leg_id: &str) {
|
||||
let _ = self
|
||||
.mixer_cmd_tx
|
||||
.send(MixerCommand::RemoveLeg {
|
||||
leg_id: leg_id.to_string(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
pub fn duration_secs(&self) -> u64 {
|
||||
self.created_at.elapsed().as_secs()
|
||||
}
|
||||
|
||||
/// Shut down the mixer and abort its task.
|
||||
pub async fn shutdown_mixer(&mut self) {
|
||||
let _ = self.mixer_cmd_tx.send(MixerCommand::Shutdown).await;
|
||||
if let Some(handle) = self.mixer_task.take() {
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// Produce a JSON status snapshot for the dashboard.
|
||||
pub fn to_status_json(&self) -> serde_json::Value {
|
||||
let legs: Vec<serde_json::Value> = self
|
||||
.legs
|
||||
.values()
|
||||
.filter(|l| l.state != LegState::Terminated)
|
||||
.map(|l| {
|
||||
serde_json::json!({
|
||||
"id": l.id,
|
||||
"type": l.kind.as_str(),
|
||||
"state": l.state.as_str(),
|
||||
"codec": sip_proto::helpers::codec_name(l.codec_pt),
|
||||
"rtpPort": l.rtp_port,
|
||||
"remoteMedia": l.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
serde_json::json!({
|
||||
"id": self.id,
|
||||
"state": self.state.as_str(),
|
||||
@@ -93,11 +225,8 @@ impl PassthroughCall {
|
||||
"callerNumber": self.caller_number,
|
||||
"calleeNumber": self.callee_number,
|
||||
"providerUsed": self.provider_id,
|
||||
"createdAt": self.created_at.elapsed().as_millis(),
|
||||
"duration": self.duration_secs(),
|
||||
"rtpPort": self.rtp_port,
|
||||
"pktFromDevice": self.pkt_from_device,
|
||||
"pktFromProvider": self.pkt_from_provider,
|
||||
"legs": legs,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
80
rust/crates/proxy-engine/src/leg_io.rs
Normal file
80
rust/crates/proxy-engine/src/leg_io.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
//! Leg I/O task spawners.
|
||||
//!
|
||||
//! Each SIP leg gets two tasks:
|
||||
//! - Inbound: recv_from on RTP socket → strip header → send RtpPacket to mixer channel
|
||||
//! - Outbound: recv encoded RTP from mixer channel → send_to remote media endpoint
|
||||
//!
|
||||
//! WebRTC leg I/O is handled inside webrtc_engine.rs (on_track + track.write).
|
||||
|
||||
use crate::mixer::RtpPacket;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// Channel pair for connecting a leg to the mixer.
|
||||
pub struct LegChannels {
|
||||
/// Mixer receives decoded packets from this leg.
|
||||
pub inbound_tx: mpsc::Sender<RtpPacket>,
|
||||
pub inbound_rx: mpsc::Receiver<RtpPacket>,
|
||||
/// Mixer sends encoded RTP to this leg.
|
||||
pub outbound_tx: mpsc::Sender<Vec<u8>>,
|
||||
pub outbound_rx: mpsc::Receiver<Vec<u8>>,
|
||||
}
|
||||
|
||||
/// Create a channel pair for a leg.
|
||||
pub fn create_leg_channels() -> LegChannels {
|
||||
let (inbound_tx, inbound_rx) = mpsc::channel::<RtpPacket>(64);
|
||||
let (outbound_tx, outbound_rx) = mpsc::channel::<Vec<u8>>(8);
|
||||
LegChannels {
|
||||
inbound_tx,
|
||||
inbound_rx,
|
||||
outbound_tx,
|
||||
outbound_rx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the inbound I/O task for a SIP leg.
|
||||
/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer.
|
||||
/// Returns the JoinHandle (exits when the inbound_tx channel is dropped).
|
||||
pub fn spawn_sip_inbound(
|
||||
rtp_socket: Arc<UdpSocket>,
|
||||
inbound_tx: mpsc::Sender<RtpPacket>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 1500];
|
||||
loop {
|
||||
match rtp_socket.recv_from(&mut buf).await {
|
||||
Ok((n, _from)) => {
|
||||
if n < 12 {
|
||||
continue; // Too small for RTP header.
|
||||
}
|
||||
let pt = buf[1] & 0x7F;
|
||||
let payload = buf[12..n].to_vec();
|
||||
if payload.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if inbound_tx.send(RtpPacket { payload, payload_type: pt }).await.is_err() {
|
||||
break; // Channel closed — leg removed.
|
||||
}
|
||||
}
|
||||
Err(_) => break, // Socket error.
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn the outbound I/O task for a SIP leg.
|
||||
/// Reads encoded RTP packets from the mixer and sends them to the remote media endpoint.
|
||||
/// Returns the JoinHandle (exits when the outbound_rx channel is closed).
|
||||
pub fn spawn_sip_outbound(
|
||||
rtp_socket: Arc<UdpSocket>,
|
||||
remote_media: SocketAddr,
|
||||
mut outbound_rx: mpsc::Receiver<Vec<u8>>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
while let Some(rtp_data) = outbound_rx.recv().await {
|
||||
let _ = rtp_socket.send_to(&rtp_data, remote_media).await;
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -12,10 +12,13 @@ mod call_manager;
|
||||
mod config;
|
||||
mod dtmf;
|
||||
mod ipc;
|
||||
mod leg_io;
|
||||
mod mixer;
|
||||
mod provider;
|
||||
mod recorder;
|
||||
mod registrar;
|
||||
mod rtp;
|
||||
mod sip_leg;
|
||||
mod sip_transport;
|
||||
mod voicemail;
|
||||
mod webrtc_engine;
|
||||
@@ -35,14 +38,15 @@ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
/// Shared mutable state for the proxy engine.
|
||||
/// Shared mutable state for the proxy engine (SIP side).
|
||||
/// WebRTC is intentionally kept in a separate lock to avoid contention
|
||||
/// between SIP packet handlers and WebRTC command handlers.
|
||||
struct ProxyEngine {
|
||||
config: Option<AppConfig>,
|
||||
transport: Option<SipTransport>,
|
||||
provider_mgr: ProviderManager,
|
||||
registrar: Registrar,
|
||||
call_mgr: CallManager,
|
||||
webrtc: WebRtcEngine,
|
||||
rtp_pool: Option<RtpPortPool>,
|
||||
out_tx: OutTx,
|
||||
}
|
||||
@@ -55,7 +59,6 @@ impl ProxyEngine {
|
||||
provider_mgr: ProviderManager::new(out_tx.clone()),
|
||||
registrar: Registrar::new(out_tx.clone()),
|
||||
call_mgr: CallManager::new(out_tx.clone()),
|
||||
webrtc: WebRtcEngine::new(out_tx.clone()),
|
||||
rtp_pool: None,
|
||||
out_tx,
|
||||
}
|
||||
@@ -83,9 +86,12 @@ async fn main() {
|
||||
// Emit ready event.
|
||||
emit_event(&out_tx, "ready", serde_json::json!({}));
|
||||
|
||||
// Shared engine state.
|
||||
// Shared engine state (SIP side).
|
||||
let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone())));
|
||||
|
||||
// WebRTC engine — separate lock to avoid deadlock with SIP handlers.
|
||||
let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
|
||||
|
||||
// Read commands from stdin.
|
||||
let stdin = tokio::io::stdin();
|
||||
let reader = BufReader::new(stdin);
|
||||
@@ -105,25 +111,36 @@ async fn main() {
|
||||
};
|
||||
|
||||
let engine = engine.clone();
|
||||
let webrtc = webrtc.clone();
|
||||
let out_tx = out_tx.clone();
|
||||
|
||||
// Handle commands — some are async, so we spawn.
|
||||
tokio::spawn(async move {
|
||||
handle_command(engine, &out_tx, cmd).await;
|
||||
handle_command(engine, webrtc, &out_tx, cmd).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_command(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: Command) {
|
||||
async fn handle_command(
|
||||
engine: Arc<Mutex<ProxyEngine>>,
|
||||
webrtc: Arc<Mutex<WebRtcEngine>>,
|
||||
out_tx: &OutTx,
|
||||
cmd: Command,
|
||||
) {
|
||||
match cmd.method.as_str() {
|
||||
// SIP commands — lock engine only.
|
||||
"configure" => handle_configure(engine, out_tx, &cmd).await,
|
||||
"hangup" => handle_hangup(engine, out_tx, &cmd).await,
|
||||
"make_call" => handle_make_call(engine, out_tx, &cmd).await,
|
||||
"get_status" => handle_get_status(engine, out_tx, &cmd).await,
|
||||
"webrtc_offer" => handle_webrtc_offer(engine, out_tx, &cmd).await,
|
||||
"webrtc_ice" => handle_webrtc_ice(engine, out_tx, &cmd).await,
|
||||
"webrtc_link" => handle_webrtc_link(engine, out_tx, &cmd).await,
|
||||
"webrtc_close" => handle_webrtc_close(engine, out_tx, &cmd).await,
|
||||
"add_leg" => handle_add_leg(engine, out_tx, &cmd).await,
|
||||
"remove_leg" => handle_remove_leg(engine, out_tx, &cmd).await,
|
||||
// WebRTC commands — lock webrtc only (no engine contention).
|
||||
"webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, &cmd).await,
|
||||
"webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, &cmd).await,
|
||||
"webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await,
|
||||
// webrtc_link needs both: engine (for mixer channels) and webrtc (for session).
|
||||
"webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await,
|
||||
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
|
||||
}
|
||||
}
|
||||
@@ -246,14 +263,11 @@ async fn handle_sip_packet(
|
||||
}
|
||||
|
||||
// 3. Route to existing call by SIP Call-ID.
|
||||
// Check if this Call-ID belongs to an active call (avoids borrow conflict).
|
||||
if eng.call_mgr.has_call(msg.call_id()) {
|
||||
let config_ref = eng.config.as_ref().unwrap().clone();
|
||||
// Temporarily take registrar to avoid overlapping borrows.
|
||||
let registrar_dummy = Registrar::new(eng.out_tx.clone());
|
||||
if eng
|
||||
.call_mgr
|
||||
.route_sip_message(&msg, from_addr, socket, &config_ref, ®istrar_dummy)
|
||||
.route_sip_message(&msg, from_addr, socket, &config_ref)
|
||||
.await
|
||||
{
|
||||
return;
|
||||
@@ -524,7 +538,8 @@ async fn handle_hangup(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Co
|
||||
}
|
||||
|
||||
/// Handle `webrtc_offer` — browser sends SDP offer, we create PeerConnection and return answer.
|
||||
async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
/// Uses only the WebRTC lock — no contention with SIP handlers.
|
||||
async fn handle_webrtc_offer(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
|
||||
@@ -534,8 +549,8 @@ async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
|
||||
None => { respond_err(out_tx, &cmd.id, "missing sdp"); return; }
|
||||
};
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
match eng.webrtc.handle_offer(&session_id, &offer_sdp).await {
|
||||
let mut wrtc = webrtc.lock().await;
|
||||
match wrtc.handle_offer(&session_id, &offer_sdp).await {
|
||||
Ok(answer_sdp) => {
|
||||
respond_ok(out_tx, &cmd.id, serde_json::json!({
|
||||
"session_id": session_id,
|
||||
@@ -547,7 +562,8 @@ async fn handle_webrtc_offer(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
|
||||
}
|
||||
|
||||
/// Handle `webrtc_ice` — forward ICE candidate from browser to Rust PeerConnection.
|
||||
async fn handle_webrtc_ice(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
/// Uses only the WebRTC lock.
|
||||
async fn handle_webrtc_ice(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
|
||||
@@ -556,15 +572,22 @@ async fn handle_webrtc_ice(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd:
|
||||
let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str());
|
||||
let sdp_mline_index = cmd.params.get("sdp_mline_index").and_then(|v| v.as_u64()).map(|v| v as u16);
|
||||
|
||||
let eng = engine.lock().await;
|
||||
match eng.webrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await {
|
||||
let wrtc = webrtc.lock().await;
|
||||
match wrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await {
|
||||
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
|
||||
Err(e) => respond_err(out_tx, &cmd.id, &e),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle `webrtc_link` — link a WebRTC session to a SIP call for audio bridging.
|
||||
async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
/// Handle `webrtc_link` — link a WebRTC session to a call's mixer for audio bridging.
|
||||
/// Creates channels, adds WebRTC leg to the call, wires the WebRTC engine.
|
||||
/// Locks are never held simultaneously — no deadlock possible.
|
||||
async fn handle_webrtc_link(
|
||||
engine: Arc<Mutex<ProxyEngine>>,
|
||||
webrtc: Arc<Mutex<WebRtcEngine>>,
|
||||
out_tx: &OutTx,
|
||||
cmd: &Command,
|
||||
) {
|
||||
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
|
||||
@@ -573,34 +596,67 @@ async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
||||
};
|
||||
let provider_addr = match cmd.params.get("provider_media_addr").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing provider_media_addr"); return; }
|
||||
};
|
||||
let provider_port = match cmd.params.get("provider_media_port").and_then(|v| v.as_u64()) {
|
||||
Some(p) => p as u16,
|
||||
None => { respond_err(out_tx, &cmd.id, "missing provider_media_port"); return; }
|
||||
};
|
||||
let sip_pt = cmd.params.get("sip_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8;
|
||||
|
||||
let provider_media: SocketAddr = match format!("{provider_addr}:{provider_port}").parse() {
|
||||
Ok(a) => a,
|
||||
Err(e) => { respond_err(out_tx, &cmd.id, &format!("bad address: {e}")); return; }
|
||||
};
|
||||
// Create channels for the WebRTC leg.
|
||||
let channels = crate::leg_io::create_leg_channels();
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
let sip_socket = match &eng.transport {
|
||||
Some(t) => t.socket(),
|
||||
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
|
||||
};
|
||||
// Briefly lock engine to add the WebRTC leg to the call's mixer.
|
||||
{
|
||||
let eng = engine.lock().await;
|
||||
let call = match eng.call_mgr.calls.get(&call_id) {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
|
||||
return;
|
||||
}
|
||||
};
|
||||
// Add to mixer via channel.
|
||||
call.add_leg_to_mixer(
|
||||
&session_id,
|
||||
codec_lib::PT_OPUS,
|
||||
channels.inbound_rx,
|
||||
channels.outbound_tx,
|
||||
)
|
||||
.await;
|
||||
} // engine lock released
|
||||
|
||||
let bridge_info = crate::webrtc_engine::SipBridgeInfo {
|
||||
provider_media,
|
||||
sip_pt,
|
||||
sip_socket,
|
||||
};
|
||||
// Lock webrtc to wire the channels.
|
||||
let mut wrtc = webrtc.lock().await;
|
||||
if wrtc
|
||||
.link_to_mixer(&session_id, &call_id, channels.inbound_tx, channels.outbound_rx)
|
||||
.await
|
||||
{
|
||||
// Also store the WebRTC leg info in the call.
|
||||
drop(wrtc); // Release webrtc lock before re-acquiring engine.
|
||||
{
|
||||
let mut eng = engine.lock().await;
|
||||
if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
|
||||
call.legs.insert(
|
||||
session_id.clone(),
|
||||
crate::call::LegInfo {
|
||||
id: session_id.clone(),
|
||||
kind: crate::call::LegKind::WebRtc,
|
||||
state: crate::call::LegState::Connected,
|
||||
codec_pt: codec_lib::PT_OPUS,
|
||||
sip_leg: None,
|
||||
sip_call_id: None,
|
||||
webrtc_session_id: Some(session_id.clone()),
|
||||
rtp_socket: None,
|
||||
rtp_port: 0,
|
||||
remote_media: None,
|
||||
signaling_addr: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
emit_event(out_tx, "leg_added", serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"leg_id": session_id,
|
||||
"kind": "webrtc",
|
||||
"state": "connected",
|
||||
}));
|
||||
|
||||
if eng.webrtc.link_to_sip(&session_id, &call_id, bridge_info).await {
|
||||
respond_ok(out_tx, &cmd.id, serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"call_id": call_id,
|
||||
@@ -611,15 +667,108 @@ async fn handle_webrtc_link(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle `add_leg` — add a new SIP leg to an existing call.
|
||||
async fn handle_add_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
||||
};
|
||||
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
|
||||
Some(n) => n.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
|
||||
};
|
||||
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
let config_ref = match &eng.config {
|
||||
Some(c) => c.clone(),
|
||||
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
|
||||
};
|
||||
|
||||
// Resolve provider.
|
||||
let provider_config = if let Some(pid) = provider_id {
|
||||
config_ref.providers.iter().find(|p| p.id == pid).cloned()
|
||||
} else {
|
||||
config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider)
|
||||
};
|
||||
|
||||
let provider_config = match provider_config {
|
||||
Some(p) => p,
|
||||
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
|
||||
};
|
||||
|
||||
// Get registered AOR.
|
||||
let registered_aor = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
|
||||
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
|
||||
).await {
|
||||
let ps = ps_arc.lock().await;
|
||||
ps.registered_aor.clone()
|
||||
} else {
|
||||
format!("sip:{}@{}", provider_config.username, provider_config.domain)
|
||||
};
|
||||
|
||||
let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
|
||||
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
|
||||
).await {
|
||||
let ps = ps_arc.lock().await;
|
||||
ps.public_ip.clone()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let socket = match &eng.transport {
|
||||
Some(t) => t.socket(),
|
||||
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
|
||||
};
|
||||
|
||||
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
|
||||
let rtp_pool = rtp_pool.as_mut().unwrap();
|
||||
|
||||
let leg_id = call_mgr.add_external_leg(
|
||||
&call_id, &number, &provider_config, &config_ref,
|
||||
rtp_pool, &socket, public_ip.as_deref(), ®istered_aor,
|
||||
).await;
|
||||
|
||||
match leg_id {
|
||||
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })),
|
||||
None => respond_err(out_tx, &cmd.id, "failed to add leg"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle `remove_leg` — remove a leg from a call.
|
||||
async fn handle_remove_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
||||
};
|
||||
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
|
||||
};
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
let socket = match &eng.transport {
|
||||
Some(t) => t.socket(),
|
||||
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
|
||||
};
|
||||
|
||||
if eng.call_mgr.remove_leg(&call_id, &leg_id, &socket).await {
|
||||
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
|
||||
} else {
|
||||
respond_err(out_tx, &cmd.id, &format!("call/leg not found"));
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle `webrtc_close` — close a WebRTC session.
|
||||
async fn handle_webrtc_close(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
/// Uses only the WebRTC lock.
|
||||
async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
|
||||
};
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
match eng.webrtc.close_session(&session_id).await {
|
||||
let mut wrtc = webrtc.lock().await;
|
||||
match wrtc.close_session(&session_id).await {
|
||||
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
|
||||
Err(e) => respond_err(out_tx, &cmd.id, &e),
|
||||
}
|
||||
|
||||
232
rust/crates/proxy-engine/src/mixer.rs
Normal file
232
rust/crates/proxy-engine/src/mixer.rs
Normal file
@@ -0,0 +1,232 @@
|
||||
//! Audio mixer — mix-minus engine for multiparty calls.
|
||||
//!
|
||||
//! Each Call spawns one mixer task. Legs communicate with the mixer via
|
||||
//! tokio mpsc channels — no shared mutable state, no lock contention.
|
||||
//!
|
||||
//! The mixer runs a 20ms tick loop:
|
||||
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
|
||||
//! 2. Compute total mix (sum of all legs' PCM as i32)
|
||||
//! 3. For each leg: mix-minus = total - own, resample to leg codec rate, encode, send
|
||||
|
||||
use crate::ipc::{emit_event, OutTx};
|
||||
use crate::rtp::{build_rtp_header, rtp_clock_increment};
|
||||
use codec_lib::{codec_sample_rate, TranscodeState};
|
||||
use std::collections::HashMap;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{self, Duration, MissedTickBehavior};
|
||||
|
||||
/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample.
|
||||
const MIX_RATE: u32 = 16000;
|
||||
/// Samples per 20ms frame at the mixing rate.
|
||||
const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
|
||||
|
||||
/// A raw RTP payload received from a leg (no RTP header).
|
||||
pub struct RtpPacket {
|
||||
pub payload: Vec<u8>,
|
||||
pub payload_type: u8,
|
||||
}
|
||||
|
||||
/// Commands sent to the mixer task via a control channel.
|
||||
pub enum MixerCommand {
|
||||
/// Add a new leg to the mix.
|
||||
AddLeg {
|
||||
leg_id: String,
|
||||
codec_pt: u8,
|
||||
inbound_rx: mpsc::Receiver<RtpPacket>,
|
||||
outbound_tx: mpsc::Sender<Vec<u8>>,
|
||||
},
|
||||
/// Remove a leg from the mix (channels are dropped, I/O tasks exit).
|
||||
RemoveLeg { leg_id: String },
|
||||
/// Shut down the mixer.
|
||||
Shutdown,
|
||||
}
|
||||
|
||||
/// Internal per-leg state inside the mixer.
|
||||
struct MixerLegSlot {
|
||||
codec_pt: u8,
|
||||
transcoder: TranscodeState,
|
||||
inbound_rx: mpsc::Receiver<RtpPacket>,
|
||||
outbound_tx: mpsc::Sender<Vec<u8>>,
|
||||
/// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus.
|
||||
last_pcm_frame: Vec<i16>,
|
||||
/// Number of consecutive ticks with no inbound packet.
|
||||
silent_ticks: u32,
|
||||
// RTP output state.
|
||||
rtp_seq: u16,
|
||||
rtp_ts: u32,
|
||||
rtp_ssrc: u32,
|
||||
}
|
||||
|
||||
/// Spawn the mixer task for a call. Returns the command sender and task handle.
|
||||
pub fn spawn_mixer(
|
||||
call_id: String,
|
||||
out_tx: OutTx,
|
||||
) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
|
||||
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
mixer_loop(call_id, cmd_rx, out_tx).await;
|
||||
});
|
||||
|
||||
(cmd_tx, handle)
|
||||
}
|
||||
|
||||
/// The 20ms mixing loop.
|
||||
async fn mixer_loop(
|
||||
call_id: String,
|
||||
mut cmd_rx: mpsc::Receiver<MixerCommand>,
|
||||
out_tx: OutTx,
|
||||
) {
|
||||
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
|
||||
let mut interval = time::interval(Duration::from_millis(20));
|
||||
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
// 1. Process control commands (non-blocking).
|
||||
loop {
|
||||
match cmd_rx.try_recv() {
|
||||
Ok(MixerCommand::AddLeg {
|
||||
leg_id,
|
||||
codec_pt,
|
||||
inbound_rx,
|
||||
outbound_tx,
|
||||
}) => {
|
||||
let transcoder = match TranscodeState::new() {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
emit_event(
|
||||
&out_tx,
|
||||
"mixer_error",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"leg_id": leg_id,
|
||||
"error": format!("codec init: {e}"),
|
||||
}),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
legs.insert(
|
||||
leg_id,
|
||||
MixerLegSlot {
|
||||
codec_pt,
|
||||
transcoder,
|
||||
inbound_rx,
|
||||
outbound_tx,
|
||||
last_pcm_frame: vec![0i16; MIX_FRAME_SIZE],
|
||||
silent_ticks: 0,
|
||||
rtp_seq: 0,
|
||||
rtp_ts: 0,
|
||||
rtp_ssrc: rand::random(),
|
||||
},
|
||||
);
|
||||
}
|
||||
Ok(MixerCommand::RemoveLeg { leg_id }) => {
|
||||
legs.remove(&leg_id);
|
||||
// Channels drop → I/O tasks exit cleanly.
|
||||
}
|
||||
Ok(MixerCommand::Shutdown) => return,
|
||||
Err(mpsc::error::TryRecvError::Empty) => break,
|
||||
Err(mpsc::error::TryRecvError::Disconnected) => return,
|
||||
}
|
||||
}
|
||||
|
||||
if legs.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 2. Drain inbound packets, decode to 16kHz PCM.
|
||||
let leg_ids: Vec<String> = legs.keys().cloned().collect();
|
||||
for lid in &leg_ids {
|
||||
let slot = legs.get_mut(lid).unwrap();
|
||||
|
||||
// Drain channel, keep only the latest packet (simple jitter handling).
|
||||
let mut latest: Option<RtpPacket> = None;
|
||||
loop {
|
||||
match slot.inbound_rx.try_recv() {
|
||||
Ok(pkt) => latest = Some(pkt),
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(pkt) = latest {
|
||||
slot.silent_ticks = 0;
|
||||
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
|
||||
Ok((pcm, rate)) => {
|
||||
// Resample to mixing rate if needed.
|
||||
let pcm_mix = if rate == MIX_RATE {
|
||||
pcm
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample(&pcm, rate, MIX_RATE)
|
||||
.unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE])
|
||||
};
|
||||
// Pad or truncate to exactly MIX_FRAME_SIZE.
|
||||
let mut frame = pcm_mix;
|
||||
frame.resize(MIX_FRAME_SIZE, 0);
|
||||
slot.last_pcm_frame = frame;
|
||||
}
|
||||
Err(_) => {
|
||||
// Decode failed — use silence.
|
||||
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
|
||||
}
|
||||
}
|
||||
} else {
|
||||
slot.silent_ticks += 1;
|
||||
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
|
||||
if slot.silent_ticks > 150 {
|
||||
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Compute total mix (sum of all legs as i32 to avoid overflow).
|
||||
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
|
||||
for slot in legs.values() {
|
||||
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
|
||||
total_mix[i] += s as i32;
|
||||
}
|
||||
}
|
||||
|
||||
// 4. For each leg: mix-minus, resample, encode, send.
|
||||
for slot in legs.values_mut() {
|
||||
// Mix-minus: total minus this leg's own contribution.
|
||||
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
|
||||
for i in 0..MIX_FRAME_SIZE {
|
||||
let sample =
|
||||
(total_mix[i] - slot.last_pcm_frame[i] as i32).clamp(-32768, 32767) as i16;
|
||||
mix_minus.push(sample);
|
||||
}
|
||||
|
||||
// Resample from 16kHz to the leg's codec native rate.
|
||||
let target_rate = codec_sample_rate(slot.codec_pt);
|
||||
let resampled = if target_rate == MIX_RATE {
|
||||
mix_minus
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample(&mix_minus, MIX_RATE, target_rate)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
// Encode to the leg's codec.
|
||||
let encoded = match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
|
||||
Ok(e) if !e.is_empty() => e,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
// Build RTP packet with header.
|
||||
let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc);
|
||||
let mut rtp = header.to_vec();
|
||||
rtp.extend_from_slice(&encoded);
|
||||
|
||||
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
|
||||
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
|
||||
|
||||
// Non-blocking send — drop frame if channel is full.
|
||||
let _ = slot.outbound_tx.try_send(rtp);
|
||||
}
|
||||
}
|
||||
}
|
||||
475
rust/crates/proxy-engine/src/sip_leg.rs
Normal file
475
rust/crates/proxy-engine/src/sip_leg.rs
Normal file
@@ -0,0 +1,475 @@
|
||||
//! SipLeg — manages one side of a B2BUA call.
|
||||
//!
|
||||
//! Handles the full INVITE lifecycle:
|
||||
//! - Send INVITE with SDP
|
||||
//! - Handle 407 Proxy Authentication (digest auth retry)
|
||||
//! - Handle 200 OK (ACK, learn media endpoint)
|
||||
//! - Handle BYE/CANCEL (teardown)
|
||||
//! - Track SIP dialog state (early → confirmed → terminated)
|
||||
//!
|
||||
//! Ported from ts/call/sip-leg.ts.
|
||||
|
||||
use sip_proto::dialog::{DialogState, SipDialog};
|
||||
use sip_proto::helpers::{
|
||||
build_sdp, compute_digest_auth, generate_branch, generate_tag, parse_digest_challenge,
|
||||
parse_sdp_endpoint, SdpOptions,
|
||||
};
|
||||
use sip_proto::message::{RequestOptions, SipMessage};
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
|
||||
/// State of a SIP leg.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum LegState {
|
||||
Inviting,
|
||||
Ringing,
|
||||
Connected,
|
||||
Terminating,
|
||||
Terminated,
|
||||
}
|
||||
|
||||
/// Configuration for creating a SIP leg.
|
||||
pub struct SipLegConfig {
|
||||
/// Proxy LAN IP (for Via, Contact, SDP).
|
||||
pub lan_ip: String,
|
||||
/// Proxy LAN port.
|
||||
pub lan_port: u16,
|
||||
/// Public IP (for provider-facing legs).
|
||||
pub public_ip: Option<String>,
|
||||
/// SIP target endpoint (provider outbound proxy or device address).
|
||||
pub sip_target: SocketAddr,
|
||||
/// Provider credentials (for 407 auth).
|
||||
pub username: Option<String>,
|
||||
pub password: Option<String>,
|
||||
pub registered_aor: Option<String>,
|
||||
/// Codec payload types to offer.
|
||||
pub codecs: Vec<u8>,
|
||||
/// Our RTP port for SDP.
|
||||
pub rtp_port: u16,
|
||||
}
|
||||
|
||||
/// A SIP leg with full dialog management.
|
||||
pub struct SipLeg {
|
||||
pub id: String,
|
||||
pub state: LegState,
|
||||
pub config: SipLegConfig,
|
||||
pub dialog: Option<SipDialog>,
|
||||
|
||||
/// The INVITE we sent (needed for CANCEL and 407 ACK).
|
||||
invite: Option<SipMessage>,
|
||||
/// Original unauthenticated INVITE (for re-ACKing retransmitted 407s).
|
||||
orig_invite: Option<SipMessage>,
|
||||
/// Whether we've attempted digest auth.
|
||||
auth_attempted: bool,
|
||||
|
||||
/// Remote media endpoint (learned from SDP in 200 OK).
|
||||
pub remote_media: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
impl SipLeg {
|
||||
pub fn new(id: String, config: SipLegConfig) -> Self {
|
||||
Self {
|
||||
id,
|
||||
state: LegState::Inviting,
|
||||
config,
|
||||
dialog: None,
|
||||
invite: None,
|
||||
orig_invite: None,
|
||||
auth_attempted: false,
|
||||
remote_media: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Build and send an INVITE to establish this leg.
|
||||
pub async fn send_invite(
|
||||
&mut self,
|
||||
from_uri: &str,
|
||||
to_uri: &str,
|
||||
sip_call_id: &str,
|
||||
socket: &UdpSocket,
|
||||
) {
|
||||
let ip = self
|
||||
.config
|
||||
.public_ip
|
||||
.as_deref()
|
||||
.unwrap_or(&self.config.lan_ip);
|
||||
|
||||
let sdp = build_sdp(&SdpOptions {
|
||||
ip,
|
||||
port: self.config.rtp_port,
|
||||
payload_types: &self.config.codecs,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let invite = SipMessage::create_request(
|
||||
"INVITE",
|
||||
to_uri,
|
||||
RequestOptions {
|
||||
via_host: ip.to_string(),
|
||||
via_port: self.config.lan_port,
|
||||
via_transport: None,
|
||||
via_branch: Some(generate_branch()),
|
||||
from_uri: from_uri.to_string(),
|
||||
from_display_name: None,
|
||||
from_tag: Some(generate_tag()),
|
||||
to_uri: to_uri.to_string(),
|
||||
to_display_name: None,
|
||||
to_tag: None,
|
||||
call_id: Some(sip_call_id.to_string()),
|
||||
cseq: Some(1),
|
||||
contact: Some(format!("<sip:{ip}:{}>", self.config.lan_port)),
|
||||
max_forwards: Some(70),
|
||||
body: Some(sdp),
|
||||
content_type: Some("application/sdp".to_string()),
|
||||
extra_headers: Some(vec![
|
||||
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
|
||||
]),
|
||||
},
|
||||
);
|
||||
|
||||
self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port));
|
||||
self.invite = Some(invite.clone());
|
||||
self.state = LegState::Inviting;
|
||||
|
||||
let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await;
|
||||
}
|
||||
|
||||
/// Handle an incoming SIP message routed to this leg.
|
||||
/// Returns an optional reply to send (e.g. ACK, auth retry INVITE).
|
||||
pub fn handle_message(&mut self, msg: &SipMessage) -> SipLegAction {
|
||||
if msg.is_response() {
|
||||
self.handle_response(msg)
|
||||
} else {
|
||||
self.handle_request(msg)
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_response(&mut self, msg: &SipMessage) -> SipLegAction {
|
||||
let code = msg.status_code().unwrap_or(0);
|
||||
let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase();
|
||||
|
||||
if cseq_method != "INVITE" {
|
||||
return SipLegAction::None;
|
||||
}
|
||||
|
||||
// Handle retransmitted 407 for the original unauthenticated INVITE.
|
||||
if self.auth_attempted {
|
||||
if let Some(dialog) = &self.dialog {
|
||||
let response_cseq: u32 = msg
|
||||
.get_header("CSeq")
|
||||
.and_then(|s| s.split_whitespace().next())
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(0);
|
||||
if response_cseq < dialog.local_cseq && code >= 400 {
|
||||
// ACK the retransmitted error response.
|
||||
if let Some(orig) = &self.orig_invite {
|
||||
let ack = build_non_2xx_ack(orig, msg);
|
||||
return SipLegAction::Send(ack.serialize());
|
||||
}
|
||||
return SipLegAction::None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle 407 Proxy Authentication Required.
|
||||
if code == 407 {
|
||||
return self.handle_auth_challenge(msg);
|
||||
}
|
||||
|
||||
// Update dialog state.
|
||||
if let Some(dialog) = &mut self.dialog {
|
||||
dialog.process_response(msg);
|
||||
}
|
||||
|
||||
if code == 180 || code == 183 {
|
||||
self.state = LegState::Ringing;
|
||||
SipLegAction::StateChange(LegState::Ringing)
|
||||
} else if code >= 200 && code < 300 {
|
||||
// ACK the 200 OK.
|
||||
let ack_buf = if let Some(dialog) = &self.dialog {
|
||||
let ack = dialog.create_ack();
|
||||
Some(ack.serialize())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// If already connected (200 retransmit), just re-ACK.
|
||||
if self.state == LegState::Connected {
|
||||
return match ack_buf {
|
||||
Some(buf) => SipLegAction::Send(buf),
|
||||
None => SipLegAction::None,
|
||||
};
|
||||
}
|
||||
|
||||
// Learn media endpoint from SDP.
|
||||
if msg.has_sdp_body() {
|
||||
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
|
||||
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
|
||||
self.remote_media = Some(addr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.state = LegState::Connected;
|
||||
|
||||
match ack_buf {
|
||||
Some(buf) => SipLegAction::ConnectedWithAck(buf),
|
||||
None => SipLegAction::StateChange(LegState::Connected),
|
||||
}
|
||||
} else if code >= 300 {
|
||||
self.state = LegState::Terminated;
|
||||
if let Some(dialog) = &mut self.dialog {
|
||||
dialog.terminate();
|
||||
}
|
||||
SipLegAction::Terminated(format!("rejected_{code}"))
|
||||
} else {
|
||||
SipLegAction::None // 1xx provisional
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_auth_challenge(&mut self, msg: &SipMessage) -> SipLegAction {
|
||||
if self.auth_attempted {
|
||||
self.state = LegState::Terminated;
|
||||
if let Some(dialog) = &mut self.dialog {
|
||||
dialog.terminate();
|
||||
}
|
||||
return SipLegAction::Terminated("auth_rejected".to_string());
|
||||
}
|
||||
self.auth_attempted = true;
|
||||
|
||||
let challenge_header = match msg.get_header("Proxy-Authenticate") {
|
||||
Some(h) => h,
|
||||
None => {
|
||||
self.state = LegState::Terminated;
|
||||
return SipLegAction::Terminated("407_no_challenge".to_string());
|
||||
}
|
||||
};
|
||||
|
||||
let challenge = match parse_digest_challenge(challenge_header) {
|
||||
Some(c) => c,
|
||||
None => {
|
||||
self.state = LegState::Terminated;
|
||||
return SipLegAction::Terminated("407_bad_challenge".to_string());
|
||||
}
|
||||
};
|
||||
|
||||
let password = match &self.config.password {
|
||||
Some(p) => p.clone(),
|
||||
None => {
|
||||
self.state = LegState::Terminated;
|
||||
return SipLegAction::Terminated("407_no_password".to_string());
|
||||
}
|
||||
};
|
||||
|
||||
let aor = match &self.config.registered_aor {
|
||||
Some(a) => a.clone(),
|
||||
None => {
|
||||
self.state = LegState::Terminated;
|
||||
return SipLegAction::Terminated("407_no_aor".to_string());
|
||||
}
|
||||
};
|
||||
|
||||
let username = aor
|
||||
.trim_start_matches("sip:")
|
||||
.trim_start_matches("sips:")
|
||||
.split('@')
|
||||
.next()
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
|
||||
let dest_uri = self
|
||||
.invite
|
||||
.as_ref()
|
||||
.and_then(|i| i.request_uri())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
|
||||
let auth_value = compute_digest_auth(
|
||||
&username,
|
||||
&password,
|
||||
&challenge.realm,
|
||||
&challenge.nonce,
|
||||
"INVITE",
|
||||
&dest_uri,
|
||||
challenge.algorithm.as_deref(),
|
||||
challenge.opaque.as_deref(),
|
||||
);
|
||||
|
||||
// ACK the 407.
|
||||
let mut ack_buf = None;
|
||||
if let Some(invite) = &self.invite {
|
||||
let ack = build_non_2xx_ack(invite, msg);
|
||||
ack_buf = Some(ack.serialize());
|
||||
}
|
||||
|
||||
// Save original INVITE for retransmission handling.
|
||||
self.orig_invite = self.invite.clone();
|
||||
|
||||
// Build authenticated INVITE with same From tag, CSeq=2.
|
||||
let ip = self
|
||||
.config
|
||||
.public_ip
|
||||
.as_deref()
|
||||
.unwrap_or(&self.config.lan_ip);
|
||||
let from_tag = self
|
||||
.dialog
|
||||
.as_ref()
|
||||
.map(|d| d.local_tag.clone())
|
||||
.unwrap_or_else(generate_tag);
|
||||
|
||||
let sdp = build_sdp(&SdpOptions {
|
||||
ip,
|
||||
port: self.config.rtp_port,
|
||||
payload_types: &self.config.codecs,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let call_id = self
|
||||
.dialog
|
||||
.as_ref()
|
||||
.map(|d| d.call_id.clone())
|
||||
.unwrap_or_default();
|
||||
|
||||
let invite_auth = SipMessage::create_request(
|
||||
"INVITE",
|
||||
&dest_uri,
|
||||
RequestOptions {
|
||||
via_host: ip.to_string(),
|
||||
via_port: self.config.lan_port,
|
||||
via_transport: None,
|
||||
via_branch: Some(generate_branch()),
|
||||
from_uri: aor,
|
||||
from_display_name: None,
|
||||
from_tag: Some(from_tag),
|
||||
to_uri: dest_uri.clone(),
|
||||
to_display_name: None,
|
||||
to_tag: None,
|
||||
call_id: Some(call_id),
|
||||
cseq: Some(2),
|
||||
contact: Some(format!("<sip:{ip}:{}>", self.config.lan_port)),
|
||||
max_forwards: Some(70),
|
||||
body: Some(sdp),
|
||||
content_type: Some("application/sdp".to_string()),
|
||||
extra_headers: Some(vec![
|
||||
("Proxy-Authorization".to_string(), auth_value),
|
||||
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
|
||||
]),
|
||||
},
|
||||
);
|
||||
|
||||
self.invite = Some(invite_auth.clone());
|
||||
if let Some(dialog) = &mut self.dialog {
|
||||
dialog.local_cseq = 2;
|
||||
}
|
||||
|
||||
// Return both the ACK for the 407 and the new authenticated INVITE.
|
||||
let invite_buf = invite_auth.serialize();
|
||||
SipLegAction::AuthRetry {
|
||||
ack_407: ack_buf,
|
||||
invite_with_auth: invite_buf,
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_request(&mut self, msg: &SipMessage) -> SipLegAction {
|
||||
let method = msg.method().unwrap_or("");
|
||||
|
||||
if method == "BYE" {
|
||||
let ok = SipMessage::create_response(200, "OK", msg, None);
|
||||
self.state = LegState::Terminated;
|
||||
if let Some(dialog) = &mut self.dialog {
|
||||
dialog.terminate();
|
||||
}
|
||||
return SipLegAction::SendAndTerminate(ok.serialize(), "bye".to_string());
|
||||
}
|
||||
|
||||
if method == "INFO" {
|
||||
let ok = SipMessage::create_response(200, "OK", msg, None);
|
||||
return SipLegAction::Send(ok.serialize());
|
||||
}
|
||||
|
||||
SipLegAction::None
|
||||
}
|
||||
|
||||
/// Build a BYE or CANCEL to tear down this leg.
|
||||
pub fn build_hangup(&mut self) -> Option<Vec<u8>> {
|
||||
let dialog = self.dialog.as_mut()?;
|
||||
|
||||
let msg = if dialog.state == DialogState::Confirmed {
|
||||
dialog.create_request("BYE", None, None, None)
|
||||
} else if dialog.state == DialogState::Early {
|
||||
if let Some(invite) = &self.invite {
|
||||
dialog.create_cancel(invite)
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
|
||||
self.state = LegState::Terminating;
|
||||
dialog.terminate();
|
||||
Some(msg.serialize())
|
||||
}
|
||||
|
||||
/// Get the SIP Call-ID for routing.
|
||||
pub fn sip_call_id(&self) -> Option<&str> {
|
||||
self.dialog.as_ref().map(|d| d.call_id.as_str())
|
||||
}
|
||||
}
|
||||
|
||||
/// Actions produced by the SipLeg message handler.
|
||||
pub enum SipLegAction {
|
||||
/// No action needed.
|
||||
None,
|
||||
/// Send a SIP message (ACK, 200 OK to INFO, etc.).
|
||||
Send(Vec<u8>),
|
||||
/// Leg state changed.
|
||||
StateChange(LegState),
|
||||
/// Connected — send this ACK.
|
||||
ConnectedWithAck(Vec<u8>),
|
||||
/// Terminated with a reason.
|
||||
Terminated(String),
|
||||
/// Send 200 OK and terminate.
|
||||
SendAndTerminate(Vec<u8>, String),
|
||||
/// 407 auth retry — send ACK for 407, then send new INVITE with auth.
|
||||
AuthRetry {
|
||||
ack_407: Option<Vec<u8>>,
|
||||
invite_with_auth: Vec<u8>,
|
||||
},
|
||||
}
|
||||
|
||||
/// Build an ACK for a non-2xx response (same transaction as the INVITE).
|
||||
fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage {
|
||||
let via = original_invite.get_header("Via").unwrap_or("").to_string();
|
||||
let from = original_invite
|
||||
.get_header("From")
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let to = response.get_header("To").unwrap_or("").to_string();
|
||||
let call_id = original_invite.call_id().to_string();
|
||||
let cseq_num: u32 = original_invite
|
||||
.get_header("CSeq")
|
||||
.and_then(|s| s.split_whitespace().next())
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(1);
|
||||
|
||||
let ruri = original_invite
|
||||
.request_uri()
|
||||
.unwrap_or("sip:unknown")
|
||||
.to_string();
|
||||
|
||||
SipMessage::new(
|
||||
format!("ACK {ruri} SIP/2.0"),
|
||||
vec![
|
||||
("Via".to_string(), via),
|
||||
("From".to_string(), from),
|
||||
("To".to_string(), to),
|
||||
("Call-ID".to_string(), call_id),
|
||||
("CSeq".to_string(), format!("{cseq_num} ACK")),
|
||||
("Max-Forwards".to_string(), "70".to_string()),
|
||||
("Content-Length".to_string(), "0".to_string()),
|
||||
],
|
||||
String::new(),
|
||||
)
|
||||
}
|
||||
@@ -1,16 +1,17 @@
|
||||
//! WebRTC engine — manages browser PeerConnections with SIP audio bridging.
|
||||
//! WebRTC engine — manages browser PeerConnections.
|
||||
//!
|
||||
//! Browser Opus audio → Rust PeerConnection → transcode via codec-lib → SIP RTP
|
||||
//! SIP RTP → transcode via codec-lib → Rust PeerConnection → Browser Opus
|
||||
//! Audio bridging is now channel-based:
|
||||
//! - Browser Opus audio → on_track → mixer inbound channel
|
||||
//! - Mixer outbound channel → Opus RTP → TrackLocalStaticRTP → browser
|
||||
//!
|
||||
//! The mixer handles all transcoding. The WebRTC engine just shuttles raw Opus.
|
||||
|
||||
use crate::ipc::{emit_event, OutTx};
|
||||
use crate::rtp::{build_rtp_header, rtp_clock_increment};
|
||||
use codec_lib::{TranscodeState, PT_G722, PT_OPUS};
|
||||
use crate::mixer::RtpPacket;
|
||||
use codec_lib::PT_OPUS;
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use webrtc::api::media_engine::MediaEngine;
|
||||
use webrtc::api::APIBuilder;
|
||||
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
|
||||
@@ -22,24 +23,14 @@ use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
|
||||
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
|
||||
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
|
||||
|
||||
/// SIP-side bridge info for a WebRTC session.
|
||||
#[derive(Clone)]
|
||||
pub struct SipBridgeInfo {
|
||||
/// Provider's media endpoint (RTP destination).
|
||||
pub provider_media: SocketAddr,
|
||||
/// Provider's codec payload type (e.g. 9 for G.722).
|
||||
pub sip_pt: u8,
|
||||
/// The SIP UDP socket for sending RTP to the provider.
|
||||
pub sip_socket: Arc<UdpSocket>,
|
||||
}
|
||||
|
||||
/// A managed WebRTC session.
|
||||
struct WebRtcSession {
|
||||
pc: Arc<RTCPeerConnection>,
|
||||
local_track: Arc<TrackLocalStaticRTP>,
|
||||
call_id: Option<String>,
|
||||
/// SIP bridge — set when the session is linked to a call.
|
||||
sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>>,
|
||||
/// Channel sender for forwarding browser Opus audio to the mixer.
|
||||
/// Set when the session is linked to a call via link_to_mixer().
|
||||
mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>>,
|
||||
}
|
||||
|
||||
/// Manages all WebRTC sessions.
|
||||
@@ -56,7 +47,7 @@ impl WebRtcEngine {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a WebRTC offer from a browser.
|
||||
/// Handle a WebRTC offer from a browser — create PeerConnection, return SDP answer.
|
||||
pub async fn handle_offer(
|
||||
&mut self,
|
||||
session_id: &str,
|
||||
@@ -99,8 +90,9 @@ impl WebRtcEngine {
|
||||
.await
|
||||
.map_err(|e| format!("add track: {e}"))?;
|
||||
|
||||
// Shared SIP bridge info (populated when linked to a call).
|
||||
let sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>> = Arc::new(Mutex::new(None));
|
||||
// Shared mixer channel sender (populated when linked to a call).
|
||||
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> =
|
||||
Arc::new(Mutex::new(None));
|
||||
|
||||
// ICE candidate handler.
|
||||
let out_tx_ice = self.out_tx.clone();
|
||||
@@ -151,14 +143,14 @@ impl WebRtcEngine {
|
||||
}));
|
||||
|
||||
// Track handler — receives Opus audio from the browser.
|
||||
// When SIP bridge is set, transcodes and forwards to provider.
|
||||
// Forwards raw Opus payload to the mixer channel (when linked).
|
||||
let out_tx_track = self.out_tx.clone();
|
||||
let sid_track = session_id.to_string();
|
||||
let sip_bridge_for_track = sip_bridge.clone();
|
||||
let mixer_tx_for_track = mixer_tx.clone();
|
||||
pc.on_track(Box::new(move |track, _receiver, _transceiver| {
|
||||
let out_tx = out_tx_track.clone();
|
||||
let sid = sid_track.clone();
|
||||
let bridge = sip_bridge_for_track.clone();
|
||||
let mixer_tx = mixer_tx_for_track.clone();
|
||||
Box::pin(async move {
|
||||
let codec_info = track.codec();
|
||||
emit_event(
|
||||
@@ -171,8 +163,8 @@ impl WebRtcEngine {
|
||||
}),
|
||||
);
|
||||
|
||||
// Spawn the browser→SIP audio forwarding task.
|
||||
tokio::spawn(browser_to_sip_loop(track, bridge, out_tx, sid));
|
||||
// Spawn browser→mixer forwarding task.
|
||||
tokio::spawn(browser_to_mixer_loop(track, mixer_tx, out_tx, sid));
|
||||
})
|
||||
}));
|
||||
|
||||
@@ -199,67 +191,41 @@ impl WebRtcEngine {
|
||||
pc,
|
||||
local_track,
|
||||
call_id: None,
|
||||
sip_bridge,
|
||||
mixer_tx,
|
||||
},
|
||||
);
|
||||
|
||||
Ok(answer_sdp)
|
||||
}
|
||||
|
||||
/// Link a WebRTC session to a SIP call — sets up the audio bridge.
|
||||
pub async fn link_to_sip(
|
||||
/// Link a WebRTC session to a call's mixer via channels.
|
||||
/// - `inbound_tx`: browser audio goes TO the mixer through this channel
|
||||
/// - `outbound_rx`: mixed audio comes FROM the mixer through this channel
|
||||
pub async fn link_to_mixer(
|
||||
&mut self,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
bridge_info: SipBridgeInfo,
|
||||
inbound_tx: mpsc::Sender<RtpPacket>,
|
||||
outbound_rx: mpsc::Receiver<Vec<u8>>,
|
||||
) -> bool {
|
||||
if let Some(session) = self.sessions.get_mut(session_id) {
|
||||
session.call_id = Some(call_id.to_string());
|
||||
let mut bridge = session.sip_bridge.lock().await;
|
||||
*bridge = Some(bridge_info);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
let session = match self.sessions.get_mut(session_id) {
|
||||
Some(s) => s,
|
||||
None => return false,
|
||||
};
|
||||
|
||||
/// Send transcoded audio from the SIP side to the browser.
|
||||
/// Called by the RTP relay when it receives a packet from the provider.
|
||||
pub async fn forward_sip_to_browser(
|
||||
&self,
|
||||
session_id: &str,
|
||||
sip_rtp_payload: &[u8],
|
||||
sip_pt: u8,
|
||||
) -> Result<(), String> {
|
||||
let session = self
|
||||
.sessions
|
||||
.get(session_id)
|
||||
.ok_or_else(|| format!("session {session_id} not found"))?;
|
||||
session.call_id = Some(call_id.to_string());
|
||||
|
||||
// Transcode SIP codec → Opus.
|
||||
// We create a temporary TranscodeState per packet for simplicity.
|
||||
// TODO: Use a per-session persistent state for proper codec continuity.
|
||||
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec: {e}"))?;
|
||||
let opus_payload = transcoder
|
||||
.transcode(sip_rtp_payload, sip_pt, PT_OPUS, Some("to_browser"))
|
||||
.map_err(|e| format!("transcode: {e}"))?;
|
||||
|
||||
if opus_payload.is_empty() {
|
||||
return Ok(());
|
||||
// Set the mixer sender so the on_track loop starts forwarding.
|
||||
{
|
||||
let mut tx = session.mixer_tx.lock().await;
|
||||
*tx = Some(inbound_tx);
|
||||
}
|
||||
|
||||
// Build RTP header for Opus.
|
||||
// TODO: Track seq/ts/ssrc per session for proper continuity.
|
||||
let header = build_rtp_header(PT_OPUS, 0, 0, 0);
|
||||
let mut packet = header.to_vec();
|
||||
packet.extend_from_slice(&opus_payload);
|
||||
// Spawn mixer→browser outbound task.
|
||||
let local_track = session.local_track.clone();
|
||||
tokio::spawn(mixer_to_browser_loop(outbound_rx, local_track));
|
||||
|
||||
session
|
||||
.local_track
|
||||
.write(&packet)
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|e| format!("write: {e}"))
|
||||
true
|
||||
}
|
||||
|
||||
pub async fn add_ice_candidate(
|
||||
@@ -294,90 +260,48 @@ impl WebRtcEngine {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn has_session(&self, session_id: &str) -> bool {
|
||||
self.sessions.contains_key(session_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Browser → SIP audio forwarding loop.
|
||||
/// Reads Opus RTP from the browser, transcodes to the SIP codec, sends to provider.
|
||||
async fn browser_to_sip_loop(
|
||||
/// Browser → Mixer audio forwarding loop.
|
||||
/// Reads Opus RTP from the browser track, sends raw Opus payload to the mixer channel.
|
||||
async fn browser_to_mixer_loop(
|
||||
track: Arc<webrtc::track::track_remote::TrackRemote>,
|
||||
sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>>,
|
||||
mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>>,
|
||||
out_tx: OutTx,
|
||||
session_id: String,
|
||||
) {
|
||||
// Create a persistent codec state for this direction.
|
||||
let mut transcoder = match TranscodeState::new() {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
emit_event(
|
||||
&out_tx,
|
||||
"webrtc_error",
|
||||
serde_json::json!({ "session_id": session_id, "error": format!("codec init: {e}") }),
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut buf = vec![0u8; 1500];
|
||||
let mut count = 0u64;
|
||||
let mut to_sip_seq: u16 = 0;
|
||||
let mut to_sip_ts: u32 = 0;
|
||||
let to_sip_ssrc: u32 = rand::random();
|
||||
|
||||
loop {
|
||||
match track.read(&mut buf).await {
|
||||
Ok((rtp_packet, _attributes)) => {
|
||||
count += 1;
|
||||
|
||||
// Get the SIP bridge info (may not be set yet if call isn't linked).
|
||||
let bridge = sip_bridge.lock().await;
|
||||
let bridge_info = match bridge.as_ref() {
|
||||
Some(b) => b.clone(),
|
||||
None => continue, // Not linked to a SIP call yet — drop the packet.
|
||||
};
|
||||
drop(bridge); // Release lock before doing I/O.
|
||||
|
||||
// Extract Opus payload from the RTP packet (skip 12-byte header).
|
||||
let payload = &rtp_packet.payload;
|
||||
if payload.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Transcode Opus → SIP codec (e.g. G.722).
|
||||
let sip_payload = match transcoder.transcode(
|
||||
payload,
|
||||
PT_OPUS,
|
||||
bridge_info.sip_pt,
|
||||
Some("to_sip"),
|
||||
) {
|
||||
Ok(p) if !p.is_empty() => p,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
// Build SIP RTP packet.
|
||||
let header = build_rtp_header(bridge_info.sip_pt, to_sip_seq, to_sip_ts, to_sip_ssrc);
|
||||
let mut sip_rtp = header.to_vec();
|
||||
sip_rtp.extend_from_slice(&sip_payload);
|
||||
|
||||
to_sip_seq = to_sip_seq.wrapping_add(1);
|
||||
to_sip_ts = to_sip_ts.wrapping_add(rtp_clock_increment(bridge_info.sip_pt));
|
||||
|
||||
// Send to provider.
|
||||
let _ = bridge_info
|
||||
.sip_socket
|
||||
.send_to(&sip_rtp, bridge_info.provider_media)
|
||||
.await;
|
||||
// Send raw Opus payload to mixer (if linked).
|
||||
let tx = mixer_tx.lock().await;
|
||||
if let Some(ref tx) = *tx {
|
||||
let _ = tx
|
||||
.send(RtpPacket {
|
||||
payload: payload.to_vec(),
|
||||
payload_type: PT_OPUS,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
drop(tx);
|
||||
|
||||
if count == 1 || count == 50 || count % 500 == 0 {
|
||||
emit_event(
|
||||
&out_tx,
|
||||
"webrtc_audio_tx",
|
||||
"webrtc_audio_rx",
|
||||
serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"direction": "browser_to_sip",
|
||||
"direction": "browser_to_mixer",
|
||||
"packet_count": count,
|
||||
}),
|
||||
);
|
||||
@@ -387,3 +311,14 @@ async fn browser_to_sip_loop(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Mixer → Browser audio forwarding loop.
|
||||
/// Reads Opus-encoded RTP packets from the mixer and writes to the WebRTC track.
|
||||
async fn mixer_to_browser_loop(
|
||||
mut outbound_rx: mpsc::Receiver<Vec<u8>>,
|
||||
local_track: Arc<TrackLocalStaticRTP>,
|
||||
) {
|
||||
while let Some(rtp_data) = outbound_rx.recv().await {
|
||||
let _ = local_track.write(&rtp_data).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: 'siprouter',
|
||||
version: '1.12.0',
|
||||
version: '1.14.0',
|
||||
description: 'undefined'
|
||||
}
|
||||
|
||||
@@ -128,14 +128,14 @@ async function handleRequest(
|
||||
}
|
||||
}
|
||||
|
||||
// API: add leg to call.
|
||||
// API: add leg to call (device — not yet implemented, needs device-to-call routing).
|
||||
if (url.pathname.startsWith('/api/call/') && url.pathname.endsWith('/addleg') && method === 'POST') {
|
||||
try {
|
||||
const callId = url.pathname.split('/')[3];
|
||||
const body = await readJsonBody(req);
|
||||
if (!body?.deviceId) return sendJson(res, { ok: false, error: 'missing deviceId' }, 400);
|
||||
const ok = callManager?.addDeviceToCall(callId, body.deviceId) ?? false;
|
||||
return sendJson(res, { ok });
|
||||
// TODO: implement device leg addition (needs SIP INVITE to device).
|
||||
return sendJson(res, { ok: false, error: 'not yet implemented' }, 501);
|
||||
} catch (e: any) {
|
||||
return sendJson(res, { ok: false, error: e.message }, 400);
|
||||
}
|
||||
@@ -147,8 +147,9 @@ async function handleRequest(
|
||||
const callId = url.pathname.split('/')[3];
|
||||
const body = await readJsonBody(req);
|
||||
if (!body?.number) return sendJson(res, { ok: false, error: 'missing number' }, 400);
|
||||
const ok = callManager?.addExternalToCall(callId, body.number, body.providerId) ?? false;
|
||||
return sendJson(res, { ok });
|
||||
const { addLeg: addLegFn } = await import('./proxybridge.ts');
|
||||
const legId = await addLegFn(callId, body.number, body.providerId);
|
||||
return sendJson(res, { ok: !!legId, legId });
|
||||
} catch (e: any) {
|
||||
return sendJson(res, { ok: false, error: e.message }, 400);
|
||||
}
|
||||
@@ -160,22 +161,22 @@ async function handleRequest(
|
||||
const callId = url.pathname.split('/')[3];
|
||||
const body = await readJsonBody(req);
|
||||
if (!body?.legId) return sendJson(res, { ok: false, error: 'missing legId' }, 400);
|
||||
const ok = callManager?.removeLegFromCall(callId, body.legId) ?? false;
|
||||
const { removeLeg: removeLegFn } = await import('./proxybridge.ts');
|
||||
const ok = await removeLegFn(callId, body.legId);
|
||||
return sendJson(res, { ok });
|
||||
} catch (e: any) {
|
||||
return sendJson(res, { ok: false, error: e.message }, 400);
|
||||
}
|
||||
}
|
||||
|
||||
// API: transfer leg.
|
||||
// API: transfer leg (not yet implemented).
|
||||
if (url.pathname === '/api/transfer' && method === 'POST') {
|
||||
try {
|
||||
const body = await readJsonBody(req);
|
||||
if (!body?.sourceCallId || !body?.legId || !body?.targetCallId) {
|
||||
return sendJson(res, { ok: false, error: 'missing sourceCallId, legId, or targetCallId' }, 400);
|
||||
}
|
||||
const ok = callManager?.transferLeg(body.sourceCallId, body.legId, body.targetCallId) ?? false;
|
||||
return sendJson(res, { ok });
|
||||
return sendJson(res, { ok: false, error: 'not yet implemented' }, 501);
|
||||
} catch (e: any) {
|
||||
return sendJson(res, { ok: false, error: e.message }, 400);
|
||||
}
|
||||
@@ -339,11 +340,13 @@ export function initWebUi(
|
||||
onHangupCall: (callId: string) => boolean,
|
||||
onConfigSaved?: () => void,
|
||||
callManager?: CallManager,
|
||||
voiceboxManager?: VoiceboxManager,
|
||||
/** WebRTC signaling handlers — forwarded to Rust proxy-engine. */
|
||||
onWebRtcOffer?: (sessionId: string, sdp: string, ws: WebSocket) => Promise<void>,
|
||||
onWebRtcIce?: (sessionId: string, candidate: any) => Promise<void>,
|
||||
onWebRtcClose?: (sessionId: string) => Promise<void>,
|
||||
voiceboxManager?: VoiceboxManager,
|
||||
/** Called when browser sends webrtc-accept (callId + sessionId linking). */
|
||||
onWebRtcAccept?: (callId: string, sessionId: string) => void,
|
||||
): void {
|
||||
const WEB_PORT = 3060;
|
||||
|
||||
@@ -382,6 +385,7 @@ export function initWebUi(
|
||||
if (msg.type === 'webrtc-offer' && msg.sessionId) {
|
||||
// Forward to Rust proxy-engine for WebRTC handling.
|
||||
if (onWebRtcOffer) {
|
||||
log(`[webrtc-ws] offer msg keys: ${Object.keys(msg).join(',')}, sdp type: ${typeof msg.sdp}, sdp len: ${msg.sdp?.length || 0}`);
|
||||
onWebRtcOffer(msg.sessionId, msg.sdp, socket as any).catch((e: any) =>
|
||||
log(`[webrtc] offer error: ${e.message}`));
|
||||
}
|
||||
@@ -394,8 +398,10 @@ export function initWebUi(
|
||||
onWebRtcClose(msg.sessionId).catch(() => {});
|
||||
}
|
||||
} else if (msg.type === 'webrtc-accept' && msg.callId) {
|
||||
// TODO: Wire to Rust call linking.
|
||||
log(`[webrtc] accept: call=${msg.callId} session=${msg.sessionId || 'none'}`);
|
||||
if (onWebRtcAccept && msg.sessionId) {
|
||||
onWebRtcAccept(msg.callId, msg.sessionId);
|
||||
}
|
||||
} else if (msg.type?.startsWith('webrtc-')) {
|
||||
msg._remoteIp = remoteIp;
|
||||
handleWebRtcSignaling(socket as any, msg);
|
||||
|
||||
@@ -238,6 +238,38 @@ export async function webrtcLink(sessionId: string, callId: string, providerMedi
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an external SIP leg to an existing call (multiparty).
|
||||
*/
|
||||
export async function addLeg(callId: string, number: string, providerId?: string): Promise<string | null> {
|
||||
if (!bridge || !initialized) return null;
|
||||
try {
|
||||
const result = await bridge.sendCommand('add_leg', {
|
||||
call_id: callId,
|
||||
number,
|
||||
provider_id: providerId,
|
||||
} as any);
|
||||
return (result as any)?.leg_id || null;
|
||||
} catch (e: any) {
|
||||
logFn?.(`[proxy-engine] add_leg error: ${e?.message || e}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a leg from a call.
|
||||
*/
|
||||
export async function removeLeg(callId: string, legId: string): Promise<boolean> {
|
||||
if (!bridge || !initialized) return false;
|
||||
try {
|
||||
await bridge.sendCommand('remove_leg', { call_id: callId, leg_id: legId } as any);
|
||||
return true;
|
||||
} catch (e: any) {
|
||||
logFn?.(`[proxy-engine] remove_leg error: ${e?.message || e}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close a WebRTC session.
|
||||
*/
|
||||
|
||||
100
ts/sipproxy.ts
100
ts/sipproxy.ts
@@ -37,7 +37,10 @@ import {
|
||||
shutdownProxyEngine,
|
||||
webrtcOffer,
|
||||
webrtcIce,
|
||||
webrtcLink,
|
||||
webrtcClose,
|
||||
addLeg,
|
||||
removeLeg,
|
||||
} from './proxybridge.ts';
|
||||
import type {
|
||||
IIncomingCallEvent,
|
||||
@@ -118,6 +121,12 @@ const activeCalls = new Map<string, IActiveCall>();
|
||||
const callHistory: ICallHistoryEntry[] = [];
|
||||
const MAX_HISTORY = 100;
|
||||
|
||||
// WebRTC session ↔ call linking state.
|
||||
// Both pieces (session accept + call media info) can arrive in any order.
|
||||
const webrtcSessionToCall = new Map<string, string>(); // sessionId → callId
|
||||
const webrtcCallToSession = new Map<string, string>(); // callId → sessionId
|
||||
const pendingCallMedia = new Map<string, { addr: string; port: number; sipPt: number }>(); // callId → provider media info
|
||||
|
||||
// Initialize provider statuses from config (all start as unregistered).
|
||||
for (const p of appConfig.providers) {
|
||||
providerStatuses.set(p.id, {
|
||||
@@ -271,6 +280,17 @@ async function startProxyEngine(): Promise<void> {
|
||||
state: 'setting-up',
|
||||
startedAt: Date.now(),
|
||||
});
|
||||
|
||||
// Notify all browser devices — they can connect via WebRTC to listen/talk.
|
||||
const browserIds = getAllBrowserDeviceIds();
|
||||
for (const bid of browserIds) {
|
||||
sendToBrowserDevice(bid, {
|
||||
type: 'webrtc-incoming',
|
||||
callId: data.call_id,
|
||||
from: data.number,
|
||||
deviceId: bid,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
onProxyEvent('call_ringing', (data: { call_id: string }) => {
|
||||
@@ -278,12 +298,33 @@ async function startProxyEngine(): Promise<void> {
|
||||
if (call) call.state = 'ringing';
|
||||
});
|
||||
|
||||
onProxyEvent('call_answered', (data: { call_id: string }) => {
|
||||
onProxyEvent('call_answered', (data: { call_id: string; provider_media_addr?: string; provider_media_port?: number; sip_pt?: number }) => {
|
||||
const call = activeCalls.get(data.call_id);
|
||||
if (call) {
|
||||
call.state = 'connected';
|
||||
log(`[call] ${data.call_id} connected`);
|
||||
}
|
||||
|
||||
// Try to link WebRTC session to this call for audio bridging.
|
||||
if (data.provider_media_addr && data.provider_media_port) {
|
||||
const sessionId = webrtcCallToSession.get(data.call_id);
|
||||
if (sessionId) {
|
||||
// Both session and media info available — link now.
|
||||
const sipPt = data.sip_pt ?? 9;
|
||||
log(`[webrtc] linking session=${sessionId.slice(0, 8)} to call=${data.call_id} media=${data.provider_media_addr}:${data.provider_media_port} pt=${sipPt}`);
|
||||
webrtcLink(sessionId, data.call_id, data.provider_media_addr, data.provider_media_port, sipPt).then((ok) => {
|
||||
log(`[webrtc] link result: ${ok}`);
|
||||
});
|
||||
} else {
|
||||
// Session not yet accepted — store media info for when it arrives.
|
||||
pendingCallMedia.set(data.call_id, {
|
||||
addr: data.provider_media_addr,
|
||||
port: data.provider_media_port,
|
||||
sipPt: data.sip_pt ?? 9,
|
||||
});
|
||||
log(`[webrtc] media info cached for call=${data.call_id}, waiting for session accept`);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
onProxyEvent('call_ended', (data: ICallEndedEvent) => {
|
||||
@@ -301,6 +342,18 @@ async function startProxyEngine(): Promise<void> {
|
||||
});
|
||||
if (callHistory.length > MAX_HISTORY) callHistory.pop();
|
||||
activeCalls.delete(data.call_id);
|
||||
|
||||
// Notify browser(s) that the call ended.
|
||||
broadcastWs('webrtc-call-ended', { callId: data.call_id });
|
||||
|
||||
// Clean up WebRTC session mappings.
|
||||
const sessionId = webrtcCallToSession.get(data.call_id);
|
||||
if (sessionId) {
|
||||
webrtcCallToSession.delete(data.call_id);
|
||||
webrtcSessionToCall.delete(sessionId);
|
||||
webrtcClose(sessionId).catch(() => {});
|
||||
}
|
||||
pendingCallMedia.delete(data.call_id);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -308,6 +361,19 @@ async function startProxyEngine(): Promise<void> {
|
||||
log(`[sip] unhandled ${data.method_or_status} Call-ID=${data.call_id?.slice(0, 20)} from=${data.from_addr}:${data.from_port}`);
|
||||
});
|
||||
|
||||
// Leg events (multiparty).
|
||||
onProxyEvent('leg_added', (data: any) => {
|
||||
log(`[leg] added: call=${data.call_id} leg=${data.leg_id} kind=${data.kind} state=${data.state}`);
|
||||
});
|
||||
|
||||
onProxyEvent('leg_removed', (data: any) => {
|
||||
log(`[leg] removed: call=${data.call_id} leg=${data.leg_id}`);
|
||||
});
|
||||
|
||||
onProxyEvent('leg_state_changed', (data: any) => {
|
||||
log(`[leg] state: call=${data.call_id} leg=${data.leg_id} → ${data.state}`);
|
||||
});
|
||||
|
||||
// WebRTC events from Rust — forward ICE candidates to browser via WebSocket.
|
||||
onProxyEvent('webrtc_ice_candidate', (data: any) => {
|
||||
// Find the browser's WebSocket by session ID and send the ICE candidate.
|
||||
@@ -467,14 +533,22 @@ initWebUi(
|
||||
}
|
||||
},
|
||||
undefined, // callManager — legacy, replaced by Rust proxy-engine
|
||||
voiceboxManager,
|
||||
voiceboxManager, // voiceboxManager
|
||||
// WebRTC signaling → forwarded to Rust proxy-engine.
|
||||
async (sessionId, sdp, ws) => {
|
||||
log(`[webrtc] offer from browser session=${sessionId.slice(0, 8)}`);
|
||||
log(`[webrtc] offer from browser session=${sessionId.slice(0, 8)} sdp_type=${typeof sdp} sdp_len=${sdp?.length || 0}`);
|
||||
if (!sdp || typeof sdp !== 'string' || sdp.length < 10) {
|
||||
log(`[webrtc] WARNING: invalid SDP (type=${typeof sdp}), skipping offer`);
|
||||
return;
|
||||
}
|
||||
log(`[webrtc] sending offer to Rust (${sdp.length}b)...`);
|
||||
const result = await webrtcOffer(sessionId, sdp);
|
||||
log(`[webrtc] Rust result: ${JSON.stringify(result)?.slice(0, 200)}`);
|
||||
if (result?.sdp) {
|
||||
ws.send(JSON.stringify({ type: 'webrtc-answer', sessionId, sdp: result.sdp }));
|
||||
log(`[webrtc] answer sent to browser session=${sessionId.slice(0, 8)}`);
|
||||
} else {
|
||||
log(`[webrtc] ERROR: no answer SDP from Rust`);
|
||||
}
|
||||
},
|
||||
async (sessionId, candidate) => {
|
||||
@@ -483,6 +557,26 @@ initWebUi(
|
||||
async (sessionId) => {
|
||||
await webrtcClose(sessionId);
|
||||
},
|
||||
// onWebRtcAccept — browser has accepted a call, linking session to call.
|
||||
(callId: string, sessionId: string) => {
|
||||
log(`[webrtc] accept: callId=${callId} sessionId=${sessionId.slice(0, 8)}`);
|
||||
|
||||
// Store bidirectional mapping.
|
||||
webrtcSessionToCall.set(sessionId, callId);
|
||||
webrtcCallToSession.set(callId, sessionId);
|
||||
|
||||
// Check if we already have media info for this call (provider answered first).
|
||||
const media = pendingCallMedia.get(callId);
|
||||
if (media) {
|
||||
pendingCallMedia.delete(callId);
|
||||
log(`[webrtc] linking session=${sessionId.slice(0, 8)} to call=${callId} media=${media.addr}:${media.port} pt=${media.sipPt}`);
|
||||
webrtcLink(sessionId, callId, media.addr, media.port, media.sipPt).then((ok) => {
|
||||
log(`[webrtc] link result: ${ok}`);
|
||||
});
|
||||
} else {
|
||||
log(`[webrtc] session ${sessionId.slice(0, 8)} accepted, waiting for call_answered media info`);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: 'siprouter',
|
||||
version: '1.12.0',
|
||||
version: '1.14.0',
|
||||
description: 'undefined'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user