909 lines
34 KiB
Rust
909 lines
34 KiB
Rust
//! Audio mixer — mix-minus engine for multiparty calls.
//!
//! Each Call spawns one mixer task. Legs communicate with the mixer via
//! tokio mpsc channels — no shared mutable state, no lock contention.
//!
//! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame).
//! All encoding/decoding happens at leg boundaries. Inbound audio is denoised
//! per leg at 48kHz (Opus legs are passed through without denoising).
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, reorder RTP, decode variable-duration packets to 48kHz,
//!    and queue them in per-leg PCM buffers
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
//! 5. For each tool leg: send per-source unmerged audio batch
//! 6. Forward DTMF between participant legs only
|
||
|
||
use crate::ipc::{emit_event, OutTx};
|
||
use crate::jitter_buffer::{JitterBuffer, JitterResult};
|
||
use crate::rtp::{build_rtp_header, rtp_clock_increment, rtp_clock_rate};
|
||
use crate::tts::TtsStreamMessage;
|
||
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
|
||
use nnnoiseless::DenoiseState;
|
||
use std::collections::{HashMap, VecDeque};
|
||
use tokio::sync::{mpsc, oneshot, watch};
|
||
use tokio::task::JoinHandle;
|
||
use tokio::time::{self, Duration, MissedTickBehavior};
|
||
|
||
/// Sample rate of the internal mixing bus (48 kHz). Opus is native at this rate, G.722 needs a
/// 3× upsample and G.711 a 6× upsample. Denoising and mixing both run at this rate in f32.
const MIX_RATE: u32 = 48_000;

/// Number of samples in one 20 ms frame at [`MIX_RATE`] (960).
const MIX_FRAME_SIZE: usize = (MIX_RATE as usize / 1000) * 20;

/// Upper bound on how much timestamp-derived gap fill is synthesized at once
/// (six frames, i.e. 120 ms).
const MAX_GAP_FILL_SAMPLES: usize = 6 * MIX_FRAME_SIZE;

/// Maximum number of decode / concealment steps a single leg may take within one tick.
const MAX_PACKET_STEPS_PER_TICK: usize = 24;

/// The first output drop is reported immediately; after that only every N-th drop is reported.
const DROP_REPORT_INTERVAL: u64 = 50;
|
||
|
||
/// A raw RTP payload received from a leg (no RTP header).
///
/// The header fields the mixer still needs are carried next to the payload bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtpPacket {
    /// Payload bytes, without the RTP header.
    pub payload: Vec<u8>,
    /// RTP payload type of the original packet.
    pub payload_type: u8,
    /// RTP marker bit (first packet of a DTMF event, etc.).
    pub marker: bool,
    /// RTP sequence number for reordering.
    pub seq: u16,
    /// RTP timestamp from the original packet header.
    pub timestamp: u32,
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Leg roles
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// What role a leg currently plays in the mixer.
|
||
enum LegRole {
|
||
/// Normal participant: contributes to mix, receives mix-minus.
|
||
Participant,
|
||
/// Temporarily isolated for IVR/consent interaction.
|
||
Isolated(IsolationState),
|
||
}
|
||
|
||
struct IsolationState {
|
||
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
|
||
prompt_frames: VecDeque<Vec<f32>>,
|
||
/// Live TTS frames arrive here while playback is already in progress.
|
||
prompt_stream_rx: Option<mpsc::Receiver<TtsStreamMessage>>,
|
||
/// Cancels the background TTS producer when the interaction ends early.
|
||
prompt_cancel_tx: Option<watch::Sender<bool>>,
|
||
/// Whether the live prompt stream has ended.
|
||
prompt_stream_finished: bool,
|
||
/// Digits that complete the interaction (e.g., ['1', '2']).
|
||
expected_digits: Vec<char>,
|
||
/// Ticks remaining before timeout (decremented each tick after prompt ends).
|
||
timeout_ticks_remaining: u32,
|
||
/// Whether we've finished playing the prompt.
|
||
prompt_done: bool,
|
||
/// Channel to send the result back to the command handler.
|
||
result_tx: Option<oneshot::Sender<InteractionResult>>,
|
||
}
|
||
|
||
/// Result of a leg interaction (consent prompt, IVR, etc.).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InteractionResult {
    /// The participant pressed one of the expected digits.
    Digit(char),
    /// No digit was received within the timeout.
    Timeout,
    /// The leg was removed or the call tore down before completion.
    Cancelled,
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Tool legs
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// Type of tool leg.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ToolType {
    /// Tool leg used for recording.
    Recording,
    /// Tool leg used for transcription.
    Transcription,
}
|
||
|
||
/// Per-source audio delivered to a tool leg each mixer tick.
#[derive(Debug, Clone)]
pub struct ToolAudioBatch {
    /// One entry per contributing leg; the audio is delivered unmerged.
    pub sources: Vec<ToolAudioSource>,
}

/// One participant's 20ms audio frame.
#[derive(Debug, Clone)]
pub struct ToolAudioSource {
    /// Identifier of the leg this frame came from.
    pub leg_id: String,
    /// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples.
    pub pcm_48k: Vec<f32>,
}
|
||
|
||
/// Internal storage for a tool leg inside the mixer.
|
||
struct ToolLegSlot {
|
||
#[allow(dead_code)]
|
||
tool_type: ToolType,
|
||
audio_tx: mpsc::Sender<ToolAudioBatch>,
|
||
dropped_batches: u64,
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Commands
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// Commands sent to the mixer task via a control channel.
|
||
pub enum MixerCommand {
|
||
/// Add a new participant leg to the mix.
|
||
AddLeg {
|
||
leg_id: String,
|
||
codec_pt: u8,
|
||
inbound_rx: mpsc::Receiver<RtpPacket>,
|
||
outbound_tx: mpsc::Sender<Vec<u8>>,
|
||
},
|
||
/// Remove a leg from the mix (channels are dropped, I/O tasks exit).
|
||
RemoveLeg { leg_id: String },
|
||
/// Shut down the mixer.
|
||
Shutdown,
|
||
|
||
/// Isolate a leg and start an interaction (consent prompt, IVR).
|
||
/// The leg is removed from the mix and hears the prompt instead.
|
||
/// DTMF from the leg is checked against expected_digits.
|
||
StartInteraction {
|
||
leg_id: String,
|
||
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
|
||
prompt_pcm_frames: Vec<Vec<f32>>,
|
||
/// Optional live prompt stream. Frames are appended as they are synthesized.
|
||
prompt_stream_rx: Option<mpsc::Receiver<TtsStreamMessage>>,
|
||
/// Optional cancellation handle for the live prompt stream.
|
||
prompt_cancel_tx: Option<watch::Sender<bool>>,
|
||
expected_digits: Vec<char>,
|
||
timeout_ms: u32,
|
||
result_tx: oneshot::Sender<InteractionResult>,
|
||
},
|
||
|
||
/// Add a tool leg that receives per-source unmerged audio.
|
||
AddToolLeg {
|
||
leg_id: String,
|
||
tool_type: ToolType,
|
||
audio_tx: mpsc::Sender<ToolAudioBatch>,
|
||
},
|
||
/// Remove a tool leg (drops the channel, background task finalizes).
|
||
RemoveToolLeg { leg_id: String },
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Mixer internals
|
||
// ---------------------------------------------------------------------------
|
||
|
||
/// Internal per-leg state inside the mixer.
|
||
struct MixerLegSlot {
|
||
codec_pt: u8,
|
||
transcoder: TranscodeState,
|
||
/// Per-leg inbound denoiser (48kHz, 480-sample frames).
|
||
denoiser: Box<DenoiseState<'static>>,
|
||
inbound_rx: mpsc::Receiver<RtpPacket>,
|
||
outbound_tx: mpsc::Sender<Vec<u8>>,
|
||
/// Decoded PCM waiting for playout. Variable-duration RTP packets are
|
||
/// decoded into this FIFO; the mixer consumes exactly one 20ms frame per tick.
|
||
pcm_buffer: VecDeque<f32>,
|
||
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
|
||
last_pcm_frame: Vec<f32>,
|
||
/// Next RTP timestamp expected from the inbound stream.
|
||
expected_rtp_timestamp: Option<u32>,
|
||
/// Best-effort estimate of packet duration in RTP clock units.
|
||
estimated_packet_ts: u32,
|
||
/// Number of consecutive ticks with no inbound packet.
|
||
silent_ticks: u32,
|
||
/// Per-leg jitter buffer for packet reordering and timing.
|
||
jitter: JitterBuffer,
|
||
// RTP output state.
|
||
rtp_seq: u16,
|
||
rtp_ts: u32,
|
||
rtp_ssrc: u32,
|
||
/// Dropped outbound frames for this leg (queue full / closed).
|
||
outbound_drops: u64,
|
||
/// Current role of this leg in the mixer.
|
||
role: LegRole,
|
||
}
|
||
|
||
fn mix_samples_to_rtp_ts(codec_pt: u8, mix_samples: usize) -> u32 {
|
||
let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64;
|
||
(((mix_samples as u64 * clock_rate) + (MIX_RATE as u64 / 2)) / MIX_RATE as u64) as u32
|
||
}
|
||
|
||
fn rtp_ts_to_mix_samples(codec_pt: u8, rtp_ts: u32) -> usize {
|
||
let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64;
|
||
(((rtp_ts as u64 * MIX_RATE as u64) + (clock_rate / 2)) / clock_rate) as usize
|
||
}
|
||
|
||
/// Whether a wrapping timestamp difference points forward: non-zero and strictly below
/// half of the 32-bit range.
fn is_forward_rtp_delta(delta: u32) -> bool {
    (1..0x8000_0000u32).contains(&delta)
}
|
||
|
||
fn should_emit_drop_event(total_drops: u64) -> bool {
|
||
total_drops == 1 || total_drops % DROP_REPORT_INTERVAL == 0
|
||
}
|
||
|
||
fn emit_output_drop_event(
|
||
out_tx: &OutTx,
|
||
call_id: &str,
|
||
leg_id: Option<&str>,
|
||
tool_leg_id: Option<&str>,
|
||
stream: &str,
|
||
reason: &str,
|
||
total_drops: u64,
|
||
) {
|
||
if !should_emit_drop_event(total_drops) {
|
||
return;
|
||
}
|
||
|
||
emit_event(
|
||
out_tx,
|
||
"mixer_output_drop",
|
||
serde_json::json!({
|
||
"call_id": call_id,
|
||
"leg_id": leg_id,
|
||
"tool_leg_id": tool_leg_id,
|
||
"stream": stream,
|
||
"reason": reason,
|
||
"total_drops": total_drops,
|
||
}),
|
||
);
|
||
}
|
||
|
||
fn fade_concealment_from_last_frame(slot: &mut MixerLegSlot, samples: usize, decay: f32) {
|
||
let mut template = if slot.last_pcm_frame.is_empty() {
|
||
vec![0.0f32; MIX_FRAME_SIZE]
|
||
} else {
|
||
slot.last_pcm_frame.clone()
|
||
};
|
||
|
||
let mut remaining = samples;
|
||
while remaining > 0 {
|
||
for sample in &mut template {
|
||
*sample *= decay;
|
||
}
|
||
let take = remaining.min(template.len());
|
||
slot.pcm_buffer.extend(template.iter().take(take).copied());
|
||
remaining -= take;
|
||
}
|
||
}
|
||
|
||
fn append_packet_loss_concealment(slot: &mut MixerLegSlot, samples: usize) {
|
||
let mut remaining = samples.max(1);
|
||
while remaining > 0 {
|
||
let chunk = remaining.min(MIX_FRAME_SIZE);
|
||
if slot.codec_pt == codec_lib::PT_OPUS {
|
||
match slot.transcoder.opus_plc(chunk) {
|
||
Ok(mut pcm) => {
|
||
pcm.resize(chunk, 0.0);
|
||
slot.pcm_buffer.extend(pcm);
|
||
}
|
||
Err(_) => fade_concealment_from_last_frame(slot, chunk, 0.8),
|
||
}
|
||
} else {
|
||
fade_concealment_from_last_frame(slot, chunk, 0.85);
|
||
}
|
||
remaining -= chunk;
|
||
}
|
||
}
|
||
|
||
fn decode_packet_to_mix_pcm(slot: &mut MixerLegSlot, pkt: &RtpPacket) -> Option<Vec<f32>> {
|
||
let (pcm, rate) = slot
|
||
.transcoder
|
||
.decode_to_f32(&pkt.payload, pkt.payload_type)
|
||
.ok()?;
|
||
|
||
let pcm_48k = if rate == MIX_RATE {
|
||
pcm
|
||
} else {
|
||
slot.transcoder
|
||
.resample_f32(&pcm, rate, MIX_RATE)
|
||
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
|
||
};
|
||
|
||
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
|
||
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
|
||
} else {
|
||
pcm_48k
|
||
};
|
||
|
||
Some(processed)
|
||
}
|
||
|
||
fn queue_inbound_packet(slot: &mut MixerLegSlot, pkt: RtpPacket) {
|
||
if let Some(pcm_48k) = decode_packet_to_mix_pcm(slot, &pkt) {
|
||
if pcm_48k.is_empty() {
|
||
return;
|
||
}
|
||
|
||
if let Some(expected_ts) = slot.expected_rtp_timestamp {
|
||
let gap_ts = pkt.timestamp.wrapping_sub(expected_ts);
|
||
if is_forward_rtp_delta(gap_ts) {
|
||
let gap_samples = rtp_ts_to_mix_samples(slot.codec_pt, gap_ts);
|
||
if gap_samples <= MAX_GAP_FILL_SAMPLES {
|
||
append_packet_loss_concealment(slot, gap_samples);
|
||
} else {
|
||
slot.pcm_buffer.clear();
|
||
}
|
||
}
|
||
}
|
||
|
||
let packet_ts = mix_samples_to_rtp_ts(slot.codec_pt, pcm_48k.len());
|
||
if packet_ts > 0 {
|
||
slot.estimated_packet_ts = packet_ts;
|
||
slot.expected_rtp_timestamp = Some(pkt.timestamp.wrapping_add(packet_ts));
|
||
}
|
||
slot.pcm_buffer.extend(pcm_48k);
|
||
}
|
||
}
|
||
|
||
fn fill_leg_playout_buffer(slot: &mut MixerLegSlot) {
|
||
let mut steps = 0usize;
|
||
while slot.pcm_buffer.len() < MIX_FRAME_SIZE && steps < MAX_PACKET_STEPS_PER_TICK {
|
||
steps += 1;
|
||
match slot.jitter.consume() {
|
||
JitterResult::Packet(pkt) => queue_inbound_packet(slot, pkt),
|
||
JitterResult::Missing => {
|
||
let conceal_ts = slot
|
||
.estimated_packet_ts
|
||
.max(rtp_clock_increment(slot.codec_pt));
|
||
let conceal_samples =
|
||
rtp_ts_to_mix_samples(slot.codec_pt, conceal_ts).clamp(1, MAX_GAP_FILL_SAMPLES);
|
||
append_packet_loss_concealment(slot, conceal_samples);
|
||
if let Some(expected_ts) = slot.expected_rtp_timestamp {
|
||
slot.expected_rtp_timestamp = Some(expected_ts.wrapping_add(conceal_ts));
|
||
}
|
||
}
|
||
JitterResult::Filling => break,
|
||
}
|
||
}
|
||
}
|
||
|
||
fn take_mix_frame(slot: &mut MixerLegSlot) -> Vec<f32> {
|
||
let mut frame = Vec::with_capacity(MIX_FRAME_SIZE);
|
||
while frame.len() < MIX_FRAME_SIZE {
|
||
if let Some(sample) = slot.pcm_buffer.pop_front() {
|
||
frame.push(sample);
|
||
} else {
|
||
frame.push(0.0);
|
||
}
|
||
}
|
||
frame
|
||
}
|
||
|
||
/// Soft limiter for one sample.
///
/// Magnitudes up to the knee (0.85) pass through unchanged. Above it the excess is
/// compressed so the output magnitude approaches, and is capped at, 1.0. The sign of the
/// input is preserved.
fn soft_limit_sample(sample: f32) -> f32 {
    const KNEE: f32 = 0.85;

    let magnitude = sample.abs();
    if magnitude <= KNEE {
        return sample;
    }

    let over = magnitude - KNEE;
    let limited = KNEE + (over / (1.0 + (over / (1.0 - KNEE))));
    sample.signum() * limited.min(1.0)
}
|
||
|
||
fn try_send_leg_output(
|
||
out_tx: &OutTx,
|
||
call_id: &str,
|
||
leg_id: &str,
|
||
slot: &mut MixerLegSlot,
|
||
rtp: Vec<u8>,
|
||
stream: &str,
|
||
) {
|
||
let reason = match slot.outbound_tx.try_send(rtp) {
|
||
Ok(()) => return,
|
||
Err(mpsc::error::TrySendError::Full(_)) => "full",
|
||
Err(mpsc::error::TrySendError::Closed(_)) => "closed",
|
||
};
|
||
|
||
slot.outbound_drops += 1;
|
||
emit_output_drop_event(
|
||
out_tx,
|
||
call_id,
|
||
Some(leg_id),
|
||
None,
|
||
stream,
|
||
reason,
|
||
slot.outbound_drops,
|
||
);
|
||
}
|
||
|
||
fn try_send_tool_output(
|
||
out_tx: &OutTx,
|
||
call_id: &str,
|
||
tool_leg_id: &str,
|
||
tool: &mut ToolLegSlot,
|
||
batch: ToolAudioBatch,
|
||
) {
|
||
let reason = match tool.audio_tx.try_send(batch) {
|
||
Ok(()) => return,
|
||
Err(mpsc::error::TrySendError::Full(_)) => "full",
|
||
Err(mpsc::error::TrySendError::Closed(_)) => "closed",
|
||
};
|
||
|
||
tool.dropped_batches += 1;
|
||
emit_output_drop_event(
|
||
out_tx,
|
||
call_id,
|
||
None,
|
||
Some(tool_leg_id),
|
||
"tool-batch",
|
||
reason,
|
||
tool.dropped_batches,
|
||
);
|
||
}
|
||
|
||
fn cancel_prompt_producer(state: &mut IsolationState) {
|
||
if let Some(cancel_tx) = state.prompt_cancel_tx.take() {
|
||
let _ = cancel_tx.send(true);
|
||
}
|
||
}
|
||
|
||
fn cancel_isolated_interaction(state: &mut IsolationState) {
|
||
cancel_prompt_producer(state);
|
||
if let Some(tx) = state.result_tx.take() {
|
||
let _ = tx.send(InteractionResult::Cancelled);
|
||
}
|
||
}
|
||
|
||
fn drain_prompt_stream(
|
||
out_tx: &OutTx,
|
||
call_id: &str,
|
||
leg_id: &str,
|
||
state: &mut IsolationState,
|
||
) {
|
||
loop {
|
||
let Some(mut stream_rx) = state.prompt_stream_rx.take() else {
|
||
return;
|
||
};
|
||
|
||
match stream_rx.try_recv() {
|
||
Ok(TtsStreamMessage::Frames(frames)) => {
|
||
state.prompt_frames.extend(frames);
|
||
state.prompt_stream_rx = Some(stream_rx);
|
||
}
|
||
Ok(TtsStreamMessage::Finished) => {
|
||
state.prompt_stream_finished = true;
|
||
return;
|
||
}
|
||
Ok(TtsStreamMessage::Failed(error)) => {
|
||
emit_event(
|
||
out_tx,
|
||
"mixer_error",
|
||
serde_json::json!({
|
||
"call_id": call_id,
|
||
"leg_id": leg_id,
|
||
"error": format!("tts stream failed: {error}"),
|
||
}),
|
||
);
|
||
state.prompt_stream_finished = true;
|
||
return;
|
||
}
|
||
Err(mpsc::error::TryRecvError::Empty) => {
|
||
state.prompt_stream_rx = Some(stream_rx);
|
||
return;
|
||
}
|
||
Err(mpsc::error::TryRecvError::Disconnected) => {
|
||
state.prompt_stream_finished = true;
|
||
return;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Spawn the mixer task for a call. Returns the command sender and task handle.
|
||
pub fn spawn_mixer(call_id: String, out_tx: OutTx) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
|
||
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
|
||
|
||
let handle = tokio::spawn(async move {
|
||
mixer_loop(call_id, cmd_rx, out_tx).await;
|
||
});
|
||
|
||
(cmd_tx, handle)
|
||
}
|
||
|
||
/// The 20ms mixing loop.
|
||
async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, out_tx: OutTx) {
|
||
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
|
||
let mut tool_legs: HashMap<String, ToolLegSlot> = HashMap::new();
|
||
let mut interval = time::interval(Duration::from_millis(20));
|
||
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||
|
||
loop {
|
||
interval.tick().await;
|
||
|
||
// ── 1. Process control commands (non-blocking). ─────────────
|
||
loop {
|
||
match cmd_rx.try_recv() {
|
||
Ok(MixerCommand::AddLeg {
|
||
leg_id,
|
||
codec_pt,
|
||
inbound_rx,
|
||
outbound_tx,
|
||
}) => {
|
||
let transcoder = match TranscodeState::new() {
|
||
Ok(t) => t,
|
||
Err(e) => {
|
||
emit_event(
|
||
&out_tx,
|
||
"mixer_error",
|
||
serde_json::json!({
|
||
"call_id": call_id,
|
||
"leg_id": leg_id,
|
||
"error": format!("codec init: {e}"),
|
||
}),
|
||
);
|
||
continue;
|
||
}
|
||
};
|
||
legs.insert(
|
||
leg_id,
|
||
MixerLegSlot {
|
||
codec_pt,
|
||
transcoder,
|
||
denoiser: new_denoiser(),
|
||
inbound_rx,
|
||
outbound_tx,
|
||
pcm_buffer: VecDeque::new(),
|
||
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
|
||
expected_rtp_timestamp: None,
|
||
estimated_packet_ts: rtp_clock_increment(codec_pt),
|
||
silent_ticks: 0,
|
||
rtp_seq: 0,
|
||
rtp_ts: 0,
|
||
rtp_ssrc: rand::random(),
|
||
outbound_drops: 0,
|
||
role: LegRole::Participant,
|
||
jitter: JitterBuffer::new(),
|
||
},
|
||
);
|
||
}
|
||
Ok(MixerCommand::RemoveLeg { leg_id }) => {
|
||
// If the leg is isolated, send Cancelled before dropping.
|
||
if let Some(slot) = legs.get_mut(&leg_id) {
|
||
if let LegRole::Isolated(ref mut state) = slot.role {
|
||
cancel_isolated_interaction(state);
|
||
}
|
||
}
|
||
legs.remove(&leg_id);
|
||
// Channels drop → I/O tasks exit cleanly.
|
||
}
|
||
Ok(MixerCommand::Shutdown) => {
|
||
// Cancel all outstanding interactions before shutting down.
|
||
for slot in legs.values_mut() {
|
||
if let LegRole::Isolated(ref mut state) = slot.role {
|
||
cancel_isolated_interaction(state);
|
||
}
|
||
}
|
||
return;
|
||
}
|
||
Ok(MixerCommand::StartInteraction {
|
||
leg_id,
|
||
prompt_pcm_frames,
|
||
prompt_stream_rx,
|
||
prompt_cancel_tx,
|
||
expected_digits,
|
||
timeout_ms,
|
||
result_tx,
|
||
}) => {
|
||
if let Some(slot) = legs.get_mut(&leg_id) {
|
||
// Cancel any existing interaction first.
|
||
if let LegRole::Isolated(ref mut old_state) = slot.role {
|
||
cancel_isolated_interaction(old_state);
|
||
}
|
||
let timeout_ticks = timeout_ms / 20;
|
||
slot.role = LegRole::Isolated(IsolationState {
|
||
prompt_frames: VecDeque::from(prompt_pcm_frames),
|
||
prompt_stream_rx,
|
||
prompt_cancel_tx,
|
||
prompt_stream_finished: false,
|
||
expected_digits,
|
||
timeout_ticks_remaining: timeout_ticks,
|
||
prompt_done: false,
|
||
result_tx: Some(result_tx),
|
||
});
|
||
} else {
|
||
// Leg not found — immediately cancel.
|
||
if let Some(cancel_tx) = prompt_cancel_tx {
|
||
let _ = cancel_tx.send(true);
|
||
}
|
||
let _ = result_tx.send(InteractionResult::Cancelled);
|
||
}
|
||
}
|
||
Ok(MixerCommand::AddToolLeg {
|
||
leg_id,
|
||
tool_type,
|
||
audio_tx,
|
||
}) => {
|
||
tool_legs.insert(
|
||
leg_id,
|
||
ToolLegSlot {
|
||
tool_type,
|
||
audio_tx,
|
||
dropped_batches: 0,
|
||
},
|
||
);
|
||
}
|
||
Ok(MixerCommand::RemoveToolLeg { leg_id }) => {
|
||
tool_legs.remove(&leg_id);
|
||
// Dropping the ToolLegSlot drops audio_tx → background task sees channel close.
|
||
}
|
||
Err(mpsc::error::TryRecvError::Empty) => break,
|
||
Err(mpsc::error::TryRecvError::Disconnected) => return,
|
||
}
|
||
}
|
||
|
||
if legs.is_empty() && tool_legs.is_empty() {
|
||
continue;
|
||
}
|
||
|
||
// ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ────
|
||
// DTMF (PT 101) packets are collected separately.
|
||
// Audio packets are sorted by sequence number and decoded
|
||
// in order to maintain codec state (critical for G.722 ADPCM).
|
||
let leg_ids: Vec<String> = legs.keys().cloned().collect();
|
||
let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new();
|
||
|
||
for lid in &leg_ids {
|
||
let slot = legs.get_mut(lid).unwrap();
|
||
|
||
// Step 2a: Drain all pending packets into the jitter buffer.
|
||
let mut got_audio = false;
|
||
loop {
|
||
match slot.inbound_rx.try_recv() {
|
||
Ok(pkt) => {
|
||
if pkt.payload_type == 101 {
|
||
dtmf_forward.push((lid.clone(), pkt));
|
||
} else {
|
||
got_audio = true;
|
||
slot.jitter.push(pkt);
|
||
}
|
||
}
|
||
Err(_) => break,
|
||
}
|
||
}
|
||
|
||
// Step 2b: Decode enough RTP to cover one 20ms playout frame.
|
||
// Variable-duration packets (10ms, 20ms, 60ms, ...) accumulate in
|
||
// the per-leg PCM FIFO; we pop exactly one 20ms frame below.
|
||
fill_leg_playout_buffer(slot);
|
||
slot.last_pcm_frame = take_mix_frame(slot);
|
||
|
||
// Run jitter adaptation + prune stale packets.
|
||
slot.jitter.adapt();
|
||
slot.jitter.prune_stale();
|
||
|
||
// Silent ticks: based on actual network reception, not jitter buffer state.
|
||
if got_audio || dtmf_forward.iter().any(|(src, _)| src == lid) {
|
||
slot.silent_ticks = 0;
|
||
} else {
|
||
slot.silent_ticks += 1;
|
||
}
|
||
if slot.silent_ticks > 150 {
|
||
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
||
slot.pcm_buffer.clear();
|
||
slot.expected_rtp_timestamp = None;
|
||
slot.estimated_packet_ts = rtp_clock_increment(slot.codec_pt);
|
||
}
|
||
}
|
||
|
||
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
|
||
// Accumulate as f64 to prevent precision loss when summing f32.
|
||
let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE];
|
||
for slot in legs.values() {
|
||
if matches!(slot.role, LegRole::Participant) {
|
||
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
|
||
total_mix[i] += s as f64;
|
||
}
|
||
}
|
||
}
|
||
|
||
// ── 4. Per-leg output. ──────────────────────────────────────
|
||
// Collect interaction completions to apply after the loop
|
||
// (can't mutate role while iterating mutably for encode).
|
||
let mut completed_interactions: Vec<(String, InteractionResult)> = Vec::new();
|
||
|
||
for (lid, slot) in legs.iter_mut() {
|
||
match &mut slot.role {
|
||
LegRole::Participant => {
|
||
// Mix-minus: total minus this leg's own contribution.
|
||
// Apply a light soft limiter instead of hard clipping the sum.
|
||
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
|
||
for i in 0..MIX_FRAME_SIZE {
|
||
let sample = (total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
|
||
mix_minus.push(soft_limit_sample(sample));
|
||
}
|
||
|
||
// Resample from 48kHz to the leg's codec native rate.
|
||
let target_rate = codec_sample_rate(slot.codec_pt);
|
||
let resampled = if target_rate == MIX_RATE {
|
||
mix_minus
|
||
} else {
|
||
slot.transcoder
|
||
.resample_f32(&mix_minus, MIX_RATE, target_rate)
|
||
.unwrap_or_default()
|
||
};
|
||
|
||
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
|
||
let encoded = match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
|
||
Ok(e) if !e.is_empty() => e,
|
||
_ => continue,
|
||
};
|
||
|
||
// Build RTP packet with header.
|
||
let header =
|
||
build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc);
|
||
let mut rtp = header.to_vec();
|
||
rtp.extend_from_slice(&encoded);
|
||
|
||
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
|
||
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
|
||
|
||
try_send_leg_output(&out_tx, &call_id, lid, slot, rtp, "participant-audio");
|
||
}
|
||
LegRole::Isolated(state) => {
|
||
drain_prompt_stream(&out_tx, &call_id, lid, state);
|
||
|
||
// Check for DTMF digit from this leg.
|
||
let mut matched_digit: Option<char> = None;
|
||
for (src_lid, dtmf_pkt) in &dtmf_forward {
|
||
if src_lid == lid && dtmf_pkt.payload.len() >= 4 {
|
||
let event_id = dtmf_pkt.payload[0];
|
||
let end_bit = (dtmf_pkt.payload[1] & 0x80) != 0;
|
||
if end_bit {
|
||
const EVENT_CHARS: &[char] = &[
|
||
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '*', '#',
|
||
'A', 'B', 'C', 'D',
|
||
];
|
||
if let Some(&ch) = EVENT_CHARS.get(event_id as usize) {
|
||
if state.expected_digits.contains(&ch) {
|
||
matched_digit = Some(ch);
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
if let Some(digit) = matched_digit {
|
||
// Interaction complete — digit matched.
|
||
completed_interactions.push((lid.clone(), InteractionResult::Digit(digit)));
|
||
} else {
|
||
// Play prompt frame, wait for live TTS, or move to timeout once the
|
||
// prompt stream has fully drained.
|
||
let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() {
|
||
frame
|
||
} else if !state.prompt_stream_finished {
|
||
vec![0.0f32; MIX_FRAME_SIZE]
|
||
} else {
|
||
state.prompt_done = true;
|
||
vec![0.0f32; MIX_FRAME_SIZE]
|
||
};
|
||
|
||
// Encode prompt frame to the leg's codec.
|
||
let target_rate = codec_sample_rate(slot.codec_pt);
|
||
let resampled = if target_rate == MIX_RATE {
|
||
pcm_frame
|
||
} else {
|
||
slot.transcoder
|
||
.resample_f32(&pcm_frame, MIX_RATE, target_rate)
|
||
.unwrap_or_default()
|
||
};
|
||
|
||
let mut prompt_rtp: Option<Vec<u8>> = None;
|
||
if let Ok(encoded) =
|
||
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
|
||
{
|
||
if !encoded.is_empty() {
|
||
let header = build_rtp_header(
|
||
slot.codec_pt,
|
||
slot.rtp_seq,
|
||
slot.rtp_ts,
|
||
slot.rtp_ssrc,
|
||
);
|
||
let mut rtp = header.to_vec();
|
||
rtp.extend_from_slice(&encoded);
|
||
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
|
||
slot.rtp_ts =
|
||
slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
|
||
prompt_rtp = Some(rtp);
|
||
}
|
||
}
|
||
|
||
// Check timeout (only after prompt finishes).
|
||
if state.prompt_done {
|
||
if state.timeout_ticks_remaining == 0 {
|
||
completed_interactions
|
||
.push((lid.clone(), InteractionResult::Timeout));
|
||
} else {
|
||
state.timeout_ticks_remaining -= 1;
|
||
}
|
||
}
|
||
|
||
if let Some(rtp) = prompt_rtp {
|
||
try_send_leg_output(
|
||
&out_tx,
|
||
&call_id,
|
||
lid,
|
||
slot,
|
||
rtp,
|
||
"isolated-prompt",
|
||
);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// Apply completed interactions — revert legs to Participant.
|
||
for (lid, result) in completed_interactions {
|
||
if let Some(slot) = legs.get_mut(&lid) {
|
||
if let LegRole::Isolated(ref mut state) = slot.role {
|
||
cancel_prompt_producer(state);
|
||
if let Some(tx) = state.result_tx.take() {
|
||
let _ = tx.send(result);
|
||
}
|
||
}
|
||
slot.role = LegRole::Participant;
|
||
}
|
||
}
|
||
|
||
// ── 5. Distribute per-source audio to tool legs. ────────────
|
||
if !tool_legs.is_empty() {
|
||
// Collect participant PCM frames (computed in step 2).
|
||
let sources: Vec<ToolAudioSource> = legs
|
||
.iter()
|
||
.filter(|(_, s)| matches!(s.role, LegRole::Participant))
|
||
.map(|(lid, s)| ToolAudioSource {
|
||
leg_id: lid.clone(),
|
||
pcm_48k: s.last_pcm_frame.clone(),
|
||
})
|
||
.collect();
|
||
|
||
for (tool_leg_id, tool) in tool_legs.iter_mut() {
|
||
let batch = ToolAudioBatch {
|
||
sources: sources
|
||
.iter()
|
||
.map(|s| ToolAudioSource {
|
||
leg_id: s.leg_id.clone(),
|
||
pcm_48k: s.pcm_48k.clone(),
|
||
})
|
||
.collect(),
|
||
};
|
||
try_send_tool_output(&out_tx, &call_id, tool_leg_id, tool, batch);
|
||
}
|
||
}
|
||
|
||
// ── 6. Forward DTMF packets between participant legs only. ──
|
||
for (source_lid, dtmf_pkt) in &dtmf_forward {
|
||
// Skip if the source is an isolated leg (its DTMF was handled in step 4).
|
||
if let Some(src_slot) = legs.get(source_lid) {
|
||
if matches!(src_slot.role, LegRole::Isolated(_)) {
|
||
continue;
|
||
}
|
||
}
|
||
for (target_lid, target_slot) in legs.iter_mut() {
|
||
if target_lid == source_lid {
|
||
continue; // Don't echo DTMF back to sender.
|
||
}
|
||
// Don't forward to isolated legs.
|
||
if matches!(target_slot.role, LegRole::Isolated(_)) {
|
||
continue;
|
||
}
|
||
let mut header = build_rtp_header(
|
||
101,
|
||
target_slot.rtp_seq,
|
||
target_slot.rtp_ts,
|
||
target_slot.rtp_ssrc,
|
||
);
|
||
if dtmf_pkt.marker {
|
||
header[1] |= 0x80; // Set marker bit.
|
||
}
|
||
let mut rtp_out = header.to_vec();
|
||
rtp_out.extend_from_slice(&dtmf_pkt.payload);
|
||
target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1);
|
||
// Don't increment rtp_ts for DTMF — it shares timestamp context with audio.
|
||
try_send_leg_output(&out_tx, &call_id, target_lid, target_slot, rtp_out, "dtmf");
|
||
}
|
||
}
|
||
}
|
||
}
|