//! Audio mixer — mix-minus engine for multiparty calls. //! //! Each Call spawns one mixer task. Legs communicate with the mixer via //! tokio mpsc channels — no shared mutable state, no lock contention. //! //! The mixer runs a 20ms tick loop: //! 1. Drain inbound channels, decode to PCM, resample to 16kHz //! 2. Compute total mix (sum of all legs' PCM as i32) //! 3. For each leg: mix-minus = total - own, resample to leg codec rate, encode, send use crate::ipc::{emit_event, OutTx}; use crate::rtp::{build_rtp_header, rtp_clock_increment}; use codec_lib::{codec_sample_rate, TranscodeState}; use std::collections::HashMap; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio::time::{self, Duration, MissedTickBehavior}; /// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample. const MIX_RATE: u32 = 16000; /// Samples per 20ms frame at the mixing rate. const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020 /// A raw RTP payload received from a leg (no RTP header). pub struct RtpPacket { pub payload: Vec, pub payload_type: u8, } /// Commands sent to the mixer task via a control channel. pub enum MixerCommand { /// Add a new leg to the mix. AddLeg { leg_id: String, codec_pt: u8, inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender>, }, /// Remove a leg from the mix (channels are dropped, I/O tasks exit). RemoveLeg { leg_id: String }, /// Shut down the mixer. Shutdown, } /// Internal per-leg state inside the mixer. struct MixerLegSlot { codec_pt: u8, transcoder: TranscodeState, inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender>, /// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus. last_pcm_frame: Vec, /// Number of consecutive ticks with no inbound packet. silent_ticks: u32, // RTP output state. rtp_seq: u16, rtp_ts: u32, rtp_ssrc: u32, } /// Spawn the mixer task for a call. Returns the command sender and task handle. pub fn spawn_mixer( call_id: String, out_tx: OutTx, ) -> (mpsc::Sender, JoinHandle<()>) { let (cmd_tx, cmd_rx) = mpsc::channel::(32); let handle = tokio::spawn(async move { mixer_loop(call_id, cmd_rx, out_tx).await; }); (cmd_tx, handle) } /// The 20ms mixing loop. async fn mixer_loop( call_id: String, mut cmd_rx: mpsc::Receiver, out_tx: OutTx, ) { let mut legs: HashMap = HashMap::new(); let mut interval = time::interval(Duration::from_millis(20)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { interval.tick().await; // 1. Process control commands (non-blocking). loop { match cmd_rx.try_recv() { Ok(MixerCommand::AddLeg { leg_id, codec_pt, inbound_rx, outbound_tx, }) => { let transcoder = match TranscodeState::new() { Ok(t) => t, Err(e) => { emit_event( &out_tx, "mixer_error", serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "error": format!("codec init: {e}"), }), ); continue; } }; legs.insert( leg_id, MixerLegSlot { codec_pt, transcoder, inbound_rx, outbound_tx, last_pcm_frame: vec![0i16; MIX_FRAME_SIZE], silent_ticks: 0, rtp_seq: 0, rtp_ts: 0, rtp_ssrc: rand::random(), }, ); } Ok(MixerCommand::RemoveLeg { leg_id }) => { legs.remove(&leg_id); // Channels drop → I/O tasks exit cleanly. } Ok(MixerCommand::Shutdown) => return, Err(mpsc::error::TryRecvError::Empty) => break, Err(mpsc::error::TryRecvError::Disconnected) => return, } } if legs.is_empty() { continue; } // 2. Drain inbound packets, decode to 16kHz PCM. let leg_ids: Vec = legs.keys().cloned().collect(); for lid in &leg_ids { let slot = legs.get_mut(lid).unwrap(); // Drain channel, keep only the latest packet (simple jitter handling). let mut latest: Option = None; loop { match slot.inbound_rx.try_recv() { Ok(pkt) => latest = Some(pkt), Err(_) => break, } } if let Some(pkt) = latest { slot.silent_ticks = 0; match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) { Ok((pcm, rate)) => { // Resample to mixing rate if needed. let pcm_mix = if rate == MIX_RATE { pcm } else { slot.transcoder .resample(&pcm, rate, MIX_RATE) .unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE]) }; // Pad or truncate to exactly MIX_FRAME_SIZE. let mut frame = pcm_mix; frame.resize(MIX_FRAME_SIZE, 0); slot.last_pcm_frame = frame; } Err(_) => { // Decode failed — use silence. slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE]; } } } else { slot.silent_ticks += 1; // After 150 ticks (3 seconds) of silence, zero out to avoid stale audio. if slot.silent_ticks > 150 { slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE]; } } } // 3. Compute total mix (sum of all legs as i32 to avoid overflow). let mut total_mix = vec![0i32; MIX_FRAME_SIZE]; for slot in legs.values() { for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) { total_mix[i] += s as i32; } } // 4. For each leg: mix-minus, resample, encode, send. for slot in legs.values_mut() { // Mix-minus: total minus this leg's own contribution. let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE); for i in 0..MIX_FRAME_SIZE { let sample = (total_mix[i] - slot.last_pcm_frame[i] as i32).clamp(-32768, 32767) as i16; mix_minus.push(sample); } // Resample from 16kHz to the leg's codec native rate. let target_rate = codec_sample_rate(slot.codec_pt); let resampled = if target_rate == MIX_RATE { mix_minus } else { slot.transcoder .resample(&mix_minus, MIX_RATE, target_rate) .unwrap_or_default() }; // Encode to the leg's codec. let encoded = match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) { Ok(e) if !e.is_empty() => e, _ => continue, }; // Build RTP packet with header. let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc); let mut rtp = header.to_vec(); rtp.extend_from_slice(&encoded); slot.rtp_seq = slot.rtp_seq.wrapping_add(1); slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); // Non-blocking send — drop frame if channel is full. let _ = slot.outbound_tx.try_send(rtp); } } }