2026-04-10 09:57:27 +00:00
|
|
|
//! SIP proxy engine — the Rust data plane for the SIP router.
//!
//! Handles ALL SIP protocol mechanics. TypeScript only sends high-level
//! commands (routing decisions, config) and receives high-level events
//! (incoming calls, registration state).
//!
//! No raw SIP ever touches TypeScript.
|
|
|
|
|
|
2026-04-10 11:36:18 +00:00
|
|
|
mod audio_player;
|
2026-04-10 09:57:27 +00:00
|
|
|
mod call;
|
|
|
|
|
mod call_manager;
|
|
|
|
|
mod config;
|
|
|
|
|
mod dtmf;
|
|
|
|
|
mod ipc;
|
2026-04-10 12:52:48 +00:00
|
|
|
mod leg_io;
|
|
|
|
|
mod mixer;
|
2026-04-10 09:57:27 +00:00
|
|
|
mod provider;
|
2026-04-10 11:36:18 +00:00
|
|
|
mod recorder;
|
2026-04-10 09:57:27 +00:00
|
|
|
mod registrar;
|
|
|
|
|
mod rtp;
|
2026-04-10 12:19:20 +00:00
|
|
|
mod sip_leg;
|
2026-04-10 09:57:27 +00:00
|
|
|
mod sip_transport;
|
2026-04-10 14:54:21 +00:00
|
|
|
mod tool_leg;
|
2026-04-10 11:36:18 +00:00
|
|
|
mod voicemail;
|
|
|
|
|
mod webrtc_engine;
|
2026-04-10 09:57:27 +00:00
|
|
|
|
|
|
|
|
use crate::call_manager::CallManager;
|
|
|
|
|
use crate::config::AppConfig;
|
|
|
|
|
use crate::ipc::{emit_event, respond_err, respond_ok, Command, OutTx};
|
|
|
|
|
use crate::provider::ProviderManager;
|
|
|
|
|
use crate::registrar::Registrar;
|
|
|
|
|
use crate::rtp::RtpPortPool;
|
|
|
|
|
use crate::sip_transport::SipTransport;
|
2026-04-10 11:36:18 +00:00
|
|
|
use crate::webrtc_engine::WebRtcEngine;
|
2026-04-10 09:57:27 +00:00
|
|
|
use sip_proto::message::SipMessage;
|
|
|
|
|
use std::net::SocketAddr;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
|
|
|
|
use tokio::net::UdpSocket;
|
|
|
|
|
use tokio::sync::{mpsc, Mutex};
|
|
|
|
|
|
2026-04-10 12:19:20 +00:00
|
|
|
/// Shared mutable state for the proxy engine (SIP side).
|
|
|
|
|
/// WebRTC is intentionally kept in a separate lock to avoid contention
|
|
|
|
|
/// between SIP packet handlers and WebRTC command handlers.
|
2026-04-10 09:57:27 +00:00
|
|
|
struct ProxyEngine {
|
|
|
|
|
config: Option<AppConfig>,
|
|
|
|
|
transport: Option<SipTransport>,
|
|
|
|
|
provider_mgr: ProviderManager,
|
|
|
|
|
registrar: Registrar,
|
|
|
|
|
call_mgr: CallManager,
|
|
|
|
|
rtp_pool: Option<RtpPortPool>,
|
|
|
|
|
out_tx: OutTx,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl ProxyEngine {
|
|
|
|
|
fn new(out_tx: OutTx) -> Self {
|
|
|
|
|
Self {
|
|
|
|
|
config: None,
|
|
|
|
|
transport: None,
|
|
|
|
|
provider_mgr: ProviderManager::new(out_tx.clone()),
|
|
|
|
|
registrar: Registrar::new(out_tx.clone()),
|
|
|
|
|
call_mgr: CallManager::new(out_tx.clone()),
|
|
|
|
|
rtp_pool: None,
|
|
|
|
|
out_tx,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[tokio::main]
|
|
|
|
|
async fn main() {
|
|
|
|
|
// Output channel: all stdout writes go through here for serialization.
|
|
|
|
|
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<String>();
|
|
|
|
|
|
|
|
|
|
// Stdout writer task.
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
let mut stdout = tokio::io::stdout();
|
|
|
|
|
while let Some(line) = out_rx.recv().await {
|
|
|
|
|
let mut output = line.into_bytes();
|
|
|
|
|
output.push(b'\n');
|
|
|
|
|
if stdout.write_all(&output).await.is_err() {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
let _ = stdout.flush().await;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
// Emit ready event.
|
|
|
|
|
emit_event(&out_tx, "ready", serde_json::json!({}));
|
|
|
|
|
|
2026-04-10 12:19:20 +00:00
|
|
|
// Shared engine state (SIP side).
|
2026-04-10 09:57:27 +00:00
|
|
|
let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone())));
|
|
|
|
|
|
2026-04-10 12:19:20 +00:00
|
|
|
// WebRTC engine — separate lock to avoid deadlock with SIP handlers.
|
|
|
|
|
let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
|
|
|
|
|
|
2026-04-10 09:57:27 +00:00
|
|
|
// Read commands from stdin.
|
|
|
|
|
let stdin = tokio::io::stdin();
|
|
|
|
|
let reader = BufReader::new(stdin);
|
|
|
|
|
let mut lines = reader.lines();
|
|
|
|
|
|
|
|
|
|
while let Ok(Some(line)) = lines.next_line().await {
|
|
|
|
|
if line.trim().is_empty() {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let cmd: Command = match serde_json::from_str(&line) {
|
|
|
|
|
Ok(c) => c,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
respond_err(&out_tx, "", &format!("parse: {e}"));
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let engine = engine.clone();
|
2026-04-10 12:19:20 +00:00
|
|
|
let webrtc = webrtc.clone();
|
2026-04-10 09:57:27 +00:00
|
|
|
let out_tx = out_tx.clone();
|
|
|
|
|
|
|
|
|
|
// Handle commands — some are async, so we spawn.
|
|
|
|
|
tokio::spawn(async move {
|
2026-04-10 12:19:20 +00:00
|
|
|
handle_command(engine, webrtc, &out_tx, cmd).await;
|
2026-04-10 09:57:27 +00:00
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-10 12:19:20 +00:00
|
|
|
async fn handle_command(
|
|
|
|
|
engine: Arc<Mutex<ProxyEngine>>,
|
|
|
|
|
webrtc: Arc<Mutex<WebRtcEngine>>,
|
|
|
|
|
out_tx: &OutTx,
|
|
|
|
|
cmd: Command,
|
|
|
|
|
) {
|
2026-04-10 09:57:27 +00:00
|
|
|
match cmd.method.as_str() {
|
2026-04-10 12:19:20 +00:00
|
|
|
// SIP commands — lock engine only.
|
2026-04-10 09:57:27 +00:00
|
|
|
"configure" => handle_configure(engine, out_tx, &cmd).await,
|
|
|
|
|
"hangup" => handle_hangup(engine, out_tx, &cmd).await,
|
2026-04-10 11:36:18 +00:00
|
|
|
"make_call" => handle_make_call(engine, out_tx, &cmd).await,
|
2026-04-10 09:57:27 +00:00
|
|
|
"get_status" => handle_get_status(engine, out_tx, &cmd).await,
|
2026-04-10 12:52:48 +00:00
|
|
|
"add_leg" => handle_add_leg(engine, out_tx, &cmd).await,
|
|
|
|
|
"remove_leg" => handle_remove_leg(engine, out_tx, &cmd).await,
|
2026-04-10 12:19:20 +00:00
|
|
|
// WebRTC commands — lock webrtc only (no engine contention).
|
|
|
|
|
"webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, &cmd).await,
|
|
|
|
|
"webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, &cmd).await,
|
|
|
|
|
"webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await,
|
2026-04-10 12:52:48 +00:00
|
|
|
// webrtc_link needs both: engine (for mixer channels) and webrtc (for session).
|
2026-04-10 12:19:20 +00:00
|
|
|
"webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await,
|
2026-04-10 15:12:30 +00:00
|
|
|
"add_device_leg" => handle_add_device_leg(engine, out_tx, &cmd).await,
|
|
|
|
|
"transfer_leg" => handle_transfer_leg(engine, out_tx, &cmd).await,
|
|
|
|
|
"replace_leg" => handle_replace_leg(engine, out_tx, &cmd).await,
|
2026-04-10 14:54:21 +00:00
|
|
|
// Leg interaction and tool leg commands.
|
|
|
|
|
"start_interaction" => handle_start_interaction(engine, out_tx, &cmd).await,
|
|
|
|
|
"add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await,
|
|
|
|
|
"remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await,
|
|
|
|
|
"set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await,
|
2026-04-10 09:57:27 +00:00
|
|
|
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle the `configure` command — receives full app config from TypeScript.
|
|
|
|
|
/// First call: initializes SIP transport + everything.
|
|
|
|
|
/// Subsequent calls: reconfigures providers/devices/routing without rebinding.
|
|
|
|
|
async fn handle_configure(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let app_config: AppConfig = match serde_json::from_value(cmd.params.clone()) {
|
|
|
|
|
Ok(c) => c,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("bad config: {e}"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let is_reconfigure = eng.transport.is_some();
|
|
|
|
|
|
|
|
|
|
let socket = if is_reconfigure {
|
|
|
|
|
// Reconfigure — socket already bound, just update subsystems.
|
|
|
|
|
eng.transport.as_ref().unwrap().socket()
|
|
|
|
|
} else {
|
|
|
|
|
// First configure — bind SIP transport.
|
|
|
|
|
let bind_addr = format!("0.0.0.0:{}", app_config.proxy.lan_port);
|
|
|
|
|
let transport = match SipTransport::bind(&bind_addr).await {
|
|
|
|
|
Ok(t) => t,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("SIP bind failed: {e}"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let socket = transport.socket();
|
|
|
|
|
|
|
|
|
|
// Start UDP receiver.
|
|
|
|
|
let engine_for_recv = engine.clone();
|
|
|
|
|
let socket_for_recv = socket.clone();
|
|
|
|
|
transport.spawn_receiver(move |data: &[u8], addr: SocketAddr| {
|
|
|
|
|
let engine = engine_for_recv.clone();
|
|
|
|
|
let socket = socket_for_recv.clone();
|
|
|
|
|
let data = data.to_vec();
|
|
|
|
|
tokio::spawn(async move {
|
|
|
|
|
handle_sip_packet(engine, &socket, &data, addr).await;
|
|
|
|
|
});
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
eng.transport = Some(transport);
|
|
|
|
|
|
|
|
|
|
// Initialize RTP port pool (only on first configure).
|
|
|
|
|
eng.rtp_pool = Some(RtpPortPool::new(
|
|
|
|
|
app_config.proxy.rtp_port_range.min,
|
|
|
|
|
app_config.proxy.rtp_port_range.max,
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
socket
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// (Re)configure registrar.
|
|
|
|
|
eng.registrar.configure(&app_config.devices);
|
|
|
|
|
|
|
|
|
|
// (Re)configure provider registrations.
|
|
|
|
|
eng.provider_mgr
|
|
|
|
|
.configure(
|
|
|
|
|
&app_config.providers,
|
|
|
|
|
app_config.proxy.public_ip_seed.as_deref(),
|
|
|
|
|
&app_config.proxy.lan_ip,
|
|
|
|
|
app_config.proxy.lan_port,
|
|
|
|
|
socket,
|
|
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
let bind_info = format!("0.0.0.0:{}", app_config.proxy.lan_port);
|
|
|
|
|
eng.config = Some(app_config);
|
|
|
|
|
|
|
|
|
|
respond_ok(
|
|
|
|
|
out_tx,
|
|
|
|
|
&cmd.id,
|
|
|
|
|
serde_json::json!({
|
|
|
|
|
"bound": bind_info,
|
|
|
|
|
"reconfigure": is_reconfigure,
|
|
|
|
|
}),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle incoming SIP packets from the UDP socket.
|
|
|
|
|
/// This is the core routing pipeline — entirely in Rust.
|
|
|
|
|
async fn handle_sip_packet(
|
|
|
|
|
engine: Arc<Mutex<ProxyEngine>>,
|
|
|
|
|
socket: &UdpSocket,
|
|
|
|
|
data: &[u8],
|
|
|
|
|
from_addr: SocketAddr,
|
|
|
|
|
) {
|
|
|
|
|
let msg = match SipMessage::parse(data) {
|
|
|
|
|
Some(m) => m,
|
|
|
|
|
None => return, // Not a valid SIP message, ignore.
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
|
|
|
|
|
// 1. Provider registration responses — consumed internally.
|
|
|
|
|
if msg.is_response() {
|
|
|
|
|
if eng.provider_mgr.handle_response(&msg, socket).await {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 2. Device REGISTER — handled by registrar.
|
|
|
|
|
let is_from_provider = eng
|
|
|
|
|
.provider_mgr
|
|
|
|
|
.find_by_address(&from_addr)
|
|
|
|
|
.await
|
|
|
|
|
.is_some();
|
|
|
|
|
|
|
|
|
|
if !is_from_provider && msg.method() == Some("REGISTER") {
|
|
|
|
|
if let Some(response_buf) = eng.registrar.handle_register(&msg, from_addr) {
|
|
|
|
|
let _ = socket.send_to(&response_buf, from_addr).await;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 3. Route to existing call by SIP Call-ID.
|
|
|
|
|
if eng.call_mgr.has_call(msg.call_id()) {
|
|
|
|
|
let config_ref = eng.config.as_ref().unwrap().clone();
|
|
|
|
|
if eng
|
|
|
|
|
.call_mgr
|
2026-04-10 12:52:48 +00:00
|
|
|
.route_sip_message(&msg, from_addr, socket, &config_ref)
|
2026-04-10 09:57:27 +00:00
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let config_ref = eng.config.as_ref().unwrap().clone();
|
|
|
|
|
|
|
|
|
|
// 4. New inbound INVITE from provider.
|
|
|
|
|
if is_from_provider && msg.is_request() && msg.method() == Some("INVITE") {
|
|
|
|
|
// Detect public IP from Via.
|
|
|
|
|
if let Some(via) = msg.get_header("Via") {
|
|
|
|
|
if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await {
|
|
|
|
|
let mut ps = ps_arc.lock().await;
|
|
|
|
|
ps.detect_public_ip(via);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Send 100 Trying immediately.
|
|
|
|
|
let trying = SipMessage::create_response(100, "Trying", &msg, None);
|
|
|
|
|
let _ = socket.send_to(&trying.serialize(), from_addr).await;
|
|
|
|
|
|
|
|
|
|
// Determine provider info.
|
|
|
|
|
let (provider_id, provider_config, public_ip) =
|
|
|
|
|
if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await {
|
|
|
|
|
let ps = ps_arc.lock().await;
|
|
|
|
|
(
|
|
|
|
|
ps.config.id.clone(),
|
|
|
|
|
ps.config.clone(),
|
|
|
|
|
ps.public_ip.clone(),
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
return;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Create the inbound call — Rust handles everything.
|
|
|
|
|
// Split borrows via destructuring to satisfy the borrow checker.
|
|
|
|
|
let ProxyEngine {
|
|
|
|
|
ref registrar,
|
|
|
|
|
ref mut call_mgr,
|
|
|
|
|
ref mut rtp_pool,
|
|
|
|
|
..
|
|
|
|
|
} = *eng;
|
|
|
|
|
let rtp_pool = rtp_pool.as_mut().unwrap();
|
|
|
|
|
let call_id = call_mgr
|
|
|
|
|
.create_inbound_call(
|
|
|
|
|
&msg,
|
|
|
|
|
from_addr,
|
|
|
|
|
&provider_id,
|
|
|
|
|
&provider_config,
|
|
|
|
|
&config_ref,
|
|
|
|
|
registrar,
|
|
|
|
|
rtp_pool,
|
|
|
|
|
socket,
|
|
|
|
|
public_ip.as_deref(),
|
|
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
if let Some(call_id) = call_id {
|
|
|
|
|
// Emit event so TypeScript knows about the call (for dashboard, IVR routing, etc).
|
|
|
|
|
let from_header = msg.get_header("From").unwrap_or("");
|
|
|
|
|
let from_uri = SipMessage::extract_uri(from_header).unwrap_or("Unknown");
|
|
|
|
|
let called_number = msg
|
|
|
|
|
.request_uri()
|
|
|
|
|
.and_then(|uri| SipMessage::extract_uri(uri))
|
|
|
|
|
.unwrap_or("");
|
|
|
|
|
|
|
|
|
|
emit_event(
|
|
|
|
|
&eng.out_tx,
|
|
|
|
|
"incoming_call",
|
|
|
|
|
serde_json::json!({
|
|
|
|
|
"call_id": call_id,
|
|
|
|
|
"from_uri": from_uri,
|
|
|
|
|
"to_number": called_number,
|
|
|
|
|
"provider_id": provider_id,
|
|
|
|
|
}),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 5. New outbound INVITE from device.
|
|
|
|
|
if !is_from_provider && msg.is_request() && msg.method() == Some("INVITE") {
|
|
|
|
|
// Resolve outbound route.
|
|
|
|
|
let dialed_number = msg
|
|
|
|
|
.request_uri()
|
|
|
|
|
.and_then(|uri| SipMessage::extract_uri(uri))
|
|
|
|
|
.unwrap_or(msg.request_uri().unwrap_or(""))
|
|
|
|
|
.to_string();
|
|
|
|
|
|
|
|
|
|
let device = eng.registrar.find_by_address(&from_addr);
|
|
|
|
|
let device_id = device.map(|d| d.device_id.clone());
|
|
|
|
|
|
|
|
|
|
// Find provider via routing rules.
|
|
|
|
|
let route_result = config_ref.resolve_outbound_route(
|
|
|
|
|
&dialed_number,
|
|
|
|
|
device_id.as_deref(),
|
|
|
|
|
&|pid: &str| {
|
|
|
|
|
// Can't call async here — use a sync check.
|
|
|
|
|
// For now, assume all configured providers are available.
|
|
|
|
|
true
|
|
|
|
|
},
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
if let Some(route) = route_result {
|
2026-04-10 14:54:21 +00:00
|
|
|
// Look up provider state by config ID (not by device address).
|
|
|
|
|
let (public_ip, registered_aor) = if let Some(ps_arc) =
|
|
|
|
|
eng.provider_mgr.find_by_provider_id(&route.provider.id).await
|
|
|
|
|
{
|
2026-04-10 09:57:27 +00:00
|
|
|
let ps = ps_arc.lock().await;
|
2026-04-10 14:54:21 +00:00
|
|
|
(ps.public_ip.clone(), ps.registered_aor.clone())
|
2026-04-10 09:57:27 +00:00
|
|
|
} else {
|
2026-04-10 14:54:21 +00:00
|
|
|
(None, format!("sip:{}@{}", route.provider.username, route.provider.domain))
|
2026-04-10 09:57:27 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let ProxyEngine {
|
|
|
|
|
ref mut call_mgr,
|
|
|
|
|
ref mut rtp_pool,
|
|
|
|
|
..
|
|
|
|
|
} = *eng;
|
|
|
|
|
let rtp_pool = rtp_pool.as_mut().unwrap();
|
|
|
|
|
let call_id = call_mgr
|
2026-04-10 14:54:21 +00:00
|
|
|
.create_device_outbound_call(
|
2026-04-10 09:57:27 +00:00
|
|
|
&msg,
|
|
|
|
|
from_addr,
|
|
|
|
|
&route.provider,
|
|
|
|
|
&config_ref,
|
|
|
|
|
rtp_pool,
|
|
|
|
|
socket,
|
|
|
|
|
public_ip.as_deref(),
|
2026-04-10 14:54:21 +00:00
|
|
|
®istered_aor,
|
2026-04-10 09:57:27 +00:00
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
if let Some(call_id) = call_id {
|
|
|
|
|
emit_event(
|
|
|
|
|
&eng.out_tx,
|
|
|
|
|
"outbound_device_call",
|
|
|
|
|
serde_json::json!({
|
|
|
|
|
"call_id": call_id,
|
|
|
|
|
"from_device": device_id,
|
|
|
|
|
"to_number": dialed_number,
|
|
|
|
|
}),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 6. Other messages — log for debugging.
|
|
|
|
|
let label = if msg.is_request() {
|
|
|
|
|
msg.method().unwrap_or("?").to_string()
|
|
|
|
|
} else {
|
|
|
|
|
msg.status_code().map(|c| c.to_string()).unwrap_or_default()
|
|
|
|
|
};
|
|
|
|
|
emit_event(
|
|
|
|
|
&eng.out_tx,
|
|
|
|
|
"sip_unhandled",
|
|
|
|
|
serde_json::json!({
|
|
|
|
|
"method_or_status": label,
|
|
|
|
|
"call_id": msg.call_id(),
|
|
|
|
|
"from_addr": from_addr.ip().to_string(),
|
|
|
|
|
"from_port": from_addr.port(),
|
|
|
|
|
"is_from_provider": is_from_provider,
|
|
|
|
|
}),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle `get_status` — return active call statuses from Rust.
|
|
|
|
|
async fn handle_get_status(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let eng = engine.lock().await;
|
|
|
|
|
let calls = eng.call_mgr.get_all_statuses();
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({ "calls": calls }));
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-10 11:36:18 +00:00
|
|
|
/// Handle `make_call` — initiate an outbound call to a number via a provider.
|
|
|
|
|
async fn handle_make_call(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(n) => n.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
|
|
|
|
|
};
|
|
|
|
|
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let config_ref = match &eng.config {
|
|
|
|
|
Some(c) => c.clone(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Resolve provider.
|
|
|
|
|
let provider_config = if let Some(pid) = provider_id {
|
|
|
|
|
config_ref.providers.iter().find(|p| p.id == pid).cloned()
|
|
|
|
|
} else {
|
|
|
|
|
// Use route resolution or first provider.
|
|
|
|
|
let route = config_ref.resolve_outbound_route(&number, None, &|_| true);
|
|
|
|
|
route.map(|r| r.provider)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let provider_config = match provider_config {
|
|
|
|
|
Some(p) => p,
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Get public IP and registered AOR from provider state.
|
|
|
|
|
let (public_ip, registered_aor) = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
|
|
|
|
|
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
|
|
|
|
|
).await {
|
|
|
|
|
let ps = ps_arc.lock().await;
|
|
|
|
|
(ps.public_ip.clone(), ps.registered_aor.clone())
|
|
|
|
|
} else {
|
|
|
|
|
// Fallback — construct AOR from config.
|
|
|
|
|
(None, format!("sip:{}@{}", provider_config.username, provider_config.domain))
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let socket = match &eng.transport {
|
|
|
|
|
Some(t) => t.socket(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
|
|
|
|
|
let rtp_pool = rtp_pool.as_mut().unwrap();
|
|
|
|
|
|
|
|
|
|
let call_id = call_mgr.make_outbound_call(
|
|
|
|
|
&number,
|
|
|
|
|
&provider_config,
|
|
|
|
|
&config_ref,
|
|
|
|
|
rtp_pool,
|
|
|
|
|
&socket,
|
|
|
|
|
public_ip.as_deref(),
|
|
|
|
|
®istered_aor,
|
|
|
|
|
).await;
|
|
|
|
|
|
|
|
|
|
match call_id {
|
|
|
|
|
Some(id) => {
|
|
|
|
|
emit_event(out_tx, "outbound_call_started", serde_json::json!({
|
|
|
|
|
"call_id": id,
|
|
|
|
|
"number": number,
|
|
|
|
|
"provider_id": provider_config.id,
|
|
|
|
|
}));
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({ "call_id": id }));
|
|
|
|
|
}
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, "call origination failed — provider not registered or no ports available");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-10 09:57:27 +00:00
|
|
|
/// Handle the `hangup` command.
|
|
|
|
|
async fn handle_hangup(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(id) => id.to_string(),
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, "missing call_id");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let socket = match &eng.transport {
|
|
|
|
|
Some(t) => t.socket(),
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, "not initialized");
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if eng.call_mgr.hangup(&call_id, &socket).await {
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
|
|
|
|
|
} else {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-10 11:36:18 +00:00
|
|
|
|
|
|
|
|
/// Handle `webrtc_offer` — browser sends SDP offer, we create PeerConnection and return answer.
|
2026-04-10 12:19:20 +00:00
|
|
|
/// Uses only the WebRTC lock — no contention with SIP handlers.
|
|
|
|
|
async fn handle_webrtc_offer(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
|
2026-04-10 11:36:18 +00:00
|
|
|
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let offer_sdp = match cmd.params.get("sdp").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing sdp"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
2026-04-10 12:19:20 +00:00
|
|
|
let mut wrtc = webrtc.lock().await;
|
|
|
|
|
match wrtc.handle_offer(&session_id, &offer_sdp).await {
|
2026-04-10 11:36:18 +00:00
|
|
|
Ok(answer_sdp) => {
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({
|
|
|
|
|
"session_id": session_id,
|
|
|
|
|
"sdp": answer_sdp,
|
|
|
|
|
}));
|
|
|
|
|
}
|
|
|
|
|
Err(e) => respond_err(out_tx, &cmd.id, &e),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle `webrtc_ice` — forward ICE candidate from browser to Rust PeerConnection.
|
2026-04-10 12:19:20 +00:00
|
|
|
/// Uses only the WebRTC lock.
|
|
|
|
|
async fn handle_webrtc_ice(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
|
2026-04-10 11:36:18 +00:00
|
|
|
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let candidate = cmd.params.get("candidate").and_then(|v| v.as_str()).unwrap_or("");
|
|
|
|
|
let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str());
|
|
|
|
|
let sdp_mline_index = cmd.params.get("sdp_mline_index").and_then(|v| v.as_u64()).map(|v| v as u16);
|
|
|
|
|
|
2026-04-10 12:19:20 +00:00
|
|
|
let wrtc = webrtc.lock().await;
|
|
|
|
|
match wrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await {
|
2026-04-10 11:36:18 +00:00
|
|
|
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
|
|
|
|
|
Err(e) => respond_err(out_tx, &cmd.id, &e),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-10 12:52:48 +00:00
|
|
|
/// Handle `webrtc_link` — link a WebRTC session to a call's mixer for audio bridging.
|
|
|
|
|
/// Creates channels, adds WebRTC leg to the call, wires the WebRTC engine.
|
2026-04-10 12:19:20 +00:00
|
|
|
/// Locks are never held simultaneously — no deadlock possible.
|
|
|
|
|
async fn handle_webrtc_link(
|
|
|
|
|
engine: Arc<Mutex<ProxyEngine>>,
|
|
|
|
|
webrtc: Arc<Mutex<WebRtcEngine>>,
|
|
|
|
|
out_tx: &OutTx,
|
|
|
|
|
cmd: &Command,
|
|
|
|
|
) {
|
2026-04-10 11:36:18 +00:00
|
|
|
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
2026-04-10 12:52:48 +00:00
|
|
|
// Create channels for the WebRTC leg.
|
|
|
|
|
let channels = crate::leg_io::create_leg_channels();
|
2026-04-10 11:36:18 +00:00
|
|
|
|
2026-04-10 12:52:48 +00:00
|
|
|
// Briefly lock engine to add the WebRTC leg to the call's mixer.
|
|
|
|
|
{
|
2026-04-10 12:19:20 +00:00
|
|
|
let eng = engine.lock().await;
|
2026-04-10 12:52:48 +00:00
|
|
|
let call = match eng.call_mgr.calls.get(&call_id) {
|
|
|
|
|
Some(c) => c,
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
// Add to mixer via channel.
|
|
|
|
|
call.add_leg_to_mixer(
|
|
|
|
|
&session_id,
|
|
|
|
|
codec_lib::PT_OPUS,
|
|
|
|
|
channels.inbound_rx,
|
|
|
|
|
channels.outbound_tx,
|
|
|
|
|
)
|
|
|
|
|
.await;
|
|
|
|
|
} // engine lock released
|
2026-04-10 12:19:20 +00:00
|
|
|
|
2026-04-10 12:52:48 +00:00
|
|
|
// Lock webrtc to wire the channels.
|
|
|
|
|
let mut wrtc = webrtc.lock().await;
|
|
|
|
|
if wrtc
|
|
|
|
|
.link_to_mixer(&session_id, &call_id, channels.inbound_tx, channels.outbound_rx)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
// Also store the WebRTC leg info in the call.
|
|
|
|
|
drop(wrtc); // Release webrtc lock before re-acquiring engine.
|
|
|
|
|
{
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
|
|
|
|
|
call.legs.insert(
|
|
|
|
|
session_id.clone(),
|
|
|
|
|
crate::call::LegInfo {
|
|
|
|
|
id: session_id.clone(),
|
|
|
|
|
kind: crate::call::LegKind::WebRtc,
|
|
|
|
|
state: crate::call::LegState::Connected,
|
|
|
|
|
codec_pt: codec_lib::PT_OPUS,
|
|
|
|
|
sip_leg: None,
|
|
|
|
|
sip_call_id: None,
|
|
|
|
|
webrtc_session_id: Some(session_id.clone()),
|
|
|
|
|
rtp_socket: None,
|
|
|
|
|
rtp_port: 0,
|
|
|
|
|
remote_media: None,
|
|
|
|
|
signaling_addr: None,
|
2026-04-10 14:54:21 +00:00
|
|
|
metadata: std::collections::HashMap::new(),
|
2026-04-10 12:52:48 +00:00
|
|
|
},
|
|
|
|
|
);
|
|
|
|
|
}
|
2026-04-10 12:19:20 +00:00
|
|
|
}
|
2026-04-10 11:36:18 +00:00
|
|
|
|
2026-04-10 12:52:48 +00:00
|
|
|
emit_event(out_tx, "leg_added", serde_json::json!({
|
|
|
|
|
"call_id": call_id,
|
|
|
|
|
"leg_id": session_id,
|
|
|
|
|
"kind": "webrtc",
|
|
|
|
|
"state": "connected",
|
|
|
|
|
}));
|
2026-04-10 11:36:18 +00:00
|
|
|
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({
|
|
|
|
|
"session_id": session_id,
|
|
|
|
|
"call_id": call_id,
|
|
|
|
|
"bridged": true,
|
|
|
|
|
}));
|
|
|
|
|
} else {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("session {session_id} not found"));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-10 12:52:48 +00:00
|
|
|
/// Handle `add_leg` — add a new SIP leg to an existing call.
|
|
|
|
|
async fn handle_add_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(n) => n.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
|
|
|
|
|
};
|
|
|
|
|
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let config_ref = match &eng.config {
|
|
|
|
|
Some(c) => c.clone(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Resolve provider.
|
|
|
|
|
let provider_config = if let Some(pid) = provider_id {
|
|
|
|
|
config_ref.providers.iter().find(|p| p.id == pid).cloned()
|
|
|
|
|
} else {
|
|
|
|
|
config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let provider_config = match provider_config {
|
|
|
|
|
Some(p) => p,
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Get registered AOR.
|
|
|
|
|
let registered_aor = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
|
|
|
|
|
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
|
|
|
|
|
).await {
|
|
|
|
|
let ps = ps_arc.lock().await;
|
|
|
|
|
ps.registered_aor.clone()
|
|
|
|
|
} else {
|
|
|
|
|
format!("sip:{}@{}", provider_config.username, provider_config.domain)
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
|
|
|
|
|
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
|
|
|
|
|
).await {
|
|
|
|
|
let ps = ps_arc.lock().await;
|
|
|
|
|
ps.public_ip.clone()
|
|
|
|
|
} else {
|
|
|
|
|
None
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let socket = match &eng.transport {
|
|
|
|
|
Some(t) => t.socket(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
|
|
|
|
|
let rtp_pool = rtp_pool.as_mut().unwrap();
|
|
|
|
|
|
|
|
|
|
let leg_id = call_mgr.add_external_leg(
|
|
|
|
|
&call_id, &number, &provider_config, &config_ref,
|
|
|
|
|
rtp_pool, &socket, public_ip.as_deref(), ®istered_aor,
|
|
|
|
|
).await;
|
|
|
|
|
|
|
|
|
|
match leg_id {
|
|
|
|
|
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })),
|
|
|
|
|
None => respond_err(out_tx, &cmd.id, "failed to add leg"),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-10 15:12:30 +00:00
|
|
|
/// Handle `add_device_leg` — add a local SIP device to an existing call.
|
|
|
|
|
async fn handle_add_device_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let device_id = match cmd.params.get("device_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing device_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let config_ref = match &eng.config {
|
|
|
|
|
Some(c) => c.clone(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
|
|
|
|
|
};
|
|
|
|
|
let socket = match &eng.transport {
|
|
|
|
|
Some(t) => t.socket(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let ProxyEngine { ref registrar, ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
|
|
|
|
|
let rtp_pool = rtp_pool.as_mut().unwrap();
|
|
|
|
|
|
|
|
|
|
let leg_id = call_mgr.add_device_leg(
|
|
|
|
|
&call_id, &device_id, registrar, &config_ref, rtp_pool, &socket,
|
|
|
|
|
).await;
|
|
|
|
|
|
|
|
|
|
match leg_id {
|
|
|
|
|
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })),
|
|
|
|
|
None => respond_err(out_tx, &cmd.id, "failed to add device leg — device not registered or call not found"),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle `transfer_leg` — move a leg from one call to another.
|
|
|
|
|
async fn handle_transfer_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let source_call_id = match cmd.params.get("source_call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing source_call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let target_call_id = match cmd.params.get("target_call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing target_call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
if eng.call_mgr.transfer_leg(&source_call_id, &leg_id, &target_call_id).await {
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
|
|
|
|
|
} else {
|
|
|
|
|
respond_err(out_tx, &cmd.id, "transfer failed — call or leg not found");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle `replace_leg` — terminate a leg and dial a replacement into the same call.
|
|
|
|
|
async fn handle_replace_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let old_leg_id = match cmd.params.get("old_leg_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing old_leg_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(n) => n.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
|
|
|
|
|
};
|
|
|
|
|
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let config_ref = match &eng.config {
|
|
|
|
|
Some(c) => c.clone(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
|
|
|
|
|
};
|
|
|
|
|
let socket = match &eng.transport {
|
|
|
|
|
Some(t) => t.socket(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Resolve provider.
|
|
|
|
|
let provider_config = if let Some(pid) = provider_id {
|
|
|
|
|
config_ref.providers.iter().find(|p| p.id == pid).cloned()
|
|
|
|
|
} else {
|
|
|
|
|
config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider)
|
|
|
|
|
};
|
|
|
|
|
let provider_config = match provider_config {
|
|
|
|
|
Some(p) => p,
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let (public_ip, registered_aor) = if let Some(ps_arc) = eng.provider_mgr.find_by_provider_id(&provider_config.id).await {
|
|
|
|
|
let ps = ps_arc.lock().await;
|
|
|
|
|
(ps.public_ip.clone(), ps.registered_aor.clone())
|
|
|
|
|
} else {
|
|
|
|
|
(None, format!("sip:{}@{}", provider_config.username, provider_config.domain))
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
|
|
|
|
|
let rtp_pool = rtp_pool.as_mut().unwrap();
|
|
|
|
|
|
|
|
|
|
let new_leg_id = call_mgr.replace_leg(
|
|
|
|
|
&call_id, &old_leg_id, &number, &provider_config, &config_ref,
|
|
|
|
|
rtp_pool, &socket, public_ip.as_deref(), ®istered_aor,
|
|
|
|
|
).await;
|
|
|
|
|
|
|
|
|
|
match new_leg_id {
|
|
|
|
|
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "new_leg_id": lid })),
|
|
|
|
|
None => respond_err(out_tx, &cmd.id, "replace failed — call ended or dial failed"),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-10 12:52:48 +00:00
|
|
|
/// Handle `remove_leg` — remove a leg from a call.
|
|
|
|
|
async fn handle_remove_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let socket = match &eng.transport {
|
|
|
|
|
Some(t) => t.socket(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
if eng.call_mgr.remove_leg(&call_id, &leg_id, &socket).await {
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
|
|
|
|
|
} else {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("call/leg not found"));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-04-10 11:36:18 +00:00
|
|
|
/// Handle `webrtc_close` — close a WebRTC session.
|
2026-04-10 12:19:20 +00:00
|
|
|
/// Uses only the WebRTC lock.
|
|
|
|
|
async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
|
2026-04-10 11:36:18 +00:00
|
|
|
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
2026-04-10 12:19:20 +00:00
|
|
|
let mut wrtc = webrtc.lock().await;
|
|
|
|
|
match wrtc.close_session(&session_id).await {
|
2026-04-10 11:36:18 +00:00
|
|
|
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
|
|
|
|
|
Err(e) => respond_err(out_tx, &cmd.id, &e),
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-04-10 14:54:21 +00:00
|
|
|
|
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
|
// Leg interaction & tool leg commands
|
|
|
|
|
// ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
/// Handle `start_interaction` — isolate a leg, play a prompt, collect DTMF.
|
|
|
|
|
/// This command blocks until the interaction completes (digit, timeout, or cancel).
|
|
|
|
|
async fn handle_start_interaction(
|
|
|
|
|
engine: Arc<Mutex<ProxyEngine>>,
|
|
|
|
|
out_tx: &OutTx,
|
|
|
|
|
cmd: &Command,
|
|
|
|
|
) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let prompt_wav = match cmd.params.get("prompt_wav").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing prompt_wav"); return; }
|
|
|
|
|
};
|
|
|
|
|
let expected_digits: Vec<char> = cmd
|
|
|
|
|
.params
|
|
|
|
|
.get("expected_digits")
|
|
|
|
|
.and_then(|v| v.as_str())
|
|
|
|
|
.unwrap_or("12")
|
|
|
|
|
.chars()
|
|
|
|
|
.collect();
|
|
|
|
|
let timeout_ms = cmd
|
|
|
|
|
.params
|
|
|
|
|
.get("timeout_ms")
|
|
|
|
|
.and_then(|v| v.as_u64())
|
|
|
|
|
.unwrap_or(15000) as u32;
|
|
|
|
|
|
|
|
|
|
// Load prompt audio from WAV file.
|
|
|
|
|
let prompt_frames = match crate::audio_player::load_prompt_pcm_frames(&prompt_wav) {
|
|
|
|
|
Ok(f) => f,
|
|
|
|
|
Err(e) => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("prompt load failed: {e}"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Create oneshot channel for the result.
|
|
|
|
|
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
|
|
|
|
|
|
|
|
|
|
// Send StartInteraction to the mixer.
|
|
|
|
|
{
|
|
|
|
|
let eng = engine.lock().await;
|
|
|
|
|
let call = match eng.call_mgr.calls.get(&call_id) {
|
|
|
|
|
Some(c) => c,
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
let _ = call
|
|
|
|
|
.mixer_cmd_tx
|
|
|
|
|
.send(crate::mixer::MixerCommand::StartInteraction {
|
|
|
|
|
leg_id: leg_id.clone(),
|
|
|
|
|
prompt_pcm_frames: prompt_frames,
|
|
|
|
|
expected_digits: expected_digits.clone(),
|
|
|
|
|
timeout_ms,
|
|
|
|
|
result_tx,
|
|
|
|
|
})
|
|
|
|
|
.await;
|
|
|
|
|
} // engine lock released — we block on the oneshot, not the lock.
|
|
|
|
|
|
|
|
|
|
// Await the interaction result (blocks this task until complete).
|
|
|
|
|
let safety_timeout = tokio::time::Duration::from_millis(timeout_ms as u64 + 30000);
|
|
|
|
|
let result = match tokio::time::timeout(safety_timeout, result_rx).await {
|
|
|
|
|
Ok(Ok(r)) => r,
|
|
|
|
|
Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled, // oneshot dropped
|
|
|
|
|
Err(_) => crate::mixer::InteractionResult::Timeout, // safety timeout
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Store consent result in leg metadata.
|
|
|
|
|
let (result_str, digit_str) = match &result {
|
|
|
|
|
crate::mixer::InteractionResult::Digit(d) => ("digit", Some(d.to_string())),
|
|
|
|
|
crate::mixer::InteractionResult::Timeout => ("timeout", None),
|
|
|
|
|
crate::mixer::InteractionResult::Cancelled => ("cancelled", None),
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
|
|
|
|
|
if let Some(leg) = call.legs.get_mut(&leg_id) {
|
|
|
|
|
leg.metadata.insert(
|
|
|
|
|
"last_interaction_result".to_string(),
|
|
|
|
|
serde_json::json!(result_str),
|
|
|
|
|
);
|
|
|
|
|
if let Some(ref d) = digit_str {
|
|
|
|
|
leg.metadata.insert(
|
|
|
|
|
"last_interaction_digit".to_string(),
|
|
|
|
|
serde_json::json!(d),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let mut resp = serde_json::json!({ "result": result_str });
|
|
|
|
|
if let Some(d) = digit_str {
|
|
|
|
|
resp["digit"] = serde_json::json!(d);
|
|
|
|
|
}
|
|
|
|
|
respond_ok(out_tx, &cmd.id, resp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle `add_tool_leg` — add a recording or transcription tool leg to a call.
|
|
|
|
|
async fn handle_add_tool_leg(
|
|
|
|
|
engine: Arc<Mutex<ProxyEngine>>,
|
|
|
|
|
out_tx: &OutTx,
|
|
|
|
|
cmd: &Command,
|
|
|
|
|
) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let tool_type_str = match cmd.params.get("tool_type").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing tool_type"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let tool_type = match tool_type_str.as_str() {
|
|
|
|
|
"recording" => crate::mixer::ToolType::Recording,
|
|
|
|
|
"transcription" => crate::mixer::ToolType::Transcription,
|
|
|
|
|
other => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("unknown tool_type: {other}"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let tool_leg_id = format!("{call_id}-tool-{}", rand::random::<u32>());
|
|
|
|
|
|
|
|
|
|
// Spawn the appropriate background task.
|
|
|
|
|
let (audio_tx, _task_handle) = match tool_type {
|
|
|
|
|
crate::mixer::ToolType::Recording => {
|
|
|
|
|
let base_dir = cmd
|
|
|
|
|
.params
|
|
|
|
|
.get("config")
|
|
|
|
|
.and_then(|c| c.get("base_dir"))
|
|
|
|
|
.and_then(|v| v.as_str())
|
|
|
|
|
.unwrap_or(".nogit/recordings")
|
|
|
|
|
.to_string();
|
|
|
|
|
crate::tool_leg::spawn_recording_tool(
|
|
|
|
|
tool_leg_id.clone(),
|
|
|
|
|
call_id.clone(),
|
|
|
|
|
base_dir,
|
|
|
|
|
out_tx.clone(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
crate::mixer::ToolType::Transcription => {
|
|
|
|
|
crate::tool_leg::spawn_transcription_tool(
|
|
|
|
|
tool_leg_id.clone(),
|
|
|
|
|
call_id.clone(),
|
|
|
|
|
out_tx.clone(),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Send AddToolLeg to the mixer and register in call.
|
|
|
|
|
{
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let call = match eng.call_mgr.calls.get_mut(&call_id) {
|
|
|
|
|
Some(c) => c,
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let _ = call
|
|
|
|
|
.mixer_cmd_tx
|
|
|
|
|
.send(crate::mixer::MixerCommand::AddToolLeg {
|
|
|
|
|
leg_id: tool_leg_id.clone(),
|
|
|
|
|
tool_type,
|
|
|
|
|
audio_tx,
|
|
|
|
|
})
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
// Register tool leg in the call's leg map.
|
|
|
|
|
let mut metadata = std::collections::HashMap::new();
|
|
|
|
|
metadata.insert(
|
|
|
|
|
"tool_type".to_string(),
|
|
|
|
|
serde_json::json!(tool_type_str),
|
|
|
|
|
);
|
|
|
|
|
call.legs.insert(
|
|
|
|
|
tool_leg_id.clone(),
|
|
|
|
|
crate::call::LegInfo {
|
|
|
|
|
id: tool_leg_id.clone(),
|
|
|
|
|
kind: crate::call::LegKind::Tool,
|
|
|
|
|
state: crate::call::LegState::Connected,
|
|
|
|
|
codec_pt: 0,
|
|
|
|
|
sip_leg: None,
|
|
|
|
|
sip_call_id: None,
|
|
|
|
|
webrtc_session_id: None,
|
|
|
|
|
rtp_socket: None,
|
|
|
|
|
rtp_port: 0,
|
|
|
|
|
remote_media: None,
|
|
|
|
|
signaling_addr: None,
|
|
|
|
|
metadata,
|
|
|
|
|
},
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
emit_event(
|
|
|
|
|
out_tx,
|
|
|
|
|
"leg_added",
|
|
|
|
|
serde_json::json!({
|
|
|
|
|
"call_id": call_id,
|
|
|
|
|
"leg_id": tool_leg_id,
|
|
|
|
|
"kind": "tool",
|
|
|
|
|
"tool_type": tool_type_str,
|
|
|
|
|
"state": "connected",
|
|
|
|
|
}),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
respond_ok(
|
|
|
|
|
out_tx,
|
|
|
|
|
&cmd.id,
|
|
|
|
|
serde_json::json!({ "tool_leg_id": tool_leg_id }),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle `remove_tool_leg` — remove a tool leg from a call.
|
|
|
|
|
async fn handle_remove_tool_leg(
|
|
|
|
|
engine: Arc<Mutex<ProxyEngine>>,
|
|
|
|
|
out_tx: &OutTx,
|
|
|
|
|
cmd: &Command,
|
|
|
|
|
) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let tool_leg_id = match cmd.params.get("tool_leg_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing tool_leg_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let call = match eng.call_mgr.calls.get_mut(&call_id) {
|
|
|
|
|
Some(c) => c,
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Remove from mixer (drops audio_tx → background task finalizes).
|
|
|
|
|
let _ = call
|
|
|
|
|
.mixer_cmd_tx
|
|
|
|
|
.send(crate::mixer::MixerCommand::RemoveToolLeg {
|
|
|
|
|
leg_id: tool_leg_id.clone(),
|
|
|
|
|
})
|
|
|
|
|
.await;
|
|
|
|
|
|
|
|
|
|
// Remove from call's leg map.
|
|
|
|
|
call.legs.remove(&tool_leg_id);
|
|
|
|
|
|
|
|
|
|
emit_event(
|
|
|
|
|
out_tx,
|
|
|
|
|
"leg_removed",
|
|
|
|
|
serde_json::json!({
|
|
|
|
|
"call_id": call_id,
|
|
|
|
|
"leg_id": tool_leg_id,
|
|
|
|
|
}),
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Handle `set_leg_metadata` — set a metadata key on a leg.
|
|
|
|
|
async fn handle_set_leg_metadata(
|
|
|
|
|
engine: Arc<Mutex<ProxyEngine>>,
|
|
|
|
|
out_tx: &OutTx,
|
|
|
|
|
cmd: &Command,
|
|
|
|
|
) {
|
|
|
|
|
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
|
|
|
|
|
};
|
|
|
|
|
let key = match cmd.params.get("key").and_then(|v| v.as_str()) {
|
|
|
|
|
Some(s) => s.to_string(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing key"); return; }
|
|
|
|
|
};
|
|
|
|
|
let value = match cmd.params.get("value") {
|
|
|
|
|
Some(v) => v.clone(),
|
|
|
|
|
None => { respond_err(out_tx, &cmd.id, "missing value"); return; }
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let mut eng = engine.lock().await;
|
|
|
|
|
let call = match eng.call_mgr.calls.get_mut(&call_id) {
|
|
|
|
|
Some(c) => c,
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
let leg = match call.legs.get_mut(&leg_id) {
|
|
|
|
|
Some(l) => l,
|
|
|
|
|
None => {
|
|
|
|
|
respond_err(out_tx, &cmd.id, &format!("leg {leg_id} not found"));
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
leg.metadata.insert(key, value);
|
|
|
|
|
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
|
|
|
|
|
}
|