//! Audio mixer — mix-minus engine for multiparty calls.
//!
//! Each Call spawns one mixer task. Legs communicate with the mixer via
//! tokio mpsc channels — no shared mutable state, no lock contention.
//!
//! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame).
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
//! 5. For each tool leg: send per-source unmerged audio batch
//! 6. Forward DTMF between participant legs only

use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
use nnnoiseless::DenoiseState;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};

/// Mixing sample rate — 48kHz. Opus is native, G.722 needs 3× upsample, G.711 needs 6× upsample.
/// All processing (denoising, mixing) happens at this rate in f32 for maximum quality.
const MIX_RATE: u32 = 48000;

/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020

/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
    /// Codec payload bytes (RTP header already stripped by the leg's I/O task).
    pub payload: Vec<u8>,
    /// RTP payload type (e.g. 101 for telephone-event DTMF).
    pub payload_type: u8,
    /// RTP marker bit (first packet of a DTMF event, etc.).
    pub marker: bool,
    /// RTP sequence number for reordering.
    pub seq: u16,
    /// RTP timestamp from the original packet header.
    pub timestamp: u32,
}

// ---------------------------------------------------------------------------
// Leg roles
// ---------------------------------------------------------------------------

/// What role a leg currently plays in the mixer.
enum LegRole {
    /// Normal participant: contributes to mix, receives mix-minus.
    Participant,
    /// Temporarily isolated for IVR/consent interaction.
    Isolated(IsolationState),
}

/// State for a leg that is isolated from the mix while it completes an
/// interaction (prompt playback + DTMF digit collection).
struct IsolationState {
    /// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
    prompt_frames: VecDeque<Vec<f32>>,
    /// Digits that complete the interaction (e.g., ['1', '2']).
    expected_digits: Vec<char>,
    /// Ticks remaining before timeout (decremented each tick after prompt ends).
    timeout_ticks_remaining: u32,
    /// Whether we've finished playing the prompt.
    prompt_done: bool,
    /// Channel to send the result back to the command handler.
    /// `take()`n on resolution so the result is sent at most once.
    result_tx: Option<oneshot::Sender<InteractionResult>>,
}

/// Result of a leg interaction (consent prompt, IVR, etc.).
pub enum InteractionResult {
    /// The participant pressed one of the expected digits.
    Digit(char),
    /// No digit was received within the timeout.
    Timeout,
    /// The leg was removed or the call tore down before completion.
    Cancelled,
}

// ---------------------------------------------------------------------------
// Tool legs
// ---------------------------------------------------------------------------

/// Type of tool leg.
#[derive(Debug, Clone, Copy)]
pub enum ToolType {
    Recording,
    Transcription,
}

/// Per-source audio delivered to a tool leg each mixer tick.
pub struct ToolAudioBatch {
    pub sources: Vec<ToolAudioSource>,
}

/// One participant's 20ms audio frame.
pub struct ToolAudioSource {
    pub leg_id: String,
    /// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples.
    pub pcm_48k: Vec<f32>,
}

/// Internal storage for a tool leg inside the mixer.
struct ToolLegSlot { #[allow(dead_code)] tool_type: ToolType, audio_tx: mpsc::Sender, } // --------------------------------------------------------------------------- // Commands // --------------------------------------------------------------------------- /// Commands sent to the mixer task via a control channel. pub enum MixerCommand { /// Add a new participant leg to the mix. AddLeg { leg_id: String, codec_pt: u8, inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender>, }, /// Remove a leg from the mix (channels are dropped, I/O tasks exit). RemoveLeg { leg_id: String }, /// Shut down the mixer. Shutdown, /// Isolate a leg and start an interaction (consent prompt, IVR). /// The leg is removed from the mix and hears the prompt instead. /// DTMF from the leg is checked against expected_digits. StartInteraction { leg_id: String, /// PCM frames at MIX_RATE (48kHz f32), each 960 samples. prompt_pcm_frames: Vec>, expected_digits: Vec, timeout_ms: u32, result_tx: oneshot::Sender, }, /// Cancel an in-progress interaction (e.g., leg being removed). CancelInteraction { leg_id: String }, /// Add a tool leg that receives per-source unmerged audio. AddToolLeg { leg_id: String, tool_type: ToolType, audio_tx: mpsc::Sender, }, /// Remove a tool leg (drops the channel, background task finalizes). RemoveToolLeg { leg_id: String }, } // --------------------------------------------------------------------------- // Mixer internals // --------------------------------------------------------------------------- /// Internal per-leg state inside the mixer. struct MixerLegSlot { codec_pt: u8, transcoder: TranscodeState, /// Per-leg inbound denoiser (48kHz, 480-sample frames). denoiser: Box>, inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender>, /// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32). last_pcm_frame: Vec, /// Number of consecutive ticks with no inbound packet. silent_ticks: u32, // RTP output state. 
rtp_seq: u16, rtp_ts: u32, rtp_ssrc: u32, /// Current role of this leg in the mixer. role: LegRole, } /// Spawn the mixer task for a call. Returns the command sender and task handle. pub fn spawn_mixer( call_id: String, out_tx: OutTx, ) -> (mpsc::Sender, JoinHandle<()>) { let (cmd_tx, cmd_rx) = mpsc::channel::(32); let handle = tokio::spawn(async move { mixer_loop(call_id, cmd_rx, out_tx).await; }); (cmd_tx, handle) } /// The 20ms mixing loop. async fn mixer_loop( call_id: String, mut cmd_rx: mpsc::Receiver, out_tx: OutTx, ) { let mut legs: HashMap = HashMap::new(); let mut tool_legs: HashMap = HashMap::new(); let mut interval = time::interval(Duration::from_millis(20)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { interval.tick().await; // ── 1. Process control commands (non-blocking). ───────────── loop { match cmd_rx.try_recv() { Ok(MixerCommand::AddLeg { leg_id, codec_pt, inbound_rx, outbound_tx, }) => { let transcoder = match TranscodeState::new() { Ok(t) => t, Err(e) => { emit_event( &out_tx, "mixer_error", serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "error": format!("codec init: {e}"), }), ); continue; } }; legs.insert( leg_id, MixerLegSlot { codec_pt, transcoder, denoiser: new_denoiser(), inbound_rx, outbound_tx, last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE], silent_ticks: 0, rtp_seq: 0, rtp_ts: 0, rtp_ssrc: rand::random(), role: LegRole::Participant, }, ); } Ok(MixerCommand::RemoveLeg { leg_id }) => { // If the leg is isolated, send Cancelled before dropping. if let Some(slot) = legs.get_mut(&leg_id) { if let LegRole::Isolated(ref mut state) = slot.role { if let Some(tx) = state.result_tx.take() { let _ = tx.send(InteractionResult::Cancelled); } } } legs.remove(&leg_id); // Channels drop → I/O tasks exit cleanly. } Ok(MixerCommand::Shutdown) => { // Cancel all outstanding interactions before shutting down. 
for slot in legs.values_mut() { if let LegRole::Isolated(ref mut state) = slot.role { if let Some(tx) = state.result_tx.take() { let _ = tx.send(InteractionResult::Cancelled); } } } return; } Ok(MixerCommand::StartInteraction { leg_id, prompt_pcm_frames, expected_digits, timeout_ms, result_tx, }) => { if let Some(slot) = legs.get_mut(&leg_id) { // Cancel any existing interaction first. if let LegRole::Isolated(ref mut old_state) = slot.role { if let Some(tx) = old_state.result_tx.take() { let _ = tx.send(InteractionResult::Cancelled); } } let timeout_ticks = timeout_ms / 20; slot.role = LegRole::Isolated(IsolationState { prompt_frames: VecDeque::from(prompt_pcm_frames), expected_digits, timeout_ticks_remaining: timeout_ticks, prompt_done: false, result_tx: Some(result_tx), }); } else { // Leg not found — immediately cancel. let _ = result_tx.send(InteractionResult::Cancelled); } } Ok(MixerCommand::CancelInteraction { leg_id }) => { if let Some(slot) = legs.get_mut(&leg_id) { if let LegRole::Isolated(ref mut state) = slot.role { if let Some(tx) = state.result_tx.take() { let _ = tx.send(InteractionResult::Cancelled); } } slot.role = LegRole::Participant; } } Ok(MixerCommand::AddToolLeg { leg_id, tool_type, audio_tx, }) => { tool_legs.insert(leg_id, ToolLegSlot { tool_type, audio_tx }); } Ok(MixerCommand::RemoveToolLeg { leg_id }) => { tool_legs.remove(&leg_id); // Dropping the ToolLegSlot drops audio_tx → background task sees channel close. } Err(mpsc::error::TryRecvError::Empty) => break, Err(mpsc::error::TryRecvError::Disconnected) => return, } } if legs.is_empty() && tool_legs.is_empty() { continue; } // ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ──── // DTMF (PT 101) packets are collected separately. // Audio packets are sorted by sequence number and decoded // in order to maintain codec state (critical for G.722 ADPCM). 
let leg_ids: Vec = legs.keys().cloned().collect(); let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new(); for lid in &leg_ids { let slot = legs.get_mut(lid).unwrap(); // Drain channel — collect DTMF separately, collect ALL audio packets. let mut audio_packets: Vec = Vec::new(); loop { match slot.inbound_rx.try_recv() { Ok(pkt) => { if pkt.payload_type == 101 { // DTMF telephone-event: collect for processing. dtmf_forward.push((lid.clone(), pkt)); } else { audio_packets.push(pkt); } } Err(_) => break, } } if !audio_packets.is_empty() { slot.silent_ticks = 0; // Sort by sequence number for correct codec state progression. // This prevents G.722 ADPCM state corruption from out-of-order packets. audio_packets.sort_by_key(|p| p.seq); // Decode ALL packets in order (maintains codec state), // but only keep the last decoded frame for mixing. for pkt in &audio_packets { match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) { Ok((pcm, rate)) => { // Resample to 48kHz mixing rate if needed. let pcm_48k = if rate == MIX_RATE { pcm } else { slot.transcoder .resample_f32(&pcm, rate, MIX_RATE) .unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE]) }; // Per-leg inbound denoising at 48kHz. // Skip for Opus/WebRTC legs — browsers already apply // their own noise suppression via getUserMedia. let processed = if slot.codec_pt != codec_lib::PT_OPUS { TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k) } else { pcm_48k }; // Pad or truncate to exactly MIX_FRAME_SIZE. let mut frame = processed; frame.resize(MIX_FRAME_SIZE, 0.0); slot.last_pcm_frame = frame; } Err(_) => {} } } } else if dtmf_forward.iter().any(|(src, _)| src == lid) { // Got DTMF but no audio — don't bump silent_ticks (DTMF counts as activity). slot.silent_ticks = 0; } else { slot.silent_ticks += 1; // After 150 ticks (3 seconds) of silence, zero out to avoid stale audio. if slot.silent_ticks > 150 { slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE]; } } } // ── 3. 
Compute total mix from PARTICIPANT legs only. ──────── // Accumulate as f64 to prevent precision loss when summing f32. let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE]; for slot in legs.values() { if matches!(slot.role, LegRole::Participant) { for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) { total_mix[i] += s as f64; } } } // ── 4. Per-leg output. ────────────────────────────────────── // Collect interaction completions to apply after the loop // (can't mutate role while iterating mutably for encode). let mut completed_interactions: Vec<(String, InteractionResult)> = Vec::new(); for (lid, slot) in legs.iter_mut() { match &mut slot.role { LegRole::Participant => { // Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0]. let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE); for i in 0..MIX_FRAME_SIZE { let sample = (total_mix[i] - slot.last_pcm_frame[i] as f64) as f32; mix_minus.push(sample.clamp(-1.0, 1.0)); } // Resample from 48kHz to the leg's codec native rate. let target_rate = codec_sample_rate(slot.codec_pt); let resampled = if target_rate == MIX_RATE { mix_minus } else { slot.transcoder .resample_f32(&mix_minus, MIX_RATE, target_rate) .unwrap_or_default() }; // Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32). let encoded = match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) { Ok(e) if !e.is_empty() => e, _ => continue, }; // Build RTP packet with header. let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc); let mut rtp = header.to_vec(); rtp.extend_from_slice(&encoded); slot.rtp_seq = slot.rtp_seq.wrapping_add(1); slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); // Non-blocking send — drop frame if channel is full. let _ = slot.outbound_tx.try_send(rtp); } LegRole::Isolated(state) => { // Check for DTMF digit from this leg. 
let mut matched_digit: Option = None; for (src_lid, dtmf_pkt) in &dtmf_forward { if src_lid == lid && dtmf_pkt.payload.len() >= 4 { let event_id = dtmf_pkt.payload[0]; let end_bit = (dtmf_pkt.payload[1] & 0x80) != 0; if end_bit { const EVENT_CHARS: &[char] = &[ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '*', '#', 'A', 'B', 'C', 'D', ]; if let Some(&ch) = EVENT_CHARS.get(event_id as usize) { if state.expected_digits.contains(&ch) { matched_digit = Some(ch); break; } } } } } if let Some(digit) = matched_digit { // Interaction complete — digit matched. completed_interactions .push((lid.clone(), InteractionResult::Digit(digit))); } else { // Play prompt frame or silence. let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() { frame } else { state.prompt_done = true; vec![0.0f32; MIX_FRAME_SIZE] }; // Encode prompt frame to the leg's codec. let target_rate = codec_sample_rate(slot.codec_pt); let resampled = if target_rate == MIX_RATE { pcm_frame } else { slot.transcoder .resample_f32(&pcm_frame, MIX_RATE, target_rate) .unwrap_or_default() }; if let Ok(encoded) = slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) { if !encoded.is_empty() { let header = build_rtp_header( slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc, ); let mut rtp = header.to_vec(); rtp.extend_from_slice(&encoded); slot.rtp_seq = slot.rtp_seq.wrapping_add(1); slot.rtp_ts = slot .rtp_ts .wrapping_add(rtp_clock_increment(slot.codec_pt)); let _ = slot.outbound_tx.try_send(rtp); } } // Check timeout (only after prompt finishes). if state.prompt_done { if state.timeout_ticks_remaining == 0 { completed_interactions .push((lid.clone(), InteractionResult::Timeout)); } else { state.timeout_ticks_remaining -= 1; } } } } } } // Apply completed interactions — revert legs to Participant. 
for (lid, result) in completed_interactions { if let Some(slot) = legs.get_mut(&lid) { if let LegRole::Isolated(ref mut state) = slot.role { if let Some(tx) = state.result_tx.take() { let _ = tx.send(result); } } slot.role = LegRole::Participant; } } // ── 5. Distribute per-source audio to tool legs. ──────────── if !tool_legs.is_empty() { // Collect participant PCM frames (computed in step 2). let sources: Vec = legs .iter() .filter(|(_, s)| matches!(s.role, LegRole::Participant)) .map(|(lid, s)| ToolAudioSource { leg_id: lid.clone(), pcm_48k: s.last_pcm_frame.clone(), }) .collect(); for tool in tool_legs.values() { let batch = ToolAudioBatch { sources: sources .iter() .map(|s| ToolAudioSource { leg_id: s.leg_id.clone(), pcm_48k: s.pcm_48k.clone(), }) .collect(), }; // Non-blocking send — drop batch if tool can't keep up. let _ = tool.audio_tx.try_send(batch); } } // ── 6. Forward DTMF packets between participant legs only. ── for (source_lid, dtmf_pkt) in &dtmf_forward { // Skip if the source is an isolated leg (its DTMF was handled in step 4). if let Some(src_slot) = legs.get(source_lid) { if matches!(src_slot.role, LegRole::Isolated(_)) { continue; } } for (target_lid, target_slot) in legs.iter_mut() { if target_lid == source_lid { continue; // Don't echo DTMF back to sender. } // Don't forward to isolated legs. if matches!(target_slot.role, LegRole::Isolated(_)) { continue; } let mut header = build_rtp_header( 101, target_slot.rtp_seq, target_slot.rtp_ts, target_slot.rtp_ssrc, ); if dtmf_pkt.marker { header[1] |= 0x80; // Set marker bit. } let mut rtp_out = header.to_vec(); rtp_out.extend_from_slice(&dtmf_pkt.payload); target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1); // Don't increment rtp_ts for DTMF — it shares timestamp context with audio. let _ = target_slot.outbound_tx.try_send(rtp_out); } } } }