Files
siprouter/rust/crates/proxy-engine/src/mixer.rs

606 lines
25 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
//! Audio mixer — mix-minus engine for multiparty calls.
//!
//! Each Call spawns one mixer task. Legs communicate with the mixer via
//! tokio mpsc channels — no shared mutable state, no lock contention.
//!
//! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame).
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
//! 5. For each tool leg: send per-source unmerged audio batch
//! 6. Forward DTMF between participant legs only
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
use nnnoiseless::DenoiseState;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
/// Mixing sample rate — 48kHz. Opus is native, G.722 needs 3× upsample, G.711 needs 6× upsample.
/// All processing (denoising, mixing) happens at this rate in f32 for maximum quality.
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
pub payload: Vec<u8>,
pub payload_type: u8,
/// RTP marker bit (first packet of a DTMF event, etc.).
pub marker: bool,
/// RTP sequence number for reordering.
pub seq: u16,
/// RTP timestamp from the original packet header.
pub timestamp: u32,
}
// ---------------------------------------------------------------------------
// Leg roles
// ---------------------------------------------------------------------------
/// What role a leg currently plays in the mixer.
enum LegRole {
/// Normal participant: contributes to mix, receives mix-minus.
Participant,
/// Temporarily isolated for IVR/consent interaction.
Isolated(IsolationState),
}
struct IsolationState {
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
prompt_frames: VecDeque<Vec<f32>>,
/// Digits that complete the interaction (e.g., ['1', '2']).
expected_digits: Vec<char>,
/// Ticks remaining before timeout (decremented each tick after prompt ends).
timeout_ticks_remaining: u32,
/// Whether we've finished playing the prompt.
prompt_done: bool,
/// Channel to send the result back to the command handler.
result_tx: Option<oneshot::Sender<InteractionResult>>,
}
/// Result of a leg interaction (consent prompt, IVR, etc.).
pub enum InteractionResult {
/// The participant pressed one of the expected digits.
Digit(char),
/// No digit was received within the timeout.
Timeout,
/// The leg was removed or the call tore down before completion.
Cancelled,
}
// ---------------------------------------------------------------------------
// Tool legs
// ---------------------------------------------------------------------------
/// Type of tool leg.
#[derive(Debug, Clone, Copy)]
pub enum ToolType {
Recording,
Transcription,
}
/// Per-source audio delivered to a tool leg each mixer tick.
pub struct ToolAudioBatch {
pub sources: Vec<ToolAudioSource>,
}
/// One participant's 20ms audio frame.
pub struct ToolAudioSource {
pub leg_id: String,
/// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples.
pub pcm_48k: Vec<f32>,
}
/// Internal storage for a tool leg inside the mixer.
struct ToolLegSlot {
#[allow(dead_code)]
tool_type: ToolType,
audio_tx: mpsc::Sender<ToolAudioBatch>,
}
// ---------------------------------------------------------------------------
// Commands
// ---------------------------------------------------------------------------
/// Commands sent to the mixer task via a control channel.
pub enum MixerCommand {
/// Add a new participant leg to the mix.
AddLeg {
leg_id: String,
codec_pt: u8,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
},
/// Remove a leg from the mix (channels are dropped, I/O tasks exit).
RemoveLeg { leg_id: String },
/// Shut down the mixer.
Shutdown,
/// Isolate a leg and start an interaction (consent prompt, IVR).
/// The leg is removed from the mix and hears the prompt instead.
/// DTMF from the leg is checked against expected_digits.
StartInteraction {
leg_id: String,
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
prompt_pcm_frames: Vec<Vec<f32>>,
expected_digits: Vec<char>,
timeout_ms: u32,
result_tx: oneshot::Sender<InteractionResult>,
},
/// Cancel an in-progress interaction (e.g., leg being removed).
CancelInteraction { leg_id: String },
/// Add a tool leg that receives per-source unmerged audio.
AddToolLeg {
leg_id: String,
tool_type: ToolType,
audio_tx: mpsc::Sender<ToolAudioBatch>,
},
/// Remove a tool leg (drops the channel, background task finalizes).
RemoveToolLeg { leg_id: String },
}
// ---------------------------------------------------------------------------
// Mixer internals
// ---------------------------------------------------------------------------
/// Internal per-leg state inside the mixer.
struct MixerLegSlot {
codec_pt: u8,
transcoder: TranscodeState,
/// Per-leg inbound denoiser (48kHz, 480-sample frames).
denoiser: Box<DenoiseState<'static>>,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
last_pcm_frame: Vec<f32>,
/// Number of consecutive ticks with no inbound packet.
silent_ticks: u32,
// RTP output state.
rtp_seq: u16,
rtp_ts: u32,
rtp_ssrc: u32,
/// Current role of this leg in the mixer.
role: LegRole,
}
/// Spawn the mixer task for a call. Returns the command sender and task handle.
pub fn spawn_mixer(
call_id: String,
out_tx: OutTx,
) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
let handle = tokio::spawn(async move {
mixer_loop(call_id, cmd_rx, out_tx).await;
});
(cmd_tx, handle)
}
/// The 20ms mixing loop.
async fn mixer_loop(
call_id: String,
mut cmd_rx: mpsc::Receiver<MixerCommand>,
out_tx: OutTx,
) {
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
let mut tool_legs: HashMap<String, ToolLegSlot> = HashMap::new();
let mut interval = time::interval(Duration::from_millis(20));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
// ── 1. Process control commands (non-blocking). ─────────────
loop {
match cmd_rx.try_recv() {
Ok(MixerCommand::AddLeg {
leg_id,
codec_pt,
inbound_rx,
outbound_tx,
}) => {
let transcoder = match TranscodeState::new() {
Ok(t) => t,
Err(e) => {
emit_event(
&out_tx,
"mixer_error",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"error": format!("codec init: {e}"),
}),
);
continue;
}
};
legs.insert(
leg_id,
MixerLegSlot {
codec_pt,
transcoder,
denoiser: new_denoiser(),
inbound_rx,
outbound_tx,
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
silent_ticks: 0,
rtp_seq: 0,
rtp_ts: 0,
rtp_ssrc: rand::random(),
role: LegRole::Participant,
},
);
}
Ok(MixerCommand::RemoveLeg { leg_id }) => {
// If the leg is isolated, send Cancelled before dropping.
if let Some(slot) = legs.get_mut(&leg_id) {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
}
legs.remove(&leg_id);
// Channels drop → I/O tasks exit cleanly.
}
Ok(MixerCommand::Shutdown) => {
// Cancel all outstanding interactions before shutting down.
for slot in legs.values_mut() {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
}
return;
}
Ok(MixerCommand::StartInteraction {
leg_id,
prompt_pcm_frames,
expected_digits,
timeout_ms,
result_tx,
}) => {
if let Some(slot) = legs.get_mut(&leg_id) {
// Cancel any existing interaction first.
if let LegRole::Isolated(ref mut old_state) = slot.role {
if let Some(tx) = old_state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
let timeout_ticks = timeout_ms / 20;
slot.role = LegRole::Isolated(IsolationState {
prompt_frames: VecDeque::from(prompt_pcm_frames),
expected_digits,
timeout_ticks_remaining: timeout_ticks,
prompt_done: false,
result_tx: Some(result_tx),
});
} else {
// Leg not found — immediately cancel.
let _ = result_tx.send(InteractionResult::Cancelled);
}
}
Ok(MixerCommand::CancelInteraction { leg_id }) => {
if let Some(slot) = legs.get_mut(&leg_id) {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
slot.role = LegRole::Participant;
}
}
Ok(MixerCommand::AddToolLeg {
leg_id,
tool_type,
audio_tx,
}) => {
tool_legs.insert(leg_id, ToolLegSlot { tool_type, audio_tx });
}
Ok(MixerCommand::RemoveToolLeg { leg_id }) => {
tool_legs.remove(&leg_id);
// Dropping the ToolLegSlot drops audio_tx → background task sees channel close.
}
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => return,
}
}
if legs.is_empty() && tool_legs.is_empty() {
continue;
}
// ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ────
// DTMF (PT 101) packets are collected separately.
// Audio packets are sorted by sequence number and decoded
// in order to maintain codec state (critical for G.722 ADPCM).
let leg_ids: Vec<String> = legs.keys().cloned().collect();
let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new();
for lid in &leg_ids {
let slot = legs.get_mut(lid).unwrap();
// Drain channel — collect DTMF separately, collect ALL audio packets.
let mut audio_packets: Vec<RtpPacket> = Vec::new();
loop {
match slot.inbound_rx.try_recv() {
Ok(pkt) => {
if pkt.payload_type == 101 {
// DTMF telephone-event: collect for processing.
dtmf_forward.push((lid.clone(), pkt));
} else {
audio_packets.push(pkt);
}
}
Err(_) => break,
}
}
if !audio_packets.is_empty() {
slot.silent_ticks = 0;
// Sort by sequence number for correct codec state progression.
// This prevents G.722 ADPCM state corruption from out-of-order packets.
audio_packets.sort_by_key(|p| p.seq);
// Decode ALL packets in order (maintains codec state),
// but only keep the last decoded frame for mixing.
for pkt in &audio_packets {
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to 48kHz mixing rate if needed.
let pcm_48k = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample_f32(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
};
// Per-leg inbound denoising at 48kHz.
// Only for SIP telephony legs — WebRTC browsers
// already apply noise suppression via getUserMedia.
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
} else {
pcm_48k
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = processed;
frame.resize(MIX_FRAME_SIZE, 0.0);
slot.last_pcm_frame = frame;
}
Err(_) => {}
}
}
} else if dtmf_forward.iter().any(|(src, _)| src == lid) {
// Got DTMF but no audio — don't bump silent_ticks (DTMF counts as activity).
slot.silent_ticks = 0;
} else {
slot.silent_ticks += 1;
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
if slot.silent_ticks > 150 {
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
}
}
}
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
// Accumulate as f64 to prevent precision loss when summing f32.
let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE];
for slot in legs.values() {
if matches!(slot.role, LegRole::Participant) {
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
total_mix[i] += s as f64;
}
}
}
// ── 4. Per-leg output. ──────────────────────────────────────
// Collect interaction completions to apply after the loop
// (can't mutate role while iterating mutably for encode).
let mut completed_interactions: Vec<(String, InteractionResult)> = Vec::new();
for (lid, slot) in legs.iter_mut() {
match &mut slot.role {
LegRole::Participant => {
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
mix_minus.push(sample.clamp(-1.0, 1.0));
}
// Resample from 48kHz to the leg's codec native rate.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
mix_minus
} else {
slot.transcoder
.resample_f32(&mix_minus, MIX_RATE, target_rate)
.unwrap_or_default()
};
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
let encoded =
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
// Build RTP packet with header.
let header =
build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc);
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
// Non-blocking send — drop frame if channel is full.
let _ = slot.outbound_tx.try_send(rtp);
}
LegRole::Isolated(state) => {
// Check for DTMF digit from this leg.
let mut matched_digit: Option<char> = None;
for (src_lid, dtmf_pkt) in &dtmf_forward {
if src_lid == lid && dtmf_pkt.payload.len() >= 4 {
let event_id = dtmf_pkt.payload[0];
let end_bit = (dtmf_pkt.payload[1] & 0x80) != 0;
if end_bit {
const EVENT_CHARS: &[char] = &[
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '*', '#',
'A', 'B', 'C', 'D',
];
if let Some(&ch) = EVENT_CHARS.get(event_id as usize) {
if state.expected_digits.contains(&ch) {
matched_digit = Some(ch);
break;
}
}
}
}
}
if let Some(digit) = matched_digit {
// Interaction complete — digit matched.
completed_interactions
.push((lid.clone(), InteractionResult::Digit(digit)));
} else {
// Play prompt frame or silence.
let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() {
frame
} else {
state.prompt_done = true;
vec![0.0f32; MIX_FRAME_SIZE]
};
// Encode prompt frame to the leg's codec.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
pcm_frame
} else {
slot.transcoder
.resample_f32(&pcm_frame, MIX_RATE, target_rate)
.unwrap_or_default()
};
if let Ok(encoded) =
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
{
if !encoded.is_empty() {
let header = build_rtp_header(
slot.codec_pt,
slot.rtp_seq,
slot.rtp_ts,
slot.rtp_ssrc,
);
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot
.rtp_ts
.wrapping_add(rtp_clock_increment(slot.codec_pt));
let _ = slot.outbound_tx.try_send(rtp);
}
}
// Check timeout (only after prompt finishes).
if state.prompt_done {
if state.timeout_ticks_remaining == 0 {
completed_interactions
.push((lid.clone(), InteractionResult::Timeout));
} else {
state.timeout_ticks_remaining -= 1;
}
}
}
}
}
}
// Apply completed interactions — revert legs to Participant.
for (lid, result) in completed_interactions {
if let Some(slot) = legs.get_mut(&lid) {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(result);
}
}
slot.role = LegRole::Participant;
}
}
// ── 5. Distribute per-source audio to tool legs. ────────────
if !tool_legs.is_empty() {
// Collect participant PCM frames (computed in step 2).
let sources: Vec<ToolAudioSource> = legs
.iter()
.filter(|(_, s)| matches!(s.role, LegRole::Participant))
.map(|(lid, s)| ToolAudioSource {
leg_id: lid.clone(),
pcm_48k: s.last_pcm_frame.clone(),
})
.collect();
for tool in tool_legs.values() {
let batch = ToolAudioBatch {
sources: sources
.iter()
.map(|s| ToolAudioSource {
leg_id: s.leg_id.clone(),
pcm_48k: s.pcm_48k.clone(),
})
.collect(),
};
// Non-blocking send — drop batch if tool can't keep up.
let _ = tool.audio_tx.try_send(batch);
}
}
// ── 6. Forward DTMF packets between participant legs only. ──
for (source_lid, dtmf_pkt) in &dtmf_forward {
// Skip if the source is an isolated leg (its DTMF was handled in step 4).
if let Some(src_slot) = legs.get(source_lid) {
if matches!(src_slot.role, LegRole::Isolated(_)) {
continue;
}
}
for (target_lid, target_slot) in legs.iter_mut() {
if target_lid == source_lid {
continue; // Don't echo DTMF back to sender.
}
// Don't forward to isolated legs.
if matches!(target_slot.role, LegRole::Isolated(_)) {
continue;
}
let mut header = build_rtp_header(
101,
target_slot.rtp_seq,
target_slot.rtp_ts,
target_slot.rtp_ssrc,
);
if dtmf_pkt.marker {
header[1] |= 0x80; // Set marker bit.
}
let mut rtp_out = header.to_vec();
rtp_out.extend_from_slice(&dtmf_pkt.payload);
target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1);
// Don't increment rtp_ts for DTMF — it shares timestamp context with audio.
let _ = target_slot.outbound_tx.try_send(rtp_out);
}
}
}
}