feat(proxy-engine): add Rust-based outbound calling, WebRTC bridging, and voicemail handling
This commit is contained in:
389
rust/crates/proxy-engine/src/webrtc_engine.rs
Normal file
389
rust/crates/proxy-engine/src/webrtc_engine.rs
Normal file
@@ -0,0 +1,389 @@
|
||||
//! WebRTC engine — manages browser PeerConnections with SIP audio bridging.
|
||||
//!
|
||||
//! Browser Opus audio → Rust PeerConnection → transcode via codec-lib → SIP RTP
|
||||
//! SIP RTP → transcode via codec-lib → Rust PeerConnection → Browser Opus
|
||||
|
||||
use crate::ipc::{emit_event, OutTx};
|
||||
use crate::rtp::{build_rtp_header, rtp_clock_increment};
|
||||
use codec_lib::{TranscodeState, PT_G722, PT_OPUS};
|
||||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::Mutex;
|
||||
use webrtc::api::media_engine::MediaEngine;
|
||||
use webrtc::api::APIBuilder;
|
||||
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
|
||||
use webrtc::peer_connection::configuration::RTCConfiguration;
|
||||
use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
|
||||
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
|
||||
use webrtc::peer_connection::RTCPeerConnection;
|
||||
use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
|
||||
use webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP;
|
||||
use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
|
||||
|
||||
/// SIP-side bridge info for a WebRTC session.
|
||||
#[derive(Clone)]
|
||||
pub struct SipBridgeInfo {
|
||||
/// Provider's media endpoint (RTP destination).
|
||||
pub provider_media: SocketAddr,
|
||||
/// Provider's codec payload type (e.g. 9 for G.722).
|
||||
pub sip_pt: u8,
|
||||
/// The SIP UDP socket for sending RTP to the provider.
|
||||
pub sip_socket: Arc<UdpSocket>,
|
||||
}
|
||||
|
||||
/// A managed WebRTC session.
|
||||
struct WebRtcSession {
|
||||
pc: Arc<RTCPeerConnection>,
|
||||
local_track: Arc<TrackLocalStaticRTP>,
|
||||
call_id: Option<String>,
|
||||
/// SIP bridge — set when the session is linked to a call.
|
||||
sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>>,
|
||||
}
|
||||
|
||||
/// Manages all WebRTC sessions.
|
||||
pub struct WebRtcEngine {
|
||||
sessions: HashMap<String, WebRtcSession>,
|
||||
out_tx: OutTx,
|
||||
}
|
||||
|
||||
impl WebRtcEngine {
|
||||
pub fn new(out_tx: OutTx) -> Self {
|
||||
Self {
|
||||
sessions: HashMap::new(),
|
||||
out_tx,
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a WebRTC offer from a browser.
|
||||
pub async fn handle_offer(
|
||||
&mut self,
|
||||
session_id: &str,
|
||||
offer_sdp: &str,
|
||||
) -> Result<String, String> {
|
||||
let mut media_engine = MediaEngine::default();
|
||||
media_engine
|
||||
.register_default_codecs()
|
||||
.map_err(|e| format!("register codecs: {e}"))?;
|
||||
|
||||
let api = APIBuilder::new()
|
||||
.with_media_engine(media_engine)
|
||||
.build();
|
||||
|
||||
let config = RTCConfiguration {
|
||||
ice_servers: vec![],
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let pc = api
|
||||
.new_peer_connection(config)
|
||||
.await
|
||||
.map_err(|e| format!("create peer connection: {e}"))?;
|
||||
let pc = Arc::new(pc);
|
||||
|
||||
// Local audio track for sending audio to browser (Opus).
|
||||
let local_track = Arc::new(TrackLocalStaticRTP::new(
|
||||
RTCRtpCodecCapability {
|
||||
mime_type: "audio/opus".to_string(),
|
||||
clock_rate: 48000,
|
||||
channels: 1,
|
||||
..Default::default()
|
||||
},
|
||||
"audio".to_string(),
|
||||
"siprouter".to_string(),
|
||||
));
|
||||
|
||||
let _sender = pc
|
||||
.add_track(local_track.clone() as Arc<dyn TrackLocal + Send + Sync>)
|
||||
.await
|
||||
.map_err(|e| format!("add track: {e}"))?;
|
||||
|
||||
// Shared SIP bridge info (populated when linked to a call).
|
||||
let sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>> = Arc::new(Mutex::new(None));
|
||||
|
||||
// ICE candidate handler.
|
||||
let out_tx_ice = self.out_tx.clone();
|
||||
let sid_ice = session_id.to_string();
|
||||
pc.on_ice_candidate(Box::new(move |candidate| {
|
||||
let out_tx = out_tx_ice.clone();
|
||||
let sid = sid_ice.clone();
|
||||
Box::pin(async move {
|
||||
if let Some(c) = candidate {
|
||||
if let Ok(json) = c.to_json() {
|
||||
emit_event(
|
||||
&out_tx,
|
||||
"webrtc_ice_candidate",
|
||||
serde_json::json!({
|
||||
"session_id": sid,
|
||||
"candidate": json.candidate,
|
||||
"sdp_mid": json.sdp_mid,
|
||||
"sdp_mline_index": json.sdp_mline_index,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
})
|
||||
}));
|
||||
|
||||
// Connection state handler.
|
||||
let out_tx_state = self.out_tx.clone();
|
||||
let sid_state = session_id.to_string();
|
||||
pc.on_peer_connection_state_change(Box::new(move |state| {
|
||||
let out_tx = out_tx_state.clone();
|
||||
let sid = sid_state.clone();
|
||||
Box::pin(async move {
|
||||
let state_str = match state {
|
||||
RTCPeerConnectionState::Connected => "connected",
|
||||
RTCPeerConnectionState::Disconnected => "disconnected",
|
||||
RTCPeerConnectionState::Failed => "failed",
|
||||
RTCPeerConnectionState::Closed => "closed",
|
||||
RTCPeerConnectionState::New => "new",
|
||||
RTCPeerConnectionState::Connecting => "connecting",
|
||||
_ => "unknown",
|
||||
};
|
||||
emit_event(
|
||||
&out_tx,
|
||||
"webrtc_state",
|
||||
serde_json::json!({ "session_id": sid, "state": state_str }),
|
||||
);
|
||||
})
|
||||
}));
|
||||
|
||||
// Track handler — receives Opus audio from the browser.
|
||||
// When SIP bridge is set, transcodes and forwards to provider.
|
||||
let out_tx_track = self.out_tx.clone();
|
||||
let sid_track = session_id.to_string();
|
||||
let sip_bridge_for_track = sip_bridge.clone();
|
||||
pc.on_track(Box::new(move |track, _receiver, _transceiver| {
|
||||
let out_tx = out_tx_track.clone();
|
||||
let sid = sid_track.clone();
|
||||
let bridge = sip_bridge_for_track.clone();
|
||||
Box::pin(async move {
|
||||
let codec_info = track.codec();
|
||||
emit_event(
|
||||
&out_tx,
|
||||
"webrtc_track",
|
||||
serde_json::json!({
|
||||
"session_id": sid,
|
||||
"kind": track.kind().to_string(),
|
||||
"codec": codec_info.capability.mime_type,
|
||||
}),
|
||||
);
|
||||
|
||||
// Spawn the browser→SIP audio forwarding task.
|
||||
tokio::spawn(browser_to_sip_loop(track, bridge, out_tx, sid));
|
||||
})
|
||||
}));
|
||||
|
||||
// Set remote offer.
|
||||
let offer = RTCSessionDescription::offer(offer_sdp.to_string())
|
||||
.map_err(|e| format!("parse offer: {e}"))?;
|
||||
pc.set_remote_description(offer)
|
||||
.await
|
||||
.map_err(|e| format!("set remote description: {e}"))?;
|
||||
|
||||
// Create answer.
|
||||
let answer = pc
|
||||
.create_answer(None)
|
||||
.await
|
||||
.map_err(|e| format!("create answer: {e}"))?;
|
||||
let answer_sdp = answer.sdp.clone();
|
||||
pc.set_local_description(answer)
|
||||
.await
|
||||
.map_err(|e| format!("set local description: {e}"))?;
|
||||
|
||||
self.sessions.insert(
|
||||
session_id.to_string(),
|
||||
WebRtcSession {
|
||||
pc,
|
||||
local_track,
|
||||
call_id: None,
|
||||
sip_bridge,
|
||||
},
|
||||
);
|
||||
|
||||
Ok(answer_sdp)
|
||||
}
|
||||
|
||||
/// Link a WebRTC session to a SIP call — sets up the audio bridge.
|
||||
pub async fn link_to_sip(
|
||||
&mut self,
|
||||
session_id: &str,
|
||||
call_id: &str,
|
||||
bridge_info: SipBridgeInfo,
|
||||
) -> bool {
|
||||
if let Some(session) = self.sessions.get_mut(session_id) {
|
||||
session.call_id = Some(call_id.to_string());
|
||||
let mut bridge = session.sip_bridge.lock().await;
|
||||
*bridge = Some(bridge_info);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Send transcoded audio from the SIP side to the browser.
|
||||
/// Called by the RTP relay when it receives a packet from the provider.
|
||||
pub async fn forward_sip_to_browser(
|
||||
&self,
|
||||
session_id: &str,
|
||||
sip_rtp_payload: &[u8],
|
||||
sip_pt: u8,
|
||||
) -> Result<(), String> {
|
||||
let session = self
|
||||
.sessions
|
||||
.get(session_id)
|
||||
.ok_or_else(|| format!("session {session_id} not found"))?;
|
||||
|
||||
// Transcode SIP codec → Opus.
|
||||
// We create a temporary TranscodeState per packet for simplicity.
|
||||
// TODO: Use a per-session persistent state for proper codec continuity.
|
||||
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec: {e}"))?;
|
||||
let opus_payload = transcoder
|
||||
.transcode(sip_rtp_payload, sip_pt, PT_OPUS, Some("to_browser"))
|
||||
.map_err(|e| format!("transcode: {e}"))?;
|
||||
|
||||
if opus_payload.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Build RTP header for Opus.
|
||||
// TODO: Track seq/ts/ssrc per session for proper continuity.
|
||||
let header = build_rtp_header(PT_OPUS, 0, 0, 0);
|
||||
let mut packet = header.to_vec();
|
||||
packet.extend_from_slice(&opus_payload);
|
||||
|
||||
session
|
||||
.local_track
|
||||
.write(&packet)
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|e| format!("write: {e}"))
|
||||
}
|
||||
|
||||
pub async fn add_ice_candidate(
|
||||
&self,
|
||||
session_id: &str,
|
||||
candidate: &str,
|
||||
sdp_mid: Option<&str>,
|
||||
sdp_mline_index: Option<u16>,
|
||||
) -> Result<(), String> {
|
||||
let session = self
|
||||
.sessions
|
||||
.get(session_id)
|
||||
.ok_or_else(|| format!("session {session_id} not found"))?;
|
||||
|
||||
let init = RTCIceCandidateInit {
|
||||
candidate: candidate.to_string(),
|
||||
sdp_mid: sdp_mid.map(|s| s.to_string()),
|
||||
sdp_mline_index,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
session
|
||||
.pc
|
||||
.add_ice_candidate(init)
|
||||
.await
|
||||
.map_err(|e| format!("add ICE: {e}"))
|
||||
}
|
||||
|
||||
pub async fn close_session(&mut self, session_id: &str) -> Result<(), String> {
|
||||
if let Some(session) = self.sessions.remove(session_id) {
|
||||
session.pc.close().await.map_err(|e| format!("close: {e}"))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn has_session(&self, session_id: &str) -> bool {
|
||||
self.sessions.contains_key(session_id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Browser → SIP audio forwarding loop.
|
||||
/// Reads Opus RTP from the browser, transcodes to the SIP codec, sends to provider.
|
||||
async fn browser_to_sip_loop(
|
||||
track: Arc<webrtc::track::track_remote::TrackRemote>,
|
||||
sip_bridge: Arc<Mutex<Option<SipBridgeInfo>>>,
|
||||
out_tx: OutTx,
|
||||
session_id: String,
|
||||
) {
|
||||
// Create a persistent codec state for this direction.
|
||||
let mut transcoder = match TranscodeState::new() {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
emit_event(
|
||||
&out_tx,
|
||||
"webrtc_error",
|
||||
serde_json::json!({ "session_id": session_id, "error": format!("codec init: {e}") }),
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut buf = vec![0u8; 1500];
|
||||
let mut count = 0u64;
|
||||
let mut to_sip_seq: u16 = 0;
|
||||
let mut to_sip_ts: u32 = 0;
|
||||
let to_sip_ssrc: u32 = rand::random();
|
||||
|
||||
loop {
|
||||
match track.read(&mut buf).await {
|
||||
Ok((rtp_packet, _attributes)) => {
|
||||
count += 1;
|
||||
|
||||
// Get the SIP bridge info (may not be set yet if call isn't linked).
|
||||
let bridge = sip_bridge.lock().await;
|
||||
let bridge_info = match bridge.as_ref() {
|
||||
Some(b) => b.clone(),
|
||||
None => continue, // Not linked to a SIP call yet — drop the packet.
|
||||
};
|
||||
drop(bridge); // Release lock before doing I/O.
|
||||
|
||||
// Extract Opus payload from the RTP packet (skip 12-byte header).
|
||||
let payload = &rtp_packet.payload;
|
||||
if payload.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Transcode Opus → SIP codec (e.g. G.722).
|
||||
let sip_payload = match transcoder.transcode(
|
||||
payload,
|
||||
PT_OPUS,
|
||||
bridge_info.sip_pt,
|
||||
Some("to_sip"),
|
||||
) {
|
||||
Ok(p) if !p.is_empty() => p,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
// Build SIP RTP packet.
|
||||
let header = build_rtp_header(bridge_info.sip_pt, to_sip_seq, to_sip_ts, to_sip_ssrc);
|
||||
let mut sip_rtp = header.to_vec();
|
||||
sip_rtp.extend_from_slice(&sip_payload);
|
||||
|
||||
to_sip_seq = to_sip_seq.wrapping_add(1);
|
||||
to_sip_ts = to_sip_ts.wrapping_add(rtp_clock_increment(bridge_info.sip_pt));
|
||||
|
||||
// Send to provider.
|
||||
let _ = bridge_info
|
||||
.sip_socket
|
||||
.send_to(&sip_rtp, bridge_info.provider_media)
|
||||
.await;
|
||||
|
||||
if count == 1 || count == 50 || count % 500 == 0 {
|
||||
emit_event(
|
||||
&out_tx,
|
||||
"webrtc_audio_tx",
|
||||
serde_json::json!({
|
||||
"session_id": session_id,
|
||||
"direction": "browser_to_sip",
|
||||
"packet_count": count,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(_) => break, // Track ended.
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user