feat(proxy-engine): add multiparty call mixing with dynamic SIP and WebRTC leg management

This commit is contained in:
2026-04-10 12:52:48 +00:00
parent 36eab44e28
commit 93f671f1f9
13 changed files with 1572 additions and 809 deletions

View File

@@ -1,5 +1,13 @@
# Changelog
## 2026-04-10 - 1.14.0 - feat(proxy-engine)
add multiparty call mixing with dynamic SIP and WebRTC leg management
- replace passthrough call handling with a mixer-backed call model that tracks multiple legs and exposes leg status in call state output
- add mixer and leg I/O infrastructure to bridge SIP RTP and WebRTC audio through channel-based mix-minus processing
- introduce add_leg and remove_leg proxy commands and wire frontend bridge APIs to manage external call legs
- emit leg lifecycle events for observability and mark unimplemented device-leg and transfer HTTP endpoints with 501 responses
## 2026-04-10 - 1.13.0 - feat(proxy-engine,webrtc)
add B2BUA SIP leg handling and WebRTC call bridging for outbound calls

Binary file not shown.

View File

@@ -1,12 +1,19 @@
//! Call hub — owns legs and bridges media.
//! Call hub — owns N legs and a mixer task.
//!
//! Each Call has a unique ID and tracks its state, direction, and associated
//! SIP Call-IDs for message routing.
//! Every call has a central mixer that provides mix-minus audio to all
//! participants. Legs can be added and removed dynamically mid-call.
use crate::mixer::{MixerCommand, RtpPacket};
use crate::sip_leg::SipLeg;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
pub type LegId = String;
/// Call state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -15,8 +22,6 @@ pub enum CallState {
Ringing,
Connected,
Voicemail,
Ivr,
Terminating,
Terminated,
}
@@ -27,8 +32,6 @@ impl CallState {
Self::Ringing => "ringing",
Self::Connected => "connected",
Self::Voicemail => "voicemail",
Self::Ivr => "ivr",
Self::Terminating => "terminating",
Self::Terminated => "terminated",
}
}
@@ -49,43 +52,172 @@ impl CallDirection {
}
}
/// A passthrough call — both sides share the same SIP Call-ID.
/// The proxy rewrites SDP/Contact/Request-URI and relays RTP.
pub struct PassthroughCall {
/// The type of a call leg.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegKind {
SipProvider,
SipDevice,
WebRtc,
Media, // voicemail playback, IVR, recording
}
impl LegKind {
pub fn as_str(&self) -> &'static str {
match self {
Self::SipProvider => "sip-provider",
Self::SipDevice => "sip-device",
Self::WebRtc => "webrtc",
Self::Media => "media",
}
}
}
/// Per-leg state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegState {
Inviting,
Ringing,
Connected,
Terminated,
}
impl LegState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Inviting => "inviting",
Self::Ringing => "ringing",
Self::Connected => "connected",
Self::Terminated => "terminated",
}
}
}
/// Information about a single leg in a call.
pub struct LegInfo {
pub id: LegId,
pub kind: LegKind,
pub state: LegState,
pub codec_pt: u8,
/// For SIP legs: the SIP dialog manager (handles 407 auth, BYE, etc).
pub sip_leg: Option<SipLeg>,
/// For SIP legs: the SIP Call-ID for message routing.
pub sip_call_id: Option<String>,
/// For WebRTC legs: the session ID in WebRtcEngine.
pub webrtc_session_id: Option<String>,
/// The RTP socket allocated for this leg.
pub rtp_socket: Option<Arc<UdpSocket>>,
/// The RTP port number.
pub rtp_port: u16,
/// The remote media endpoint (learned from SDP or address learning).
pub remote_media: Option<SocketAddr>,
/// SIP signaling address (provider or device).
pub signaling_addr: Option<SocketAddr>,
}
/// A multiparty call with N legs and a central mixer.
pub struct Call {
pub id: String,
pub sip_call_id: String,
pub state: CallState,
pub direction: CallDirection,
pub created_at: Instant,
// Call metadata.
// Metadata.
pub caller_number: Option<String>,
pub callee_number: Option<String>,
pub provider_id: String,
// Provider side.
pub provider_addr: SocketAddr,
pub provider_media: Option<SocketAddr>,
/// All legs in this call, keyed by leg ID.
pub legs: HashMap<LegId, LegInfo>,
// Device side.
pub device_addr: SocketAddr,
pub device_media: Option<SocketAddr>,
/// Channel to send commands to the mixer task.
pub mixer_cmd_tx: mpsc::Sender<MixerCommand>,
// RTP relay.
pub rtp_port: u16,
pub rtp_socket: Arc<UdpSocket>,
// Packet counters.
pub pkt_from_device: u64,
pub pkt_from_provider: u64,
/// Handle to the mixer task (aborted on call teardown).
mixer_task: Option<JoinHandle<()>>,
}
impl PassthroughCall {
impl Call {
pub fn new(
id: String,
direction: CallDirection,
provider_id: String,
mixer_cmd_tx: mpsc::Sender<MixerCommand>,
mixer_task: JoinHandle<()>,
) -> Self {
Self {
id,
state: CallState::SettingUp,
direction,
created_at: Instant::now(),
caller_number: None,
callee_number: None,
provider_id,
legs: HashMap::new(),
mixer_cmd_tx,
mixer_task: Some(mixer_task),
}
}
/// Add a leg to the mixer. Sends the AddLeg command with channel endpoints.
pub async fn add_leg_to_mixer(
&self,
leg_id: &str,
codec_pt: u8,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
) {
let _ = self
.mixer_cmd_tx
.send(MixerCommand::AddLeg {
leg_id: leg_id.to_string(),
codec_pt,
inbound_rx,
outbound_tx,
})
.await;
}
/// Remove a leg from the mixer.
pub async fn remove_leg_from_mixer(&self, leg_id: &str) {
let _ = self
.mixer_cmd_tx
.send(MixerCommand::RemoveLeg {
leg_id: leg_id.to_string(),
})
.await;
}
pub fn duration_secs(&self) -> u64 {
self.created_at.elapsed().as_secs()
}
/// Shut down the mixer and abort its task.
pub async fn shutdown_mixer(&mut self) {
let _ = self.mixer_cmd_tx.send(MixerCommand::Shutdown).await;
if let Some(handle) = self.mixer_task.take() {
handle.abort();
}
}
/// Produce a JSON status snapshot for the dashboard.
pub fn to_status_json(&self) -> serde_json::Value {
let legs: Vec<serde_json::Value> = self
.legs
.values()
.filter(|l| l.state != LegState::Terminated)
.map(|l| {
serde_json::json!({
"id": l.id,
"type": l.kind.as_str(),
"state": l.state.as_str(),
"codec": sip_proto::helpers::codec_name(l.codec_pt),
"rtpPort": l.rtp_port,
"remoteMedia": l.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
})
})
.collect();
serde_json::json!({
"id": self.id,
"state": self.state.as_str(),
@@ -93,11 +225,8 @@ impl PassthroughCall {
"callerNumber": self.caller_number,
"calleeNumber": self.callee_number,
"providerUsed": self.provider_id,
"createdAt": self.created_at.elapsed().as_millis(),
"duration": self.duration_secs(),
"rtpPort": self.rtp_port,
"pktFromDevice": self.pkt_from_device,
"pktFromProvider": self.pkt_from_provider,
"legs": legs,
})
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,80 @@
//! Leg I/O task spawners.
//!
//! Each SIP leg gets two tasks:
//! - Inbound: recv_from on RTP socket → strip header → send RtpPacket to mixer channel
//! - Outbound: recv encoded RTP from mixer channel → send_to remote media endpoint
//!
//! WebRTC leg I/O is handled inside webrtc_engine.rs (on_track + track.write).
use crate::mixer::RtpPacket;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
/// Channel pair for connecting a leg to the mixer.
pub struct LegChannels {
/// Mixer receives decoded packets from this leg.
pub inbound_tx: mpsc::Sender<RtpPacket>,
pub inbound_rx: mpsc::Receiver<RtpPacket>,
/// Mixer sends encoded RTP to this leg.
pub outbound_tx: mpsc::Sender<Vec<u8>>,
pub outbound_rx: mpsc::Receiver<Vec<u8>>,
}
/// Create a channel pair for a leg.
pub fn create_leg_channels() -> LegChannels {
let (inbound_tx, inbound_rx) = mpsc::channel::<RtpPacket>(64);
let (outbound_tx, outbound_rx) = mpsc::channel::<Vec<u8>>(8);
LegChannels {
inbound_tx,
inbound_rx,
outbound_tx,
outbound_rx,
}
}
/// Spawn the inbound I/O task for a SIP leg.
/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer.
/// Returns the JoinHandle (exits when the inbound_tx channel is dropped).
pub fn spawn_sip_inbound(
rtp_socket: Arc<UdpSocket>,
inbound_tx: mpsc::Sender<RtpPacket>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut buf = vec![0u8; 1500];
loop {
match rtp_socket.recv_from(&mut buf).await {
Ok((n, _from)) => {
if n < 12 {
continue; // Too small for RTP header.
}
let pt = buf[1] & 0x7F;
let payload = buf[12..n].to_vec();
if payload.is_empty() {
continue;
}
if inbound_tx.send(RtpPacket { payload, payload_type: pt }).await.is_err() {
break; // Channel closed — leg removed.
}
}
Err(_) => break, // Socket error.
}
}
})
}
/// Spawn the outbound I/O task for a SIP leg.
/// Reads encoded RTP packets from the mixer and sends them to the remote media endpoint.
/// Returns the JoinHandle (exits when the outbound_rx channel is closed).
pub fn spawn_sip_outbound(
rtp_socket: Arc<UdpSocket>,
remote_media: SocketAddr,
mut outbound_rx: mpsc::Receiver<Vec<u8>>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(rtp_data) = outbound_rx.recv().await {
let _ = rtp_socket.send_to(&rtp_data, remote_media).await;
}
})
}

View File

@@ -12,6 +12,8 @@ mod call_manager;
mod config;
mod dtmf;
mod ipc;
mod leg_io;
mod mixer;
mod provider;
mod recorder;
mod registrar;
@@ -131,11 +133,13 @@ async fn handle_command(
"hangup" => handle_hangup(engine, out_tx, &cmd).await,
"make_call" => handle_make_call(engine, out_tx, &cmd).await,
"get_status" => handle_get_status(engine, out_tx, &cmd).await,
"add_leg" => handle_add_leg(engine, out_tx, &cmd).await,
"remove_leg" => handle_remove_leg(engine, out_tx, &cmd).await,
// WebRTC commands — lock webrtc only (no engine contention).
"webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, &cmd).await,
"webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, &cmd).await,
"webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await,
// webrtc_link needs both: engine (for RTP socket) and webrtc (for session).
// webrtc_link needs both: engine (for mixer channels) and webrtc (for session).
"webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await,
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
}
@@ -259,14 +263,11 @@ async fn handle_sip_packet(
}
// 3. Route to existing call by SIP Call-ID.
// Check if this Call-ID belongs to an active call (avoids borrow conflict).
if eng.call_mgr.has_call(msg.call_id()) {
let config_ref = eng.config.as_ref().unwrap().clone();
// Temporarily take registrar to avoid overlapping borrows.
let registrar_dummy = Registrar::new(eng.out_tx.clone());
if eng
.call_mgr
.route_sip_message(&msg, from_addr, socket, &config_ref, &registrar_dummy)
.route_sip_message(&msg, from_addr, socket, &config_ref)
.await
{
return;
@@ -578,8 +579,8 @@ async fn handle_webrtc_ice(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd
}
}
/// Handle `webrtc_link` — link a WebRTC session to a SIP call for audio bridging.
/// Briefly locks engine to get the RTP socket, then locks webrtc to set up the bridge.
/// Handle `webrtc_link` — link a WebRTC session to a call's mixer for audio bridging.
/// Creates channels, adds WebRTC leg to the call, wires the WebRTC engine.
/// Locks are never held simultaneously — no deadlock possible.
async fn handle_webrtc_link(
engine: Arc<Mutex<ProxyEngine>>,
@@ -595,44 +596,67 @@ async fn handle_webrtc_link(
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
};
let provider_addr = match cmd.params.get("provider_media_addr").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing provider_media_addr"); return; }
};
let provider_port = match cmd.params.get("provider_media_port").and_then(|v| v.as_u64()) {
Some(p) => p as u16,
None => { respond_err(out_tx, &cmd.id, "missing provider_media_port"); return; }
};
let sip_pt = cmd.params.get("sip_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8;
let provider_media: SocketAddr = match format!("{provider_addr}:{provider_port}").parse() {
Ok(a) => a,
Err(e) => { respond_err(out_tx, &cmd.id, &format!("bad address: {e}")); return; }
};
// Create channels for the WebRTC leg.
let channels = crate::leg_io::create_leg_channels();
// Briefly lock engine to get the B2BUA call's RTP socket.
let rtp_socket = {
// Briefly lock engine to add the WebRTC leg to the call's mixer.
{
let eng = engine.lock().await;
eng.call_mgr.get_b2bua_rtp_socket(&call_id)
}; // engine lock released here
let call = match eng.call_mgr.calls.get(&call_id) {
Some(c) => c,
None => {
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
return;
}
};
// Add to mixer via channel.
call.add_leg_to_mixer(
&session_id,
codec_lib::PT_OPUS,
channels.inbound_rx,
channels.outbound_tx,
)
.await;
} // engine lock released
let rtp_socket = match rtp_socket {
Some(s) => s,
None => {
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found or no RTP socket"));
return;
}
};
let bridge_info = crate::webrtc_engine::SipBridgeInfo {
provider_media,
sip_pt,
rtp_socket,
};
// Lock webrtc to set up the audio bridge.
// Lock webrtc to wire the channels.
let mut wrtc = webrtc.lock().await;
if wrtc.link_to_sip(&session_id, &call_id, bridge_info).await {
if wrtc
.link_to_mixer(&session_id, &call_id, channels.inbound_tx, channels.outbound_rx)
.await
{
// Also store the WebRTC leg info in the call.
drop(wrtc); // Release webrtc lock before re-acquiring engine.
{
let mut eng = engine.lock().await;
if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
call.legs.insert(
session_id.clone(),
crate::call::LegInfo {
id: session_id.clone(),
kind: crate::call::LegKind::WebRtc,
state: crate::call::LegState::Connected,
codec_pt: codec_lib::PT_OPUS,
sip_leg: None,
sip_call_id: None,
webrtc_session_id: Some(session_id.clone()),
rtp_socket: None,
rtp_port: 0,
remote_media: None,
signaling_addr: None,
},
);
}
}
emit_event(out_tx, "leg_added", serde_json::json!({
"call_id": call_id,
"leg_id": session_id,
"kind": "webrtc",
"state": "connected",
}));
respond_ok(out_tx, &cmd.id, serde_json::json!({
"session_id": session_id,
"call_id": call_id,
@@ -643,6 +667,98 @@ async fn handle_webrtc_link(
}
}
/// Handle `add_leg` — add a new SIP leg to an existing call.
async fn handle_add_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
};
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
};
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
let mut eng = engine.lock().await;
let config_ref = match &eng.config {
Some(c) => c.clone(),
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
};
// Resolve provider.
let provider_config = if let Some(pid) = provider_id {
config_ref.providers.iter().find(|p| p.id == pid).cloned()
} else {
config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider)
};
let provider_config = match provider_config {
Some(p) => p,
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
};
// Get registered AOR.
let registered_aor = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
).await {
let ps = ps_arc.lock().await;
ps.registered_aor.clone()
} else {
format!("sip:{}@{}", provider_config.username, provider_config.domain)
};
let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
).await {
let ps = ps_arc.lock().await;
ps.public_ip.clone()
} else {
None
};
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
};
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
let rtp_pool = rtp_pool.as_mut().unwrap();
let leg_id = call_mgr.add_external_leg(
&call_id, &number, &provider_config, &config_ref,
rtp_pool, &socket, public_ip.as_deref(), &registered_aor,
).await;
match leg_id {
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })),
None => respond_err(out_tx, &cmd.id, "failed to add leg"),
}
}
/// Handle `remove_leg` — remove a leg from a call.
async fn handle_remove_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
};
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
};
let mut eng = engine.lock().await;
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
};
if eng.call_mgr.remove_leg(&call_id, &leg_id, &socket).await {
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
} else {
respond_err(out_tx, &cmd.id, &format!("call/leg not found"));
}
}
/// Handle `webrtc_close` — close a WebRTC session.
/// Uses only the WebRTC lock.
async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {

View File

@@ -0,0 +1,232 @@
//! Audio mixer — mix-minus engine for multiparty calls.
//!
//! Each Call spawns one mixer task. Legs communicate with the mixer via
//! tokio mpsc channels — no shared mutable state, no lock contention.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
//! 2. Compute total mix (sum of all legs' PCM as i32)
//! 3. For each leg: mix-minus = total - own, resample to leg codec rate, encode, send
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, TranscodeState};
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample.
const MIX_RATE: u32 = 16000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
pub payload: Vec<u8>,
pub payload_type: u8,
}
/// Commands sent to the mixer task via a control channel.
pub enum MixerCommand {
/// Add a new leg to the mix.
AddLeg {
leg_id: String,
codec_pt: u8,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
},
/// Remove a leg from the mix (channels are dropped, I/O tasks exit).
RemoveLeg { leg_id: String },
/// Shut down the mixer.
Shutdown,
}
/// Internal per-leg state inside the mixer.
struct MixerLegSlot {
codec_pt: u8,
transcoder: TranscodeState,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
/// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus.
last_pcm_frame: Vec<i16>,
/// Number of consecutive ticks with no inbound packet.
silent_ticks: u32,
// RTP output state.
rtp_seq: u16,
rtp_ts: u32,
rtp_ssrc: u32,
}
/// Spawn the mixer task for a call. Returns the command sender and task handle.
pub fn spawn_mixer(
call_id: String,
out_tx: OutTx,
) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
let handle = tokio::spawn(async move {
mixer_loop(call_id, cmd_rx, out_tx).await;
});
(cmd_tx, handle)
}
/// The 20ms mixing loop.
async fn mixer_loop(
call_id: String,
mut cmd_rx: mpsc::Receiver<MixerCommand>,
out_tx: OutTx,
) {
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
let mut interval = time::interval(Duration::from_millis(20));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
// 1. Process control commands (non-blocking).
loop {
match cmd_rx.try_recv() {
Ok(MixerCommand::AddLeg {
leg_id,
codec_pt,
inbound_rx,
outbound_tx,
}) => {
let transcoder = match TranscodeState::new() {
Ok(t) => t,
Err(e) => {
emit_event(
&out_tx,
"mixer_error",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"error": format!("codec init: {e}"),
}),
);
continue;
}
};
legs.insert(
leg_id,
MixerLegSlot {
codec_pt,
transcoder,
inbound_rx,
outbound_tx,
last_pcm_frame: vec![0i16; MIX_FRAME_SIZE],
silent_ticks: 0,
rtp_seq: 0,
rtp_ts: 0,
rtp_ssrc: rand::random(),
},
);
}
Ok(MixerCommand::RemoveLeg { leg_id }) => {
legs.remove(&leg_id);
// Channels drop → I/O tasks exit cleanly.
}
Ok(MixerCommand::Shutdown) => return,
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => return,
}
}
if legs.is_empty() {
continue;
}
// 2. Drain inbound packets, decode to 16kHz PCM.
let leg_ids: Vec<String> = legs.keys().cloned().collect();
for lid in &leg_ids {
let slot = legs.get_mut(lid).unwrap();
// Drain channel, keep only the latest packet (simple jitter handling).
let mut latest: Option<RtpPacket> = None;
loop {
match slot.inbound_rx.try_recv() {
Ok(pkt) => latest = Some(pkt),
Err(_) => break,
}
}
if let Some(pkt) = latest {
slot.silent_ticks = 0;
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to mixing rate if needed.
let pcm_mix = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE])
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = pcm_mix;
frame.resize(MIX_FRAME_SIZE, 0);
slot.last_pcm_frame = frame;
}
Err(_) => {
// Decode failed — use silence.
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
}
}
} else {
slot.silent_ticks += 1;
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
if slot.silent_ticks > 150 {
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
}
}
}
// 3. Compute total mix (sum of all legs as i32 to avoid overflow).
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
for slot in legs.values() {
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
total_mix[i] += s as i32;
}
}
// 4. For each leg: mix-minus, resample, encode, send.
for slot in legs.values_mut() {
// Mix-minus: total minus this leg's own contribution.
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as i32).clamp(-32768, 32767) as i16;
mix_minus.push(sample);
}
// Resample from 16kHz to the leg's codec native rate.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
mix_minus
} else {
slot.transcoder
.resample(&mix_minus, MIX_RATE, target_rate)
.unwrap_or_default()
};
// Encode to the leg's codec.
let encoded = match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
// Build RTP packet with header.
let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc);
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
// Non-blocking send — drop frame if channel is full.
let _ = slot.outbound_tx.try_send(rtp);
}
}
}

View File

@@ -1,16 +1,17 @@
//! WebRTC engine — manages browser PeerConnections with SIP audio bridging.
//! WebRTC engine — manages browser PeerConnections.
//!
//! Browser Opus audio → Rust PeerConnection → transcode via codec-lib → SIP RTP
//! SIP RTP → transcode via codec-lib → Rust PeerConnection → Browser Opus
//! Audio bridging is now channel-based:
//! - Browser Opus audio → on_track → mixer inbound channel
//! - Mixer outbound channel → Opus RTP → TrackLocalStaticRTP → browser
//!
//! The mixer handles all transcoding. The WebRTC engine just shuttles raw Opus.
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{TranscodeState, PT_G722, PT_OPUS};
use crate::mixer::RtpPacket;
use codec_lib::PT_OPUS;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, Mutex};
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
@@ -22,26 +23,14 @@ use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
/// SIP-side bridge info for a WebRTC session.
#[derive(Clone)]
pub struct SipBridgeInfo {
/// Provider's media endpoint (RTP destination).
pub provider_media: SocketAddr,
/// Provider's codec payload type (e.g. 9 for G.722).
pub sip_pt: u8,
/// The allocated RTP socket for bidirectional audio with the provider.
/// This is the socket whose port was advertised in SDP, so the provider
/// sends RTP here and expects RTP from this port.
pub rtp_socket: Arc<UdpSocket>,
}
/// A managed WebRTC session.
struct WebRtcSession {
pc: Arc<RTCPeerConnection>,
local_track: Arc<TrackLocalStaticRTP>,
call_id: Option<String>,
/// SIP bridge — set when the session is linked to a call.
sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>>,
/// Channel sender for forwarding browser Opus audio to the mixer.
/// Set when the session is linked to a call via link_to_mixer().
mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>>,
}
/// Manages all WebRTC sessions.
@@ -58,7 +47,7 @@ impl WebRtcEngine {
}
}
/// Handle a WebRTC offer from a browser.
/// Handle a WebRTC offer from a browser — create PeerConnection, return SDP answer.
pub async fn handle_offer(
&mut self,
session_id: &str,
@@ -101,8 +90,9 @@ impl WebRtcEngine {
.await
.map_err(|e| format!("add track: {e}"))?;
// Shared SIP bridge info (populated when linked to a call).
let sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>> = Arc::new(Mutex::new(None));
// Shared mixer channel sender (populated when linked to a call).
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> =
Arc::new(Mutex::new(None));
// ICE candidate handler.
let out_tx_ice = self.out_tx.clone();
@@ -153,14 +143,14 @@ impl WebRtcEngine {
}));
// Track handler — receives Opus audio from the browser.
// When SIP bridge is set, transcodes and forwards to provider.
// Forwards raw Opus payload to the mixer channel (when linked).
let out_tx_track = self.out_tx.clone();
let sid_track = session_id.to_string();
let sip_bridge_for_track = sip_bridge.clone();
let mixer_tx_for_track = mixer_tx.clone();
pc.on_track(Box::new(move |track, _receiver, _transceiver| {
let out_tx = out_tx_track.clone();
let sid = sid_track.clone();
let bridge = sip_bridge_for_track.clone();
let mixer_tx = mixer_tx_for_track.clone();
Box::pin(async move {
let codec_info = track.codec();
emit_event(
@@ -173,8 +163,8 @@ impl WebRtcEngine {
}),
);
// Spawn the browser→SIP audio forwarding task.
tokio::spawn(browser_to_sip_loop(track, bridge, out_tx, sid));
// Spawn browser→mixer forwarding task.
tokio::spawn(browser_to_mixer_loop(track, mixer_tx, out_tx, sid));
})
}));
@@ -201,43 +191,41 @@ impl WebRtcEngine {
pc,
local_track,
call_id: None,
sip_bridge,
mixer_tx,
},
);
Ok(answer_sdp)
}
/// Link a WebRTC session to a SIP call — sets up bidirectional audio bridge.
/// - Browser→SIP: already running via on_track handler, will start forwarding
/// once bridge info is set.
/// - SIP→Browser: spawned here, reads from the RTP socket and sends to browser.
pub async fn link_to_sip(
/// Link a WebRTC session to a call's mixer via channels.
/// - `inbound_tx`: browser audio goes TO the mixer through this channel
/// - `outbound_rx`: mixed audio comes FROM the mixer through this channel
pub async fn link_to_mixer(
&mut self,
session_id: &str,
call_id: &str,
bridge_info: SipBridgeInfo,
inbound_tx: mpsc::Sender<RtpPacket>,
outbound_rx: mpsc::Receiver<Vec<u8>>,
) -> bool {
if let Some(session) = self.sessions.get_mut(session_id) {
session.call_id = Some(call_id.to_string());
let session = match self.sessions.get_mut(session_id) {
Some(s) => s,
None => return false,
};
// Spawn SIP → browser audio loop (provider RTP → transcode → Opus → WebRTC track).
let local_track = session.local_track.clone();
let rtp_socket = bridge_info.rtp_socket.clone();
let sip_pt = bridge_info.sip_pt;
let out_tx = self.out_tx.clone();
let sid = session_id.to_string();
tokio::spawn(sip_to_browser_loop(
rtp_socket, local_track, sip_pt, out_tx, sid,
));
session.call_id = Some(call_id.to_string());
// Set bridge info — this unblocks the browser→SIP loop (already running).
let mut bridge = session.sip_bridge.lock().await;
*bridge = Some(bridge_info);
true
} else {
false
// Set the mixer sender so the on_track loop starts forwarding.
{
let mut tx = session.mixer_tx.lock().await;
*tx = Some(inbound_tx);
}
// Spawn mixer→browser outbound task.
let local_track = session.local_track.clone();
tokio::spawn(mixer_to_browser_loop(outbound_rx, local_track));
true
}
pub async fn add_ice_candidate(
@@ -272,90 +260,48 @@ impl WebRtcEngine {
}
Ok(())
}
pub fn has_session(&self, session_id: &str) -> bool {
self.sessions.contains_key(session_id)
}
}
/// Browser → SIP audio forwarding loop.
/// Reads Opus RTP from the browser, transcodes to the SIP codec, sends to provider.
async fn browser_to_sip_loop(
/// Browser → Mixer audio forwarding loop.
/// Reads Opus RTP from the browser track, sends raw Opus payload to the mixer channel.
async fn browser_to_mixer_loop(
track: Arc<webrtc::track::track_remote::TrackRemote>,
sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>>,
mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>>,
out_tx: OutTx,
session_id: String,
) {
// Create a persistent codec state for this direction.
let mut transcoder = match TranscodeState::new() {
Ok(t) => t,
Err(e) => {
emit_event(
&out_tx,
"webrtc_error",
serde_json::json!({ "session_id": session_id, "error": format!("codec init: {e}") }),
);
return;
}
};
let mut buf = vec![0u8; 1500];
let mut count = 0u64;
let mut to_sip_seq: u16 = 0;
let mut to_sip_ts: u32 = 0;
let to_sip_ssrc: u32 = rand::random();
loop {
match track.read(&mut buf).await {
Ok((rtp_packet, _attributes)) => {
count += 1;
// Get the SIP bridge info (may not be set yet if call isn't linked).
let bridge = sip_bridge.lock().await;
let bridge_info = match bridge.as_ref() {
Some(b) => b.clone(),
None => continue, // Not linked to a SIP call yet — drop the packet.
};
drop(bridge); // Release lock before doing I/O.
// Extract Opus payload from the RTP packet (skip 12-byte header).
let payload = &rtp_packet.payload;
if payload.is_empty() {
continue;
}
// Transcode Opus → SIP codec (e.g. G.722).
let sip_payload = match transcoder.transcode(
payload,
PT_OPUS,
bridge_info.sip_pt,
Some("to_sip"),
) {
Ok(p) if !p.is_empty() => p,
_ => continue,
};
// Build SIP RTP packet.
let header = build_rtp_header(bridge_info.sip_pt, to_sip_seq, to_sip_ts, to_sip_ssrc);
let mut sip_rtp = header.to_vec();
sip_rtp.extend_from_slice(&sip_payload);
to_sip_seq = to_sip_seq.wrapping_add(1);
to_sip_ts = to_sip_ts.wrapping_add(rtp_clock_increment(bridge_info.sip_pt));
// Send to provider via the RTP socket (correct source port matching our SDP).
let _ = bridge_info
.rtp_socket
.send_to(&sip_rtp, bridge_info.provider_media)
.await;
// Send raw Opus payload to mixer (if linked).
let tx = mixer_tx.lock().await;
if let Some(ref tx) = *tx {
let _ = tx
.send(RtpPacket {
payload: payload.to_vec(),
payload_type: PT_OPUS,
})
.await;
}
drop(tx);
if count == 1 || count == 50 || count % 500 == 0 {
emit_event(
&out_tx,
"webrtc_audio_tx",
"webrtc_audio_rx",
serde_json::json!({
"session_id": session_id,
"direction": "browser_to_sip",
"direction": "browser_to_mixer",
"packet_count": count,
}),
);
@@ -366,85 +312,13 @@ async fn browser_to_sip_loop(
}
}
/// SIP → Browser audio forwarding loop.
/// Reads RTP from the provider (via the allocated RTP socket), transcodes to Opus,
/// and writes to the WebRTC local track for delivery to the browser.
async fn sip_to_browser_loop(
rtp_socket: Arc<UdpSocket>,
/// Mixer → Browser audio forwarding loop.
/// Reads Opus-encoded RTP packets from the mixer and writes to the WebRTC track.
async fn mixer_to_browser_loop(
mut outbound_rx: mpsc::Receiver<Vec<u8>>,
local_track: Arc<TrackLocalStaticRTP>,
sip_pt: u8,
out_tx: OutTx,
session_id: String,
) {
let mut transcoder = match TranscodeState::new() {
Ok(t) => t,
Err(e) => {
emit_event(
&out_tx,
"webrtc_error",
serde_json::json!({
"session_id": session_id,
"error": format!("sip_to_browser codec init: {e}"),
}),
);
return;
}
};
let mut buf = vec![0u8; 1500];
let mut count = 0u64;
let mut seq: u16 = 0;
let mut ts: u32 = 0;
let ssrc: u32 = rand::random();
loop {
match rtp_socket.recv_from(&mut buf).await {
Ok((n, _from)) => {
if n < 12 {
continue; // Too small for RTP header.
}
count += 1;
// Extract payload (skip 12-byte RTP header).
let payload = &buf[12..n];
if payload.is_empty() {
continue;
}
// Transcode SIP codec → Opus.
let opus_payload = match transcoder.transcode(
payload,
sip_pt,
PT_OPUS,
Some("sip_to_browser"),
) {
Ok(p) if !p.is_empty() => p,
_ => continue,
};
// Build Opus RTP packet.
let header = build_rtp_header(PT_OPUS, seq, ts, ssrc);
let mut packet = header.to_vec();
packet.extend_from_slice(&opus_payload);
seq = seq.wrapping_add(1);
ts = ts.wrapping_add(960); // Opus: 48000 Hz × 20ms = 960 samples
let _ = local_track.write(&packet).await;
if count == 1 || count == 50 || count % 500 == 0 {
emit_event(
&out_tx,
"webrtc_audio_rx",
serde_json::json!({
"session_id": session_id,
"direction": "sip_to_browser",
"packet_count": count,
}),
);
}
}
Err(_) => break, // Socket closed.
}
while let Some(rtp_data) = outbound_rx.recv().await {
let _ = local_track.write(&rtp_data).await;
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.13.0',
version: '1.14.0',
description: 'undefined'
}

View File

@@ -128,14 +128,14 @@ async function handleRequest(
}
}
// API: add leg to call.
// API: add leg to call (device — not yet implemented, needs device-to-call routing).
if (url.pathname.startsWith('/api/call/') && url.pathname.endsWith('/addleg') && method === 'POST') {
try {
const callId = url.pathname.split('/')[3];
const body = await readJsonBody(req);
if (!body?.deviceId) return sendJson(res, { ok: false, error: 'missing deviceId' }, 400);
const ok = callManager?.addDeviceToCall(callId, body.deviceId) ?? false;
return sendJson(res, { ok });
// TODO: implement device leg addition (needs SIP INVITE to device).
return sendJson(res, { ok: false, error: 'not yet implemented' }, 501);
} catch (e: any) {
return sendJson(res, { ok: false, error: e.message }, 400);
}
@@ -147,8 +147,9 @@ async function handleRequest(
const callId = url.pathname.split('/')[3];
const body = await readJsonBody(req);
if (!body?.number) return sendJson(res, { ok: false, error: 'missing number' }, 400);
const ok = callManager?.addExternalToCall(callId, body.number, body.providerId) ?? false;
return sendJson(res, { ok });
const { addLeg: addLegFn } = await import('./proxybridge.ts');
const legId = await addLegFn(callId, body.number, body.providerId);
return sendJson(res, { ok: !!legId, legId });
} catch (e: any) {
return sendJson(res, { ok: false, error: e.message }, 400);
}
@@ -160,22 +161,22 @@ async function handleRequest(
const callId = url.pathname.split('/')[3];
const body = await readJsonBody(req);
if (!body?.legId) return sendJson(res, { ok: false, error: 'missing legId' }, 400);
const ok = callManager?.removeLegFromCall(callId, body.legId) ?? false;
const { removeLeg: removeLegFn } = await import('./proxybridge.ts');
const ok = await removeLegFn(callId, body.legId);
return sendJson(res, { ok });
} catch (e: any) {
return sendJson(res, { ok: false, error: e.message }, 400);
}
}
// API: transfer leg.
// API: transfer leg (not yet implemented).
if (url.pathname === '/api/transfer' && method === 'POST') {
try {
const body = await readJsonBody(req);
if (!body?.sourceCallId || !body?.legId || !body?.targetCallId) {
return sendJson(res, { ok: false, error: 'missing sourceCallId, legId, or targetCallId' }, 400);
}
const ok = callManager?.transferLeg(body.sourceCallId, body.legId, body.targetCallId) ?? false;
return sendJson(res, { ok });
return sendJson(res, { ok: false, error: 'not yet implemented' }, 501);
} catch (e: any) {
return sendJson(res, { ok: false, error: e.message }, 400);
}

View File

@@ -238,6 +238,38 @@ export async function webrtcLink(sessionId: string, callId: string, providerMedi
}
}
/**
* Add an external SIP leg to an existing call (multiparty).
*/
export async function addLeg(callId: string, number: string, providerId?: string): Promise<string | null> {
if (!bridge || !initialized) return null;
try {
const result = await bridge.sendCommand('add_leg', {
call_id: callId,
number,
provider_id: providerId,
} as any);
return (result as any)?.leg_id || null;
} catch (e: any) {
logFn?.(`[proxy-engine] add_leg error: ${e?.message || e}`);
return null;
}
}
/**
* Remove a leg from a call.
*/
export async function removeLeg(callId: string, legId: string): Promise<boolean> {
if (!bridge || !initialized) return false;
try {
await bridge.sendCommand('remove_leg', { call_id: callId, leg_id: legId } as any);
return true;
} catch (e: any) {
logFn?.(`[proxy-engine] remove_leg error: ${e?.message || e}`);
return false;
}
}
/**
* Close a WebRTC session.
*/

View File

@@ -39,6 +39,8 @@ import {
webrtcIce,
webrtcLink,
webrtcClose,
addLeg,
removeLeg,
} from './proxybridge.ts';
import type {
IIncomingCallEvent,
@@ -359,6 +361,19 @@ async function startProxyEngine(): Promise<void> {
log(`[sip] unhandled ${data.method_or_status} Call-ID=${data.call_id?.slice(0, 20)} from=${data.from_addr}:${data.from_port}`);
});
// Leg events (multiparty).
onProxyEvent('leg_added', (data: any) => {
log(`[leg] added: call=${data.call_id} leg=${data.leg_id} kind=${data.kind} state=${data.state}`);
});
onProxyEvent('leg_removed', (data: any) => {
log(`[leg] removed: call=${data.call_id} leg=${data.leg_id}`);
});
onProxyEvent('leg_state_changed', (data: any) => {
log(`[leg] state: call=${data.call_id} leg=${data.leg_id}${data.state}`);
});
// WebRTC events from Rust — forward ICE candidates to browser via WebSocket.
onProxyEvent('webrtc_ice_candidate', (data: any) => {
// Find the browser's WebSocket by session ID and send the ICE candidate.

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.13.0',
version: '1.14.0',
description: 'undefined'
}