- Updated mixer to handle participant and isolated leg roles, allowing for IVR and consent interactions. - Introduced commands for starting and canceling interactions, managing tool legs for recording and transcription. - Implemented per-source audio handling for tool legs, enabling separate audio processing. - Enhanced DTMF handling to forward events between participant legs only. - Added support for PCM recording directly from tool legs, with WAV file generation. - Updated TypeScript definitions and functions to support new interaction and tool leg features.
253 lines
7.1 KiB
Rust
253 lines
7.1 KiB
Rust
//! Call hub — owns N legs and a mixer task.
//!
//! Every call has a central mixer that provides mix-minus audio to all
//! participants. Legs can be added and removed dynamically mid-call.
|
|
|
|
use crate::mixer::{MixerCommand, RtpPacket};
|
|
use crate::sip_leg::SipLeg;
|
|
use sip_proto::message::SipMessage;
|
|
use std::collections::HashMap;
|
|
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
use std::time::Instant;
|
|
use tokio::net::UdpSocket;
|
|
use tokio::sync::mpsc;
|
|
use tokio::task::JoinHandle;
|
|
|
|
pub type LegId = String;
|
|
|
|
/// Call state machine.
///
/// Each variant has a stable kebab-case name, available through
/// [`CallState::as_str`] and the `Display` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CallState {
    /// Rendered as `"setting-up"`.
    SettingUp,
    /// Rendered as `"ringing"`.
    Ringing,
    /// Rendered as `"connected"`.
    Connected,
    /// Rendered as `"voicemail"`.
    Voicemail,
    /// Rendered as `"terminated"`.
    Terminated,
}

impl CallState {
    /// Returns the stable string name of this state.
    ///
    /// The function is `const`, so the names can also be used to build
    /// compile-time constants.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::SettingUp => "setting-up",
            Self::Ringing => "ringing",
            Self::Connected => "connected",
            Self::Voicemail => "voicemail",
            Self::Terminated => "terminated",
        }
    }
}

impl std::fmt::Display for CallState {
    /// Writes the same text as [`CallState::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` (rather than `write_str`) honours width/alignment flags.
        f.pad(self.as_str())
    }
}
|
|
|
|
/// Direction of a call.
///
/// Each variant has a stable lowercase name, available through
/// [`CallDirection::as_str`] and the `Display` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CallDirection {
    /// Rendered as `"inbound"`.
    Inbound,
    /// Rendered as `"outbound"`.
    Outbound,
}

impl CallDirection {
    /// Returns the stable string name of this direction.
    ///
    /// The function is `const`, so the names can also be used to build
    /// compile-time constants.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Inbound => "inbound",
            Self::Outbound => "outbound",
        }
    }
}

impl std::fmt::Display for CallDirection {
    /// Writes the same text as [`CallDirection::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` (rather than `write_str`) honours width/alignment flags.
        f.pad(self.as_str())
    }
}
|
|
|
|
/// The type of a call leg.
///
/// Each variant has a stable kebab-case name, available through
/// [`LegKind::as_str`] and the `Display` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LegKind {
    /// Rendered as `"sip-provider"`.
    SipProvider,
    /// Rendered as `"sip-device"`.
    SipDevice,
    /// Rendered as `"webrtc"`.
    WebRtc,
    /// Voicemail playback, IVR, recording. Rendered as `"media"`.
    Media,
    /// Observer leg for recording, transcription, etc. Rendered as `"tool"`.
    Tool,
}

impl LegKind {
    /// Returns the stable string name of this leg kind.
    ///
    /// The function is `const`, so the names can also be used to build
    /// compile-time constants.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::SipProvider => "sip-provider",
            Self::SipDevice => "sip-device",
            Self::WebRtc => "webrtc",
            Self::Media => "media",
            Self::Tool => "tool",
        }
    }
}

impl std::fmt::Display for LegKind {
    /// Writes the same text as [`LegKind::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` (rather than `write_str`) honours width/alignment flags.
        f.pad(self.as_str())
    }
}
|
|
|
|
/// Per-leg state.
///
/// Each variant has a stable lowercase name, available through
/// [`LegState::as_str`] and the `Display` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LegState {
    /// Rendered as `"inviting"`.
    Inviting,
    /// Rendered as `"ringing"`.
    Ringing,
    /// Rendered as `"connected"`.
    Connected,
    /// Rendered as `"terminated"`.
    Terminated,
}

impl LegState {
    /// Returns the stable string name of this leg state.
    ///
    /// The function is `const`, so the names can also be used to build
    /// compile-time constants.
    pub const fn as_str(&self) -> &'static str {
        match *self {
            Self::Inviting => "inviting",
            Self::Ringing => "ringing",
            Self::Connected => "connected",
            Self::Terminated => "terminated",
        }
    }
}

impl std::fmt::Display for LegState {
    /// Writes the same text as [`LegState::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` (rather than `write_str`) honours width/alignment flags.
        f.pad(self.as_str())
    }
}
|
|
|
|
/// Information about a single leg in a call.
|
|
pub struct LegInfo {
|
|
pub id: LegId,
|
|
pub kind: LegKind,
|
|
pub state: LegState,
|
|
pub codec_pt: u8,
|
|
|
|
/// For SIP legs: the SIP dialog manager (handles 407 auth, BYE, etc).
|
|
pub sip_leg: Option<SipLeg>,
|
|
/// For SIP legs: the SIP Call-ID for message routing.
|
|
pub sip_call_id: Option<String>,
|
|
/// For WebRTC legs: the session ID in WebRtcEngine.
|
|
pub webrtc_session_id: Option<String>,
|
|
/// The RTP socket allocated for this leg.
|
|
pub rtp_socket: Option<Arc<UdpSocket>>,
|
|
/// The RTP port number.
|
|
pub rtp_port: u16,
|
|
/// The remote media endpoint (learned from SDP or address learning).
|
|
pub remote_media: Option<SocketAddr>,
|
|
/// SIP signaling address (provider or device).
|
|
pub signaling_addr: Option<SocketAddr>,
|
|
|
|
/// Flexible key-value metadata (consent state, tool config, etc.).
|
|
/// Persisted into call history on call end.
|
|
pub metadata: HashMap<String, serde_json::Value>,
|
|
}
|
|
|
|
/// A multiparty call with N legs and a central mixer.
|
|
pub struct Call {
|
|
pub id: String,
|
|
pub state: CallState,
|
|
pub direction: CallDirection,
|
|
pub created_at: Instant,
|
|
|
|
// Metadata.
|
|
pub caller_number: Option<String>,
|
|
pub callee_number: Option<String>,
|
|
pub provider_id: String,
|
|
|
|
/// Original INVITE from the device (for device-originated outbound calls).
|
|
/// Used to construct proper 180/200/error responses back to the device.
|
|
pub device_invite: Option<SipMessage>,
|
|
|
|
/// All legs in this call, keyed by leg ID.
|
|
pub legs: HashMap<LegId, LegInfo>,
|
|
|
|
/// Channel to send commands to the mixer task.
|
|
pub mixer_cmd_tx: mpsc::Sender<MixerCommand>,
|
|
|
|
/// Handle to the mixer task (aborted on call teardown).
|
|
mixer_task: Option<JoinHandle<()>>,
|
|
}
|
|
|
|
impl Call {
|
|
pub fn new(
|
|
id: String,
|
|
direction: CallDirection,
|
|
provider_id: String,
|
|
mixer_cmd_tx: mpsc::Sender<MixerCommand>,
|
|
mixer_task: JoinHandle<()>,
|
|
) -> Self {
|
|
Self {
|
|
id,
|
|
state: CallState::SettingUp,
|
|
direction,
|
|
created_at: Instant::now(),
|
|
caller_number: None,
|
|
callee_number: None,
|
|
provider_id,
|
|
device_invite: None,
|
|
legs: HashMap::new(),
|
|
mixer_cmd_tx,
|
|
mixer_task: Some(mixer_task),
|
|
}
|
|
}
|
|
|
|
/// Add a leg to the mixer. Sends the AddLeg command with channel endpoints.
|
|
pub async fn add_leg_to_mixer(
|
|
&self,
|
|
leg_id: &str,
|
|
codec_pt: u8,
|
|
inbound_rx: mpsc::Receiver<RtpPacket>,
|
|
outbound_tx: mpsc::Sender<Vec<u8>>,
|
|
) {
|
|
let _ = self
|
|
.mixer_cmd_tx
|
|
.send(MixerCommand::AddLeg {
|
|
leg_id: leg_id.to_string(),
|
|
codec_pt,
|
|
inbound_rx,
|
|
outbound_tx,
|
|
})
|
|
.await;
|
|
}
|
|
|
|
/// Remove a leg from the mixer.
|
|
pub async fn remove_leg_from_mixer(&self, leg_id: &str) {
|
|
let _ = self
|
|
.mixer_cmd_tx
|
|
.send(MixerCommand::RemoveLeg {
|
|
leg_id: leg_id.to_string(),
|
|
})
|
|
.await;
|
|
}
|
|
|
|
pub fn duration_secs(&self) -> u64 {
|
|
self.created_at.elapsed().as_secs()
|
|
}
|
|
|
|
/// Shut down the mixer and abort its task.
|
|
pub async fn shutdown_mixer(&mut self) {
|
|
let _ = self.mixer_cmd_tx.send(MixerCommand::Shutdown).await;
|
|
if let Some(handle) = self.mixer_task.take() {
|
|
handle.abort();
|
|
}
|
|
}
|
|
|
|
/// Produce a JSON status snapshot for the dashboard.
|
|
pub fn to_status_json(&self) -> serde_json::Value {
|
|
let legs: Vec<serde_json::Value> = self
|
|
.legs
|
|
.values()
|
|
.filter(|l| l.state != LegState::Terminated)
|
|
.map(|l| {
|
|
let metadata: serde_json::Value = if l.metadata.is_empty() {
|
|
serde_json::json!({})
|
|
} else {
|
|
serde_json::Value::Object(
|
|
l.metadata.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
|
|
)
|
|
};
|
|
serde_json::json!({
|
|
"id": l.id,
|
|
"type": l.kind.as_str(),
|
|
"state": l.state.as_str(),
|
|
"codec": sip_proto::helpers::codec_name(l.codec_pt),
|
|
"rtpPort": l.rtp_port,
|
|
"remoteMedia": l.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
|
|
"metadata": metadata,
|
|
})
|
|
})
|
|
.collect();
|
|
|
|
serde_json::json!({
|
|
"id": self.id,
|
|
"state": self.state.as_str(),
|
|
"direction": self.direction.as_str(),
|
|
"callerNumber": self.caller_number,
|
|
"calleeNumber": self.callee_number,
|
|
"providerUsed": self.provider_id,
|
|
"duration": self.duration_secs(),
|
|
"legs": legs,
|
|
})
|
|
}
|
|
}
|