Files
siprouter/rust/crates/proxy-engine/src/call.rs
T

297 lines
8.9 KiB
Rust
Raw Normal View History

//! Call hub — owns N legs and a mixer task.
//!
//! Every call has a central mixer that provides mix-minus audio to all
//! participants. Legs can be added and removed dynamically mid-call.
use crate::mixer::{MixerCommand, RtpPacket};
use crate::sip_leg::SipLeg;
use sip_proto::message::SipMessage;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::net::UdpSocket;
use tokio::sync::{mpsc, watch};
use tokio::task::JoinHandle;
pub type LegId = String;
/// Call state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CallState {
SettingUp,
Ringing,
Connected,
Voicemail,
Ivr,
Terminated,
}
impl CallState {
/// Wire-format string for events/dashboards. Not currently emitted —
/// call state changes flow as typed events (`call_answered`, etc.) —
/// but kept for future status-snapshot work.
#[allow(dead_code)]
pub fn as_str(&self) -> &'static str {
match self {
Self::SettingUp => "setting-up",
Self::Ringing => "ringing",
Self::Connected => "connected",
Self::Voicemail => "voicemail",
Self::Ivr => "ivr",
Self::Terminated => "terminated",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CallDirection {
Inbound,
Outbound,
}
impl CallDirection {
/// Wire-format string. See CallState::as_str.
#[allow(dead_code)]
pub fn as_str(&self) -> &'static str {
match self {
Self::Inbound => "inbound",
Self::Outbound => "outbound",
}
}
}
/// The type of a call leg.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegKind {
SipProvider,
SipDevice,
WebRtc,
/// Voicemail playback, IVR prompt playback, recording — not yet wired up
/// as a distinct leg kind (those paths currently use the mixer's role
/// system instead). Kept behind allow so adding a real media leg later
/// doesn't require re-introducing the variant.
#[allow(dead_code)]
Media,
Tool, // observer leg for recording, transcription, etc.
}
impl LegKind {
pub fn as_str(&self) -> &'static str {
match self {
Self::SipProvider => "sip-provider",
Self::SipDevice => "sip-device",
Self::WebRtc => "webrtc",
Self::Media => "media",
Self::Tool => "tool",
}
}
}
/// Per-leg state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegState {
Inviting,
Ringing,
Connected,
Terminated,
}
impl LegState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Inviting => "inviting",
Self::Ringing => "ringing",
Self::Connected => "connected",
Self::Terminated => "terminated",
}
}
}
/// Information about a single leg in a call.
pub struct LegInfo {
pub id: LegId,
pub kind: LegKind,
pub state: LegState,
pub codec_pt: u8,
/// Media transport currently negotiated for this leg.
///
/// `rtp` covers classic SIP audio media, `t38-udptl` covers T.38 fax,
/// `webrtc` is used for browser legs, and `internal` for proxy-local media/tool paths.
pub media_protocol: &'static str,
/// Whether this leg is currently wired into an active media bridge.
pub media_io_active: bool,
/// For SIP legs: the SIP dialog manager (handles 407 auth, BYE, etc).
pub sip_leg: Option<SipLeg>,
/// For SIP legs: the SIP Call-ID for message routing.
pub sip_call_id: Option<String>,
/// For WebRTC legs: the session ID in WebRtcEngine.
///
/// Populated at leg creation but not yet consumed by the hub —
/// WebRTC session lookup currently goes through the session registry
/// directly. Kept for introspection/debugging.
#[allow(dead_code)]
pub webrtc_session_id: Option<String>,
/// The RTP socket allocated for this leg.
pub rtp_socket: Option<Arc<UdpSocket>>,
/// The RTP port number.
pub rtp_port: u16,
/// Public IP to advertise in SDP/Record-Route when THIS leg is the
/// destination of a rewrite. Populated only for provider legs; `None`
/// for LAN SIP devices, WebRTC browsers, media, and tool legs (which
/// are reachable via `lan_ip`). See `route_passthrough_message` for
/// the per-destination advertise-IP logic.
pub public_ip: Option<String>,
/// The remote media endpoint (learned from SDP or address learning).
pub remote_media: Option<SocketAddr>,
/// SIP signaling address (provider or device).
pub signaling_addr: Option<SocketAddr>,
/// Flexible key-value metadata (consent state, tool config, etc.).
/// Persisted into call history on call end.
pub metadata: HashMap<String, serde_json::Value>,
}
#[derive(Clone)]
pub struct PendingDialogBridge {
pub source_leg_id: LegId,
pub target_leg_id: LegId,
pub source_request: SipMessage,
pub target_request: SipMessage,
pub method: String,
}
/// A multiparty call with N legs and a central mixer.
pub struct Call {
// Duplicated from the HashMap key in CallManager. Kept for future
// status-snapshot work.
#[allow(dead_code)]
pub id: String,
pub state: CallState,
// Populated at call creation but not currently consumed — dashboard
// pull snapshots are gone (push events only).
#[allow(dead_code)]
pub direction: CallDirection,
pub created_at: Instant,
// Metadata.
pub caller_number: Option<String>,
pub callee_number: Option<String>,
#[allow(dead_code)]
pub provider_id: String,
/// Original INVITE from the device (for device-originated outbound calls).
/// Used to construct proper 180/200/error responses back to the device.
pub device_invite: Option<SipMessage>,
/// Pending in-dialog B2BUA transaction bridged across two different SIP dialogs.
pub pending_dialog_bridge: Option<PendingDialogBridge>,
/// All legs in this call, keyed by leg ID.
pub legs: HashMap<LegId, LegInfo>,
/// Channel to send commands to the mixer task.
pub mixer_cmd_tx: mpsc::Sender<MixerCommand>,
/// Active passthrough media bridge mode, if any.
pub media_bridge_mode: Option<String>,
/// Cancellation handles for non-mixer passthrough media tasks.
media_bridge_cancel_txs: Vec<watch::Sender<bool>>,
/// Handle to the mixer task (aborted on call teardown).
mixer_task: Option<JoinHandle<()>>,
}
impl Call {
pub fn new(
id: String,
direction: CallDirection,
provider_id: String,
mixer_cmd_tx: mpsc::Sender<MixerCommand>,
mixer_task: JoinHandle<()>,
) -> Self {
Self {
id,
state: CallState::SettingUp,
direction,
created_at: Instant::now(),
caller_number: None,
callee_number: None,
provider_id,
device_invite: None,
pending_dialog_bridge: None,
legs: HashMap::new(),
mixer_cmd_tx,
media_bridge_mode: None,
media_bridge_cancel_txs: Vec::new(),
mixer_task: Some(mixer_task),
}
}
/// Add a leg to the mixer. Sends the AddLeg command with channel endpoints.
pub async fn add_leg_to_mixer(
&self,
leg_id: &str,
codec_pt: u8,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
) {
let _ = self
.mixer_cmd_tx
.send(MixerCommand::AddLeg {
leg_id: leg_id.to_string(),
codec_pt,
inbound_rx,
outbound_tx,
})
.await;
}
/// Remove a leg from the mixer.
pub async fn remove_leg_from_mixer(&self, leg_id: &str) {
let _ = self
.mixer_cmd_tx
.send(MixerCommand::RemoveLeg {
leg_id: leg_id.to_string(),
})
.await;
}
pub fn duration_secs(&self) -> u64 {
self.created_at.elapsed().as_secs()
}
pub fn clear_media_bridge(&mut self) {
for cancel_tx in self.media_bridge_cancel_txs.drain(..) {
let _ = cancel_tx.send(true);
}
self.media_bridge_mode = None;
}
pub fn install_media_bridge(
&mut self,
mode: &str,
cancel_txs: Vec<watch::Sender<bool>>,
) {
self.clear_media_bridge();
self.media_bridge_mode = Some(mode.to_string());
self.media_bridge_cancel_txs = cancel_txs;
}
pub fn note_mixer_bridge(&mut self, mode: &str) {
self.clear_media_bridge();
self.media_bridge_mode = Some(mode.to_string());
}
/// Shut down the mixer and abort its task.
pub async fn shutdown_mixer(&mut self) {
self.clear_media_bridge();
let _ = self.mixer_cmd_tx.send(MixerCommand::Shutdown).await;
if let Some(handle) = self.mixer_task.take() {
handle.abort();
}
}
}