Files
siprouter/rust/crates/proxy-engine/src/main.rs

1249 lines
44 KiB
Rust

/// SIP proxy engine — the Rust data plane for the SIP router.
///
/// Handles ALL SIP protocol mechanics. TypeScript only sends high-level
/// commands (routing decisions, config) and receives high-level events
/// (incoming calls, registration state).
///
/// No raw SIP ever touches TypeScript.
mod audio_player;
mod call;
mod call_manager;
mod config;
mod dtmf;
mod ipc;
mod leg_io;
mod mixer;
mod provider;
mod recorder;
mod registrar;
mod rtp;
mod sip_leg;
mod sip_transport;
mod tool_leg;
mod tts;
mod voicemail;
mod webrtc_engine;
use crate::call_manager::CallManager;
use crate::config::AppConfig;
use crate::ipc::{emit_event, respond_err, respond_ok, Command, OutTx};
use crate::provider::ProviderManager;
use crate::registrar::Registrar;
use crate::rtp::RtpPortPool;
use crate::sip_transport::SipTransport;
use crate::webrtc_engine::WebRtcEngine;
use sip_proto::message::SipMessage;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, Mutex};
/// Shared mutable state for the proxy engine (SIP side).
///
/// A single instance is created in `main` and shared behind
/// `Arc<tokio::sync::Mutex<_>>` between the command handlers and the SIP
/// packet handler.
///
/// WebRTC is intentionally kept in a separate lock to avoid contention
/// between SIP packet handlers and WebRTC command handlers.
struct ProxyEngine {
/// Last applied application config; `None` until the first successful `configure`.
config: Option<AppConfig>,
/// SIP transport; `None` until the first `configure` binds it. It is never rebound afterwards.
transport: Option<SipTransport>,
/// Upstream provider state (registration responses, lookup by address or provider id).
provider_mgr: ProviderManager,
/// Registrar for local devices; handles REGISTER requests that do not come from a provider.
registrar: Registrar,
/// Owner of all active calls and their legs.
call_mgr: CallManager,
/// RTP port allocator; created once, together with the transport, on the first `configure`.
rtp_pool: Option<RtpPortPool>,
/// Sender half of the serialized stdout channel, used to emit events.
out_tx: OutTx,
}
impl ProxyEngine {
fn new(out_tx: OutTx) -> Self {
Self {
config: None,
transport: None,
provider_mgr: ProviderManager::new(out_tx.clone()),
registrar: Registrar::new(out_tx.clone()),
call_mgr: CallManager::new(out_tx.clone()),
rtp_pool: None,
out_tx,
}
}
}
/// Process entry point.
///
/// Spawns the stdout writer task, emits the `ready` event, creates the three
/// independently locked engines (SIP, WebRTC, TTS) and then reads
/// newline-delimited JSON commands from stdin, handling each in its own task.
/// The loop ends when stdin reaches EOF or a read error occurs.
#[tokio::main]
async fn main() {
    // Every stdout write is funnelled through this channel so lines never interleave.
    let (out_tx, mut out_rx) = mpsc::unbounded_channel::<String>();

    // Writer task: one line per message, flushed immediately.
    tokio::spawn(async move {
        let mut stdout = tokio::io::stdout();
        loop {
            let Some(message) = out_rx.recv().await else {
                break;
            };
            let mut bytes = message.into_bytes();
            bytes.push(b'\n');
            if stdout.write_all(&bytes).await.is_err() {
                break;
            }
            let _ = stdout.flush().await;
        }
    });

    // Announce readiness before any state is built.
    emit_event(&out_tx, "ready", serde_json::json!({}));

    // SIP-side state.
    let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone())));
    // WebRTC state lives behind its own lock, independent of SIP handlers.
    let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
    // TTS state lives behind its own lock as well.
    let tts_engine = Arc::new(Mutex::new(tts::TtsEngine::new()));

    // Command loop over stdin lines.
    let mut lines = BufReader::new(tokio::io::stdin()).lines();
    loop {
        let line = match lines.next_line().await {
            Ok(Some(text)) => text,
            _ => break,
        };
        if line.trim().is_empty() {
            continue;
        }

        let cmd = match serde_json::from_str::<Command>(&line) {
            Ok(parsed) => parsed,
            Err(e) => {
                // No command id is available when parsing fails.
                respond_err(&out_tx, "", &format!("parse: {e}"));
                continue;
            }
        };

        // Handlers may await for a long time, so each one gets its own task.
        let task_engine = Arc::clone(&engine);
        let task_webrtc = Arc::clone(&webrtc);
        let task_tts = Arc::clone(&tts_engine);
        let task_out = out_tx.clone();
        tokio::spawn(async move {
            handle_command(task_engine, task_webrtc, task_tts, &task_out, cmd).await;
        });
    }
}
/// Dispatch one parsed command to its handler, selected by `cmd.method`.
///
/// Handlers are grouped by the lock(s) they take. Unknown methods are
/// answered with an `unknown command: <method>` error.
async fn handle_command(
    engine: Arc<Mutex<ProxyEngine>>,
    webrtc: Arc<Mutex<WebRtcEngine>>,
    tts_engine: Arc<Mutex<tts::TtsEngine>>,
    out_tx: &OutTx,
    cmd: Command,
) {
    let request = &cmd;
    match request.method.as_str() {
        // --- Engine lock only: configuration and call control ---
        "configure" => handle_configure(engine, out_tx, request).await,
        "get_status" => handle_get_status(engine, out_tx, request).await,
        "make_call" => handle_make_call(engine, out_tx, request).await,
        "hangup" => handle_hangup(engine, out_tx, request).await,

        // --- Engine lock only: leg management ---
        "add_leg" => handle_add_leg(engine, out_tx, request).await,
        "add_device_leg" => handle_add_device_leg(engine, out_tx, request).await,
        "remove_leg" => handle_remove_leg(engine, out_tx, request).await,
        "transfer_leg" => handle_transfer_leg(engine, out_tx, request).await,
        "replace_leg" => handle_replace_leg(engine, out_tx, request).await,
        "set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, request).await,

        // --- Engine lock only: interactions and tool legs ---
        "start_interaction" => handle_start_interaction(engine, out_tx, request).await,
        "add_tool_leg" => handle_add_tool_leg(engine, out_tx, request).await,
        "remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, request).await,

        // --- WebRTC lock only ---
        "webrtc_offer" => handle_webrtc_offer(webrtc, out_tx, request).await,
        "webrtc_ice" => handle_webrtc_ice(webrtc, out_tx, request).await,
        "webrtc_close" => handle_webrtc_close(webrtc, out_tx, request).await,

        // --- Both locks: engine for the mixer channels, WebRTC for the session ---
        "webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, request).await,

        // --- TTS lock only ---
        "generate_tts" => handle_generate_tts(tts_engine, out_tx, request).await,

        unknown => respond_err(out_tx, &request.id, &format!("unknown command: {unknown}")),
    }
}
/// Handle the `configure` command — receives full app config from TypeScript.
///
/// * First call: binds the SIP transport on `0.0.0.0:<lan_port>`, starts the
///   UDP receiver and creates the RTP port pool.
/// * Subsequent calls: reuse the already-bound socket and only reconfigure the
///   registrar and provider registrations. The socket is NOT rebound and the
///   RTP port pool is NOT recreated.
///
/// Responds with `{ "bound": <addr>, "reconfigure": <bool> }`, where `bound`
/// is the address the socket is actually bound to. On a reconfigure with a
/// changed `lan_port` this is still the original address.
/// Responds with an error if the params do not deserialize into `AppConfig`
/// or the initial bind fails.
async fn handle_configure(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let app_config: AppConfig = match serde_json::from_value(cmd.params.clone()) {
        Ok(c) => c,
        Err(e) => {
            respond_err(out_tx, &cmd.id, &format!("bad config: {e}"));
            return;
        }
    };

    // The engine lock is held for the whole function, so packet handlers
    // spawned by the receiver wait until configuration has completed.
    let mut eng = engine.lock().await;

    let existing_socket = eng.transport.as_ref().map(|t| t.socket());
    let is_reconfigure = existing_socket.is_some();

    let socket = match existing_socket {
        // Reconfigure — socket already bound, just update subsystems.
        Some(socket) => socket,
        // First configure — bind SIP transport.
        None => {
            let bind_addr = format!("0.0.0.0:{}", app_config.proxy.lan_port);
            let transport = match SipTransport::bind(&bind_addr).await {
                Ok(t) => t,
                Err(e) => {
                    respond_err(out_tx, &cmd.id, &format!("SIP bind failed: {e}"));
                    return;
                }
            };
            let socket = transport.socket();

            // Start UDP receiver: each datagram is copied and handled in its own task.
            let engine_for_recv = engine.clone();
            let socket_for_recv = socket.clone();
            transport.spawn_receiver(move |data: &[u8], addr: SocketAddr| {
                let engine = engine_for_recv.clone();
                let socket = socket_for_recv.clone();
                let data = data.to_vec();
                tokio::spawn(async move {
                    handle_sip_packet(engine, &socket, &data, addr).await;
                });
            });
            eng.transport = Some(transport);

            // Initialize RTP port pool (only on first configure).
            eng.rtp_pool = Some(RtpPortPool::new(
                app_config.proxy.rtp_port_range.min,
                app_config.proxy.rtp_port_range.max,
            ));
            socket
        }
    };

    // Report the address the socket is really bound to. Fall back to the
    // configured port only if the OS query fails.
    let bind_info = socket
        .local_addr()
        .map(|addr| addr.to_string())
        .unwrap_or_else(|_| format!("0.0.0.0:{}", app_config.proxy.lan_port));

    // (Re)configure registrar.
    eng.registrar.configure(&app_config.devices);

    // (Re)configure provider registrations.
    eng.provider_mgr
        .configure(
            &app_config.providers,
            app_config.proxy.public_ip_seed.as_deref(),
            &app_config.proxy.lan_ip,
            app_config.proxy.lan_port,
            socket,
        )
        .await;

    eng.config = Some(app_config);

    respond_ok(
        out_tx,
        &cmd.id,
        serde_json::json!({
            "bound": bind_info,
            "reconfigure": is_reconfigure,
        }),
    );
}
/// Handle incoming SIP packets from the UDP socket.
/// This is the core routing pipeline — entirely in Rust.
///
/// Pipeline, in order (the first step that consumes the message wins):
/// 1. Responses consumed by the provider manager (registration responses).
/// 2. REGISTER from a non-provider address — answered by the registrar.
/// 3. Messages whose Call-ID belongs to an existing call.
/// 4. New INVITE from a provider address — creates an inbound call and emits
///    `incoming_call`.
/// 5. New INVITE from any other address — resolves an outbound route, creates
///    a device-originated call and emits `outbound_device_call`.
/// 6. Anything else is reported as a `sip_unhandled` event.
///
/// Data that does not parse as SIP is dropped silently. The engine lock is
/// held for the whole function. If the engine has no config or RTP pool yet,
/// the packet is dropped instead of panicking.
async fn handle_sip_packet(
    engine: Arc<Mutex<ProxyEngine>>,
    socket: &UdpSocket,
    data: &[u8],
    from_addr: SocketAddr,
) {
    let msg = match SipMessage::parse(data) {
        Some(m) => m,
        None => return, // Not a valid SIP message, ignore.
    };

    let mut eng = engine.lock().await;

    // 1. Provider registration responses — consumed internally.
    if msg.is_response() && eng.provider_mgr.handle_response(&msg, socket).await {
        return;
    }

    // 2. Device REGISTER — handled by registrar.
    let is_from_provider = eng
        .provider_mgr
        .find_by_address(&from_addr)
        .await
        .is_some();

    if !is_from_provider && msg.method() == Some("REGISTER") {
        if let Some(response_buf) = eng.registrar.handle_register(&msg, from_addr) {
            let _ = socket.send_to(&response_buf, from_addr).await;
            return;
        }
    }

    // Everything below needs the app config. Clone it once so the engine can
    // still be borrowed mutably while routing.
    let config_ref = match eng.config.clone() {
        Some(c) => c,
        None => return, // Not configured yet — nothing can be routed.
    };

    // 3. Route to existing call by SIP Call-ID.
    if eng.call_mgr.has_call(msg.call_id())
        && eng
            .call_mgr
            .route_sip_message(&msg, from_addr, socket, &config_ref)
            .await
    {
        return;
    }

    // 4. New inbound INVITE from provider.
    if is_from_provider && msg.is_request() && msg.method() == Some("INVITE") {
        // Detect public IP from Via.
        if let Some(via) = msg.get_header("Via") {
            if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await {
                let mut ps = ps_arc.lock().await;
                ps.detect_public_ip(via);
            }
        }

        // Send 100 Trying immediately.
        let trying = SipMessage::create_response(100, "Trying", &msg, None);
        let _ = socket.send_to(&trying.serialize(), from_addr).await;

        // Determine provider info.
        let (provider_id, provider_config, public_ip) =
            if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await {
                let ps = ps_arc.lock().await;
                (
                    ps.config.id.clone(),
                    ps.config.clone(),
                    ps.public_ip.clone(),
                )
            } else {
                return;
            };

        // Create the inbound call — Rust handles everything.
        // Split borrows via destructuring to satisfy the borrow checker.
        let ProxyEngine {
            ref registrar,
            ref mut call_mgr,
            ref mut rtp_pool,
            ..
        } = *eng;
        let rtp_pool = match rtp_pool.as_mut() {
            Some(pool) => pool,
            None => return, // No RTP pool — media cannot be set up.
        };

        let call_id = call_mgr
            .create_inbound_call(
                &msg,
                from_addr,
                &provider_id,
                &provider_config,
                &config_ref,
                registrar,
                rtp_pool,
                socket,
                public_ip.as_deref(),
            )
            .await;

        if let Some(call_id) = call_id {
            // Emit event so TypeScript knows about the call (for dashboard, IVR routing, etc).
            let from_header = msg.get_header("From").unwrap_or("");
            let from_uri = SipMessage::extract_uri(from_header).unwrap_or("Unknown");
            let called_number = msg
                .request_uri()
                .and_then(|uri| SipMessage::extract_uri(uri))
                .unwrap_or("");
            emit_event(
                &eng.out_tx,
                "incoming_call",
                serde_json::json!({
                    "call_id": call_id,
                    "from_uri": from_uri,
                    "to_number": called_number,
                    "provider_id": provider_id,
                }),
            );
        }
        return;
    }

    // 5. New outbound INVITE from device.
    if !is_from_provider && msg.is_request() && msg.method() == Some("INVITE") {
        // Dialed number: URI extracted from the request URI, else the raw
        // request URI, else empty.
        let dialed_number = msg
            .request_uri()
            .and_then(|uri| SipMessage::extract_uri(uri))
            .unwrap_or(msg.request_uri().unwrap_or(""))
            .to_string();

        let device = eng.registrar.find_by_address(&from_addr);
        let device_id = device.map(|d| d.device_id.clone());

        // Find provider via routing rules.
        let route_result = config_ref.resolve_outbound_route(
            &dialed_number,
            device_id.as_deref(),
            &|_pid: &str| {
                // Can't call async here — use a sync check.
                // For now, assume all configured providers are available.
                true
            },
        );

        if let Some(route) = route_result {
            // Look up provider state by config ID (not by device address).
            let (public_ip, registered_aor) = if let Some(ps_arc) =
                eng.provider_mgr.find_by_provider_id(&route.provider.id).await
            {
                let ps = ps_arc.lock().await;
                (ps.public_ip.clone(), ps.registered_aor.clone())
            } else {
                // Fallback — construct AOR from config.
                (
                    None,
                    format!("sip:{}@{}", route.provider.username, route.provider.domain),
                )
            };

            let ProxyEngine {
                ref mut call_mgr,
                ref mut rtp_pool,
                ..
            } = *eng;
            let rtp_pool = match rtp_pool.as_mut() {
                Some(pool) => pool,
                None => return, // No RTP pool — media cannot be set up.
            };

            let call_id = call_mgr
                .create_device_outbound_call(
                    &msg,
                    from_addr,
                    &route.provider,
                    &config_ref,
                    rtp_pool,
                    socket,
                    public_ip.as_deref(),
                    &registered_aor,
                )
                .await;

            if let Some(call_id) = call_id {
                emit_event(
                    &eng.out_tx,
                    "outbound_device_call",
                    serde_json::json!({
                        "call_id": call_id,
                        "from_device": device_id,
                        "to_number": dialed_number,
                    }),
                );
            }
        }
        return;
    }

    // 6. Other messages — log for debugging.
    let label = if msg.is_request() {
        msg.method().unwrap_or("?").to_string()
    } else {
        msg.status_code().map(|c| c.to_string()).unwrap_or_default()
    };
    emit_event(
        &eng.out_tx,
        "sip_unhandled",
        serde_json::json!({
            "method_or_status": label,
            "call_id": msg.call_id(),
            "from_addr": from_addr.ip().to_string(),
            "from_port": from_addr.port(),
            "is_from_provider": is_from_provider,
        }),
    );
}
/// Handle `get_status` — reply with the status of every active call.
///
/// Takes the engine lock and responds with `{ "calls": [...] }`.
async fn handle_get_status(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let guard = engine.lock().await;
    let payload = serde_json::json!({ "calls": guard.call_mgr.get_all_statuses() });
    respond_ok(out_tx, &cmd.id, payload);
}
/// Handle `make_call` — initiate an outbound call to a number via a provider.
///
/// Params: `number` (required), `provider_id` (optional). Without a
/// `provider_id` the provider is chosen by outbound route resolution.
///
/// Provider runtime state (public IP, registered AOR) is looked up by the
/// provider's config id, the same way `handle_replace_leg` and the SIP packet
/// handler do it. If no state is found, the AOR is built from the config.
///
/// On success emits `outbound_call_started` and responds with
/// `{ "call_id": ... }`. Responds with an error when the number is missing,
/// the engine is not configured or initialized, no provider matches, or call
/// origination fails.
async fn handle_make_call(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
        Some(n) => n.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing number");
            return;
        }
    };
    let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());

    let mut eng = engine.lock().await;
    let config_ref = match &eng.config {
        Some(c) => c.clone(),
        None => {
            respond_err(out_tx, &cmd.id, "not configured");
            return;
        }
    };

    // Resolve provider: explicit id wins, otherwise use route resolution.
    let provider_config = match provider_id {
        Some(pid) => config_ref.providers.iter().find(|p| p.id == pid).cloned(),
        None => config_ref
            .resolve_outbound_route(&number, None, &|_| true)
            .map(|r| r.provider),
    };
    let provider_config = match provider_config {
        Some(p) => p,
        None => {
            respond_err(out_tx, &cmd.id, "no provider available");
            return;
        }
    };

    // Get public IP and registered AOR from provider state, keyed by config id.
    let (public_ip, registered_aor) = if let Some(ps_arc) = eng
        .provider_mgr
        .find_by_provider_id(&provider_config.id)
        .await
    {
        let ps = ps_arc.lock().await;
        (ps.public_ip.clone(), ps.registered_aor.clone())
    } else {
        // Fallback — construct AOR from config.
        (
            None,
            format!("sip:{}@{}", provider_config.username, provider_config.domain),
        )
    };

    let socket = match &eng.transport {
        Some(t) => t.socket(),
        None => {
            respond_err(out_tx, &cmd.id, "not initialized");
            return;
        }
    };

    // Split borrows so the call manager and RTP pool can be used together.
    let ProxyEngine {
        ref mut call_mgr,
        ref mut rtp_pool,
        ..
    } = *eng;
    let rtp_pool = match rtp_pool.as_mut() {
        Some(pool) => pool,
        None => {
            respond_err(out_tx, &cmd.id, "not initialized");
            return;
        }
    };

    let call_id = call_mgr
        .make_outbound_call(
            &number,
            &provider_config,
            &config_ref,
            rtp_pool,
            &socket,
            public_ip.as_deref(),
            &registered_aor,
        )
        .await;

    match call_id {
        Some(id) => {
            emit_event(
                out_tx,
                "outbound_call_started",
                serde_json::json!({
                    "call_id": id,
                    "number": number,
                    "provider_id": provider_config.id,
                }),
            );
            respond_ok(out_tx, &cmd.id, serde_json::json!({ "call_id": id }));
        }
        None => {
            respond_err(
                out_tx,
                &cmd.id,
                "call origination failed — provider not registered or no ports available",
            );
        }
    }
}
/// Handle the `hangup` command — terminate the call named by `call_id`.
///
/// Responds with an error if `call_id` is missing, the transport has not been
/// initialized, or the call manager does not know the call.
async fn handle_hangup(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let Some(call_id) = cmd
        .params
        .get("call_id")
        .and_then(|v| v.as_str())
        .map(str::to_string)
    else {
        respond_err(out_tx, &cmd.id, "missing call_id");
        return;
    };

    let mut eng = engine.lock().await;
    let Some(socket) = eng.transport.as_ref().map(|t| t.socket()) else {
        respond_err(out_tx, &cmd.id, "not initialized");
        return;
    };

    let hung_up = eng.call_mgr.hangup(&call_id, &socket).await;
    if !hung_up {
        respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
        return;
    }
    respond_ok(out_tx, &cmd.id, serde_json::json!({}));
}
/// Handle `webrtc_offer` — pass the browser's SDP offer to the WebRTC engine
/// and reply with the SDP answer.
///
/// Only the WebRTC lock is taken. Required params: `session_id`, `sdp`.
async fn handle_webrtc_offer(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
    let text_param = |key: &str| {
        cmd.params
            .get(key)
            .and_then(|v| v.as_str())
            .map(str::to_string)
    };

    let Some(session_id) = text_param("session_id") else {
        respond_err(out_tx, &cmd.id, "missing session_id");
        return;
    };
    let Some(offer_sdp) = text_param("sdp") else {
        respond_err(out_tx, &cmd.id, "missing sdp");
        return;
    };

    let mut wrtc = webrtc.lock().await;
    let answer_sdp = match wrtc.handle_offer(&session_id, &offer_sdp).await {
        Ok(answer) => answer,
        Err(e) => {
            respond_err(out_tx, &cmd.id, &e);
            return;
        }
    };

    respond_ok(
        out_tx,
        &cmd.id,
        serde_json::json!({
            "session_id": session_id,
            "sdp": answer_sdp,
        }),
    );
}
/// Handle `webrtc_ice` — forward ICE candidate from browser to Rust PeerConnection.
/// Uses only the WebRTC lock.
///
/// Params: `session_id` (required), `candidate` (defaults to an empty string
/// when absent), `sdp_mid` and `sdp_mline_index` (both optional).
/// An `sdp_mline_index` that does not fit in a `u16` is treated as absent
/// rather than being silently truncated to a different index.
async fn handle_webrtc_ice(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
    let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing session_id");
            return;
        }
    };
    let candidate = cmd
        .params
        .get("candidate")
        .and_then(|v| v.as_str())
        .unwrap_or("");
    let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str());
    let sdp_mline_index = cmd
        .params
        .get("sdp_mline_index")
        .and_then(|v| v.as_u64())
        // Range-check before narrowing so the cast below cannot truncate.
        .filter(|idx| *idx <= u64::from(u16::MAX))
        .map(|idx| idx as u16);

    let wrtc = webrtc.lock().await;
    match wrtc
        .add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index)
        .await
    {
        Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
        Err(e) => respond_err(out_tx, &cmd.id, &e),
    }
}
/// Handle `webrtc_link` — bridge a WebRTC session into a call's mixer.
///
/// Steps:
/// 1. create a fresh set of leg channels,
/// 2. under the engine lock, add the leg to the call's mixer,
/// 3. under the WebRTC lock, wire the session to the other channel ends,
/// 4. under the engine lock again, record the leg in the call's leg map,
/// 5. emit `leg_added` and respond.
///
/// The two locks are taken one after the other and never held at the same time.
///
/// NOTE(review): if step 3 fails (session not found), the leg added to the
/// mixer in step 2 does not appear to be removed again — confirm whether the
/// mixer cleans up once the channel ends are dropped.
async fn handle_webrtc_link(
    engine: Arc<Mutex<ProxyEngine>>,
    webrtc: Arc<Mutex<WebRtcEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    let text_param = |key: &str| {
        cmd.params
            .get(key)
            .and_then(|v| v.as_str())
            .map(str::to_string)
    };

    let Some(session_id) = text_param("session_id") else {
        respond_err(out_tx, &cmd.id, "missing session_id");
        return;
    };
    let Some(call_id) = text_param("call_id") else {
        respond_err(out_tx, &cmd.id, "missing call_id");
        return;
    };

    // Channel set connecting the WebRTC session and the mixer.
    let channels = crate::leg_io::create_leg_channels();

    // Step 2: short engine lock to register the leg with the mixer.
    {
        let eng = engine.lock().await;
        let Some(call) = eng.call_mgr.calls.get(&call_id) else {
            respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
            return;
        };
        call.add_leg_to_mixer(
            &session_id,
            codec_lib::PT_OPUS,
            channels.inbound_rx,
            channels.outbound_tx,
        )
        .await;
    }

    // Step 3: WebRTC lock only, released before the engine is locked again.
    let linked = {
        let mut wrtc = webrtc.lock().await;
        wrtc.link_to_mixer(
            &session_id,
            &call_id,
            channels.inbound_tx,
            channels.outbound_rx,
        )
        .await
    };
    if !linked {
        respond_err(out_tx, &cmd.id, &format!("session {session_id} not found"));
        return;
    }

    // Step 4: record the WebRTC leg on the call (if the call still exists).
    {
        let mut eng = engine.lock().await;
        if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
            let leg = crate::call::LegInfo {
                id: session_id.clone(),
                kind: crate::call::LegKind::WebRtc,
                state: crate::call::LegState::Connected,
                codec_pt: codec_lib::PT_OPUS,
                sip_leg: None,
                sip_call_id: None,
                webrtc_session_id: Some(session_id.clone()),
                rtp_socket: None,
                rtp_port: 0,
                remote_media: None,
                signaling_addr: None,
                metadata: std::collections::HashMap::new(),
            };
            call.legs.insert(session_id.clone(), leg);
        }
    }

    // Step 5: notify and acknowledge.
    emit_event(
        out_tx,
        "leg_added",
        serde_json::json!({
            "call_id": call_id,
            "leg_id": session_id,
            "kind": "webrtc",
            "state": "connected",
            "codec": "Opus",
            "rtpPort": 0,
            "remoteMedia": null,
            "metadata": {},
        }),
    );
    respond_ok(
        out_tx,
        &cmd.id,
        serde_json::json!({
            "session_id": session_id,
            "call_id": call_id,
            "bridged": true,
        }),
    );
}
/// Handle `add_leg` — add a new SIP leg to an existing call.
///
/// Params: `call_id` and `number` (required), `provider_id` (optional).
/// Without a `provider_id` the provider is chosen by outbound route
/// resolution.
///
/// Provider runtime state (public IP, registered AOR) is fetched with a
/// single lookup by the provider's config id, the same way
/// `handle_replace_leg` does it. If no state is found, the AOR is built from
/// the config.
///
/// Responds with `{ "leg_id": ... }` on success.
async fn handle_add_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing call_id");
            return;
        }
    };
    let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
        Some(n) => n.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing number");
            return;
        }
    };
    let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());

    let mut eng = engine.lock().await;
    let config_ref = match &eng.config {
        Some(c) => c.clone(),
        None => {
            respond_err(out_tx, &cmd.id, "not configured");
            return;
        }
    };

    // Resolve provider: explicit id wins, otherwise use route resolution.
    let provider_config = match provider_id {
        Some(pid) => config_ref.providers.iter().find(|p| p.id == pid).cloned(),
        None => config_ref
            .resolve_outbound_route(&number, None, &|_| true)
            .map(|r| r.provider),
    };
    let provider_config = match provider_config {
        Some(p) => p,
        None => {
            respond_err(out_tx, &cmd.id, "no provider available");
            return;
        }
    };

    // Get public IP and registered AOR with one lookup, keyed by config id.
    let (public_ip, registered_aor) = if let Some(ps_arc) = eng
        .provider_mgr
        .find_by_provider_id(&provider_config.id)
        .await
    {
        let ps = ps_arc.lock().await;
        (ps.public_ip.clone(), ps.registered_aor.clone())
    } else {
        // Fallback — construct AOR from config.
        (
            None,
            format!("sip:{}@{}", provider_config.username, provider_config.domain),
        )
    };

    let socket = match &eng.transport {
        Some(t) => t.socket(),
        None => {
            respond_err(out_tx, &cmd.id, "not initialized");
            return;
        }
    };

    // Split borrows so the call manager and RTP pool can be used together.
    let ProxyEngine {
        ref mut call_mgr,
        ref mut rtp_pool,
        ..
    } = *eng;
    let rtp_pool = match rtp_pool.as_mut() {
        Some(pool) => pool,
        None => {
            respond_err(out_tx, &cmd.id, "not initialized");
            return;
        }
    };

    let leg_id = call_mgr
        .add_external_leg(
            &call_id,
            &number,
            &provider_config,
            &config_ref,
            rtp_pool,
            &socket,
            public_ip.as_deref(),
            &registered_aor,
        )
        .await;

    match leg_id {
        Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })),
        None => respond_err(out_tx, &cmd.id, "failed to add leg"),
    }
}
/// Handle `add_device_leg` — bring a local SIP device into an existing call.
///
/// Required params: `call_id`, `device_id`. Responds with
/// `{ "leg_id": ... }` on success.
async fn handle_add_device_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let text_param = |key: &str| {
        cmd.params
            .get(key)
            .and_then(|v| v.as_str())
            .map(str::to_string)
    };

    let Some(call_id) = text_param("call_id") else {
        respond_err(out_tx, &cmd.id, "missing call_id");
        return;
    };
    let Some(device_id) = text_param("device_id") else {
        respond_err(out_tx, &cmd.id, "missing device_id");
        return;
    };

    let mut eng = engine.lock().await;
    let Some(config_ref) = eng.config.clone() else {
        respond_err(out_tx, &cmd.id, "not configured");
        return;
    };
    let Some(socket) = eng.transport.as_ref().map(|t| t.socket()) else {
        respond_err(out_tx, &cmd.id, "not initialized");
        return;
    };

    // Borrow the three engine fields separately so they can be used together.
    let ProxyEngine {
        ref registrar,
        ref mut call_mgr,
        ref mut rtp_pool,
        ..
    } = *eng;
    let rtp_pool = rtp_pool.as_mut().unwrap();

    let added = call_mgr
        .add_device_leg(&call_id, &device_id, registrar, &config_ref, rtp_pool, &socket)
        .await;

    if let Some(lid) = added {
        respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid }));
    } else {
        respond_err(
            out_tx,
            &cmd.id,
            "failed to add device leg — device not registered or call not found",
        );
    }
}
/// Handle `transfer_leg` — move one leg from a source call to a target call.
///
/// Required params: `source_call_id`, `leg_id`, `target_call_id`.
async fn handle_transfer_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let text_param = |key: &str| {
        cmd.params
            .get(key)
            .and_then(|v| v.as_str())
            .map(str::to_string)
    };

    let Some(source_call_id) = text_param("source_call_id") else {
        respond_err(out_tx, &cmd.id, "missing source_call_id");
        return;
    };
    let Some(leg_id) = text_param("leg_id") else {
        respond_err(out_tx, &cmd.id, "missing leg_id");
        return;
    };
    let Some(target_call_id) = text_param("target_call_id") else {
        respond_err(out_tx, &cmd.id, "missing target_call_id");
        return;
    };

    let mut eng = engine.lock().await;
    let moved = eng
        .call_mgr
        .transfer_leg(&source_call_id, &leg_id, &target_call_id)
        .await;

    if !moved {
        respond_err(out_tx, &cmd.id, "transfer failed — call or leg not found");
        return;
    }
    respond_ok(out_tx, &cmd.id, serde_json::json!({}));
}
/// Handle `replace_leg` — swap a leg of a call for a newly dialed one.
///
/// Required params: `call_id`, `old_leg_id`, `number`; optional
/// `provider_id`. Without a `provider_id` the provider is chosen by outbound
/// route resolution. Provider state is looked up by config id, and the AOR is
/// built from the config when no state exists.
///
/// Responds with `{ "new_leg_id": ... }` on success.
async fn handle_replace_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let text_param = |key: &str| {
        cmd.params
            .get(key)
            .and_then(|v| v.as_str())
            .map(str::to_string)
    };

    let Some(call_id) = text_param("call_id") else {
        respond_err(out_tx, &cmd.id, "missing call_id");
        return;
    };
    let Some(old_leg_id) = text_param("old_leg_id") else {
        respond_err(out_tx, &cmd.id, "missing old_leg_id");
        return;
    };
    let Some(number) = text_param("number") else {
        respond_err(out_tx, &cmd.id, "missing number");
        return;
    };
    let requested_provider = cmd.params.get("provider_id").and_then(|v| v.as_str());

    let mut eng = engine.lock().await;
    let Some(config_ref) = eng.config.clone() else {
        respond_err(out_tx, &cmd.id, "not configured");
        return;
    };
    let Some(socket) = eng.transport.as_ref().map(|t| t.socket()) else {
        respond_err(out_tx, &cmd.id, "not initialized");
        return;
    };

    // Explicit provider id wins; otherwise fall back to route resolution.
    let resolved = match requested_provider {
        Some(pid) => config_ref.providers.iter().find(|p| p.id == pid).cloned(),
        None => config_ref
            .resolve_outbound_route(&number, None, &|_| true)
            .map(|r| r.provider),
    };
    let Some(provider_config) = resolved else {
        respond_err(out_tx, &cmd.id, "no provider available");
        return;
    };

    let (public_ip, registered_aor) = match eng
        .provider_mgr
        .find_by_provider_id(&provider_config.id)
        .await
    {
        Some(ps_arc) => {
            let ps = ps_arc.lock().await;
            (ps.public_ip.clone(), ps.registered_aor.clone())
        }
        None => (
            None,
            format!("sip:{}@{}", provider_config.username, provider_config.domain),
        ),
    };

    // Borrow call manager and RTP pool separately so they can be used together.
    let ProxyEngine {
        ref mut call_mgr,
        ref mut rtp_pool,
        ..
    } = *eng;
    let rtp_pool = rtp_pool.as_mut().unwrap();

    let replacement = call_mgr
        .replace_leg(
            &call_id,
            &old_leg_id,
            &number,
            &provider_config,
            &config_ref,
            rtp_pool,
            &socket,
            public_ip.as_deref(),
            &registered_aor,
        )
        .await;

    if let Some(lid) = replacement {
        respond_ok(out_tx, &cmd.id, serde_json::json!({ "new_leg_id": lid }));
    } else {
        respond_err(out_tx, &cmd.id, "replace failed — call ended or dial failed");
    }
}
/// Handle `remove_leg` — remove a leg from a call.
///
/// Required params: `call_id`, `leg_id`. Responds with an error if either is
/// missing, the transport is not initialized, or the call manager reports
/// that the call or leg does not exist.
async fn handle_remove_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
    let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing call_id");
            return;
        }
    };
    let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing leg_id");
            return;
        }
    };

    let mut eng = engine.lock().await;
    let socket = match &eng.transport {
        Some(t) => t.socket(),
        None => {
            respond_err(out_tx, &cmd.id, "not initialized");
            return;
        }
    };

    if eng.call_mgr.remove_leg(&call_id, &leg_id, &socket).await {
        respond_ok(out_tx, &cmd.id, serde_json::json!({}));
    } else {
        // Plain literal: no formatting arguments, so no `format!` allocation is needed.
        respond_err(out_tx, &cmd.id, "call/leg not found");
    }
}
/// Handle `webrtc_close` — tear down the WebRTC session named by `session_id`.
///
/// Only the WebRTC lock is taken.
async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
    let Some(session_id) = cmd
        .params
        .get("session_id")
        .and_then(|v| v.as_str())
        .map(str::to_string)
    else {
        respond_err(out_tx, &cmd.id, "missing session_id");
        return;
    };

    let mut wrtc = webrtc.lock().await;
    if let Err(e) = wrtc.close_session(&session_id).await {
        respond_err(out_tx, &cmd.id, &e);
        return;
    }
    respond_ok(out_tx, &cmd.id, serde_json::json!({}));
}
// ---------------------------------------------------------------------------
// Leg interaction & tool leg commands
// ---------------------------------------------------------------------------
/// Handle `start_interaction` — isolate a leg, play a prompt, collect DTMF.
/// This command blocks until the interaction completes (digit, timeout, or cancel).
///
/// Params:
/// * `call_id`, `leg_id`, `prompt_wav` — required.
/// * `expected_digits` — string of accepted digits, default `"12"`.
/// * `timeout_ms` — default `15000`. Values above `u32::MAX` are clamped to
///   `u32::MAX` instead of wrapping around to a much shorter timeout.
///
/// The engine lock is held only while the command is handed to the mixer and
/// again while the result is stored in the leg metadata, not while waiting.
/// A safety timeout of `timeout_ms + 30 s` bounds the wait for the mixer.
///
/// Responds with `{ "result": "digit" | "timeout" | "cancelled" }`, plus
/// `"digit"` when a digit was collected.
async fn handle_start_interaction(
    engine: Arc<Mutex<ProxyEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing call_id");
            return;
        }
    };
    let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing leg_id");
            return;
        }
    };
    let prompt_wav = match cmd.params.get("prompt_wav").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing prompt_wav");
            return;
        }
    };
    let expected_digits: Vec<char> = cmd
        .params
        .get("expected_digits")
        .and_then(|v| v.as_str())
        .unwrap_or("12")
        .chars()
        .collect();
    // Clamp before narrowing so the cast cannot truncate.
    let timeout_ms = cmd
        .params
        .get("timeout_ms")
        .and_then(|v| v.as_u64())
        .unwrap_or(15000)
        .min(u64::from(u32::MAX)) as u32;

    // Load prompt audio from WAV file.
    let prompt_frames = match crate::audio_player::load_prompt_pcm_frames(&prompt_wav) {
        Ok(f) => f,
        Err(e) => {
            respond_err(out_tx, &cmd.id, &format!("prompt load failed: {e}"));
            return;
        }
    };

    // Create oneshot channel for the result.
    let (result_tx, result_rx) = tokio::sync::oneshot::channel();

    // Send StartInteraction to the mixer.
    {
        let eng = engine.lock().await;
        let call = match eng.call_mgr.calls.get(&call_id) {
            Some(c) => c,
            None => {
                respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
                return;
            }
        };
        // If the send fails, `result_tx` is dropped with the command and the
        // wait below resolves to `Cancelled`.
        let _ = call
            .mixer_cmd_tx
            .send(crate::mixer::MixerCommand::StartInteraction {
                leg_id: leg_id.clone(),
                prompt_pcm_frames: prompt_frames,
                expected_digits: expected_digits.clone(),
                timeout_ms,
                result_tx,
            })
            .await;
    } // engine lock released — we block on the oneshot, not the lock.

    // Await the interaction result (blocks this task until complete).
    let safety_timeout = tokio::time::Duration::from_millis(u64::from(timeout_ms) + 30_000);
    let result = match tokio::time::timeout(safety_timeout, result_rx).await {
        Ok(Ok(r)) => r,
        Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled, // oneshot dropped
        Err(_) => crate::mixer::InteractionResult::Timeout,       // safety timeout
    };

    // Store the interaction result in leg metadata.
    let (result_str, digit_str) = match &result {
        crate::mixer::InteractionResult::Digit(d) => ("digit", Some(d.to_string())),
        crate::mixer::InteractionResult::Timeout => ("timeout", None),
        crate::mixer::InteractionResult::Cancelled => ("cancelled", None),
    };
    {
        let mut eng = engine.lock().await;
        // The call or leg may have gone away while waiting; then nothing is stored.
        if let Some(leg) = eng
            .call_mgr
            .calls
            .get_mut(&call_id)
            .and_then(|call| call.legs.get_mut(&leg_id))
        {
            leg.metadata.insert(
                "last_interaction_result".to_string(),
                serde_json::json!(result_str),
            );
            if let Some(ref d) = digit_str {
                leg.metadata.insert(
                    "last_interaction_digit".to_string(),
                    serde_json::json!(d),
                );
            }
        }
    }

    let mut resp = serde_json::json!({ "result": result_str });
    if let Some(d) = digit_str {
        resp["digit"] = serde_json::json!(d);
    }
    respond_ok(out_tx, &cmd.id, resp);
}
/// Handle `add_tool_leg` — add a recording or transcription tool leg to a call.
///
/// Params:
/// * `call_id` — required.
/// * `tool_type` — required, `"recording"` or `"transcription"`.
/// * `config.base_dir` — optional, recording only; defaults to `.nogit/recordings`.
///
/// The call is looked up before the tool's background task is spawned, so an
/// unknown `call_id` no longer starts a task for a call that does not exist.
///
/// On success emits `leg_added` and responds with `{ "tool_leg_id": ... }`.
async fn handle_add_tool_leg(
    engine: Arc<Mutex<ProxyEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing call_id");
            return;
        }
    };
    let tool_type_str = match cmd.params.get("tool_type").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => {
            respond_err(out_tx, &cmd.id, "missing tool_type");
            return;
        }
    };
    let tool_type = match tool_type_str.as_str() {
        "recording" => crate::mixer::ToolType::Recording,
        "transcription" => crate::mixer::ToolType::Transcription,
        other => {
            respond_err(out_tx, &cmd.id, &format!("unknown tool_type: {other}"));
            return;
        }
    };

    let tool_leg_id = format!("{call_id}-tool-{}", rand::random::<u32>());

    {
        let mut eng = engine.lock().await;

        // Validate the call first; nothing is spawned for an unknown call.
        let call = match eng.call_mgr.calls.get_mut(&call_id) {
            Some(c) => c,
            None => {
                respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
                return;
            }
        };

        // Spawn the appropriate background task.
        let (audio_tx, _task_handle) = match tool_type {
            crate::mixer::ToolType::Recording => {
                let base_dir = cmd
                    .params
                    .get("config")
                    .and_then(|c| c.get("base_dir"))
                    .and_then(|v| v.as_str())
                    .unwrap_or(".nogit/recordings")
                    .to_string();
                crate::tool_leg::spawn_recording_tool(
                    tool_leg_id.clone(),
                    call_id.clone(),
                    base_dir,
                    out_tx.clone(),
                )
            }
            crate::mixer::ToolType::Transcription => {
                crate::tool_leg::spawn_transcription_tool(
                    tool_leg_id.clone(),
                    call_id.clone(),
                    out_tx.clone(),
                )
            }
        };

        // Send AddToolLeg to the mixer.
        let _ = call
            .mixer_cmd_tx
            .send(crate::mixer::MixerCommand::AddToolLeg {
                leg_id: tool_leg_id.clone(),
                tool_type,
                audio_tx,
            })
            .await;

        // Register tool leg in the call's leg map.
        let mut metadata = std::collections::HashMap::new();
        metadata.insert(
            "tool_type".to_string(),
            serde_json::json!(tool_type_str),
        );
        call.legs.insert(
            tool_leg_id.clone(),
            crate::call::LegInfo {
                id: tool_leg_id.clone(),
                kind: crate::call::LegKind::Tool,
                state: crate::call::LegState::Connected,
                codec_pt: 0,
                sip_leg: None,
                sip_call_id: None,
                webrtc_session_id: None,
                rtp_socket: None,
                rtp_port: 0,
                remote_media: None,
                signaling_addr: None,
                metadata,
            },
        );
    }

    emit_event(
        out_tx,
        "leg_added",
        serde_json::json!({
            "call_id": call_id,
            "leg_id": tool_leg_id,
            "kind": "tool",
            "state": "connected",
            "codec": null,
            "rtpPort": 0,
            "remoteMedia": null,
            "metadata": { "tool_type": tool_type_str },
        }),
    );
    respond_ok(
        out_tx,
        &cmd.id,
        serde_json::json!({ "tool_leg_id": tool_leg_id }),
    );
}
/// Handle `remove_tool_leg` — remove a tool leg from a call.
async fn handle_remove_tool_leg(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
};
let tool_leg_id = match cmd.params.get("tool_leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing tool_leg_id"); return; }
};
let mut eng = engine.lock().await;
let call = match eng.call_mgr.calls.get_mut(&call_id) {
Some(c) => c,
None => {
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
return;
}
};
// Remove from mixer (drops audio_tx → background task finalizes).
let _ = call
.mixer_cmd_tx
.send(crate::mixer::MixerCommand::RemoveToolLeg {
leg_id: tool_leg_id.clone(),
})
.await;
// Remove from call's leg map.
call.legs.remove(&tool_leg_id);
emit_event(
out_tx,
"leg_removed",
serde_json::json!({
"call_id": call_id,
"leg_id": tool_leg_id,
}),
);
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
}
/// Handle `set_leg_metadata` — store one key/value pair in a leg's metadata.
///
/// Required params: `call_id`, `leg_id`, `key` (strings) and `value` (any
/// JSON value). An existing entry under the same key is overwritten.
async fn handle_set_leg_metadata(
    engine: Arc<Mutex<ProxyEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    let text_param = |name: &str| {
        cmd.params
            .get(name)
            .and_then(|v| v.as_str())
            .map(str::to_string)
    };

    let Some(call_id) = text_param("call_id") else {
        respond_err(out_tx, &cmd.id, "missing call_id");
        return;
    };
    let Some(leg_id) = text_param("leg_id") else {
        respond_err(out_tx, &cmd.id, "missing leg_id");
        return;
    };
    let Some(key) = text_param("key") else {
        respond_err(out_tx, &cmd.id, "missing key");
        return;
    };
    let Some(value) = cmd.params.get("value").cloned() else {
        respond_err(out_tx, &cmd.id, "missing value");
        return;
    };

    let mut eng = engine.lock().await;
    let Some(call) = eng.call_mgr.calls.get_mut(&call_id) else {
        respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
        return;
    };
    let Some(leg) = call.legs.get_mut(&leg_id) else {
        respond_err(out_tx, &cmd.id, &format!("leg {leg_id} not found"));
        return;
    };

    leg.metadata.insert(key, value);
    respond_ok(out_tx, &cmd.id, serde_json::json!({}));
}
/// Handle `generate_tts` — pass the raw command params to the TTS engine and
/// relay its result or error.
///
/// Only the TTS lock is taken, and it is held for the whole synthesis.
async fn handle_generate_tts(
    tts_engine: Arc<Mutex<tts::TtsEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    let mut synth = tts_engine.lock().await;
    let outcome = synth.generate(&cmd.params).await;
    if let Err(e) = outcome {
        respond_err(out_tx, &cmd.id, &e);
    } else if let Ok(result) = outcome {
        respond_ok(out_tx, &cmd.id, result);
    }
}