//! Audio mixer — mix-minus engine for multiparty calls. //! //! Each Call spawns one mixer task. Legs communicate with the mixer via //! tokio mpsc channels — no shared mutable state, no lock contention. //! //! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame). //! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz. //! //! The mixer runs a 20ms tick loop: //! 1. Drain inbound channels, reorder RTP, decode variable-duration packets to 48kHz, //! and queue them in per-leg PCM buffers //! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64) //! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send //! 4. For each isolated leg: play prompt frame or silence, check DTMF //! 5. For each tool leg: send per-source unmerged audio batch //! 6. Forward DTMF between participant legs only use crate::ipc::{emit_event, OutTx}; use crate::jitter_buffer::{JitterBuffer, JitterResult}; use crate::rtp::{build_rtp_header, rtp_clock_increment, rtp_clock_rate}; use crate::tts::TtsStreamMessage; use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState}; use nnnoiseless::DenoiseState; use std::collections::{HashMap, VecDeque}; use tokio::sync::{mpsc, oneshot, watch}; use tokio::task::JoinHandle; use tokio::time::{self, Duration, MissedTickBehavior}; /// Mixing sample rate — 48kHz. Opus is native, G.722 needs 3× upsample, G.711 needs 6× upsample. /// All processing (denoising, mixing) happens at this rate in f32 for maximum quality. const MIX_RATE: u32 = 48000; /// Samples per 20ms frame at the mixing rate. const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020 /// Safety cap for how much timestamp-derived gap fill we synthesize at once. const MAX_GAP_FILL_SAMPLES: usize = MIX_FRAME_SIZE * 6; // 120ms /// Bound how many decode / concealment steps a leg can consume in one tick. const MAX_PACKET_STEPS_PER_TICK: usize = 24; /// Report the first output drop immediately, then every N drops. const DROP_REPORT_INTERVAL: u64 = 50; /// A raw RTP payload received from a leg (no RTP header). pub struct RtpPacket { pub payload: Vec, pub payload_type: u8, /// RTP marker bit (first packet of a DTMF event, etc.). pub marker: bool, /// RTP sequence number for reordering. pub seq: u16, /// RTP timestamp from the original packet header. pub timestamp: u32, } // --------------------------------------------------------------------------- // Leg roles // --------------------------------------------------------------------------- /// What role a leg currently plays in the mixer. enum LegRole { /// Normal participant: contributes to mix, receives mix-minus. Participant, /// Temporarily isolated for IVR/consent interaction. Isolated(IsolationState), } struct IsolationState { /// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback. prompt_frames: VecDeque>, /// Live TTS frames arrive here while playback is already in progress. prompt_stream_rx: Option>, /// Cancels the background TTS producer when the interaction ends early. prompt_cancel_tx: Option>, /// Whether the live prompt stream has ended. prompt_stream_finished: bool, /// Digits that complete the interaction (e.g., ['1', '2']). expected_digits: Vec, /// Ticks remaining before timeout (decremented each tick after prompt ends). timeout_ticks_remaining: u32, /// Whether we've finished playing the prompt. prompt_done: bool, /// Channel to send the result back to the command handler. result_tx: Option>, } /// Result of a leg interaction (consent prompt, IVR, etc.). pub enum InteractionResult { /// The participant pressed one of the expected digits. Digit(char), /// No digit was received within the timeout. Timeout, /// The leg was removed or the call tore down before completion. Cancelled, } // --------------------------------------------------------------------------- // Tool legs // --------------------------------------------------------------------------- /// Type of tool leg. #[derive(Debug, Clone, Copy)] pub enum ToolType { Recording, Transcription, } /// Per-source audio delivered to a tool leg each mixer tick. pub struct ToolAudioBatch { pub sources: Vec, } /// One participant's 20ms audio frame. pub struct ToolAudioSource { pub leg_id: String, /// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples. pub pcm_48k: Vec, } /// Internal storage for a tool leg inside the mixer. struct ToolLegSlot { #[allow(dead_code)] tool_type: ToolType, audio_tx: mpsc::Sender, dropped_batches: u64, } // --------------------------------------------------------------------------- // Commands // --------------------------------------------------------------------------- /// Commands sent to the mixer task via a control channel. pub enum MixerCommand { /// Add a new participant leg to the mix. AddLeg { leg_id: String, codec_pt: u8, inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender>, }, /// Remove a leg from the mix (channels are dropped, I/O tasks exit). RemoveLeg { leg_id: String }, /// Shut down the mixer. Shutdown, /// Isolate a leg and start an interaction (consent prompt, IVR). /// The leg is removed from the mix and hears the prompt instead. /// DTMF from the leg is checked against expected_digits. StartInteraction { leg_id: String, /// PCM frames at MIX_RATE (48kHz f32), each 960 samples. prompt_pcm_frames: Vec>, /// Optional live prompt stream. Frames are appended as they are synthesized. prompt_stream_rx: Option>, /// Optional cancellation handle for the live prompt stream. prompt_cancel_tx: Option>, expected_digits: Vec, timeout_ms: u32, result_tx: oneshot::Sender, }, /// Add a tool leg that receives per-source unmerged audio. AddToolLeg { leg_id: String, tool_type: ToolType, audio_tx: mpsc::Sender, }, /// Remove a tool leg (drops the channel, background task finalizes). RemoveToolLeg { leg_id: String }, } // --------------------------------------------------------------------------- // Mixer internals // --------------------------------------------------------------------------- /// Internal per-leg state inside the mixer. struct MixerLegSlot { codec_pt: u8, transcoder: TranscodeState, /// Per-leg inbound denoiser (48kHz, 480-sample frames). denoiser: Box>, inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender>, /// Decoded PCM waiting for playout. Variable-duration RTP packets are /// decoded into this FIFO; the mixer consumes exactly one 20ms frame per tick. pcm_buffer: VecDeque, /// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32). last_pcm_frame: Vec, /// Next RTP timestamp expected from the inbound stream. expected_rtp_timestamp: Option, /// Best-effort estimate of packet duration in RTP clock units. estimated_packet_ts: u32, /// Number of consecutive ticks with no inbound packet. silent_ticks: u32, /// Per-leg jitter buffer for packet reordering and timing. jitter: JitterBuffer, // RTP output state. rtp_seq: u16, rtp_ts: u32, rtp_ssrc: u32, /// Dropped outbound frames for this leg (queue full / closed). outbound_drops: u64, /// Current role of this leg in the mixer. role: LegRole, } fn mix_samples_to_rtp_ts(codec_pt: u8, mix_samples: usize) -> u32 { let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64; (((mix_samples as u64 * clock_rate) + (MIX_RATE as u64 / 2)) / MIX_RATE as u64) as u32 } fn rtp_ts_to_mix_samples(codec_pt: u8, rtp_ts: u32) -> usize { let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64; (((rtp_ts as u64 * MIX_RATE as u64) + (clock_rate / 2)) / clock_rate) as usize } fn is_forward_rtp_delta(delta: u32) -> bool { delta > 0 && delta < 0x8000_0000 } fn should_emit_drop_event(total_drops: u64) -> bool { total_drops == 1 || total_drops % DROP_REPORT_INTERVAL == 0 } fn emit_output_drop_event( out_tx: &OutTx, call_id: &str, leg_id: Option<&str>, tool_leg_id: Option<&str>, stream: &str, reason: &str, total_drops: u64, ) { if !should_emit_drop_event(total_drops) { return; } emit_event( out_tx, "mixer_output_drop", serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "tool_leg_id": tool_leg_id, "stream": stream, "reason": reason, "total_drops": total_drops, }), ); } fn fade_concealment_from_last_frame(slot: &mut MixerLegSlot, samples: usize, decay: f32) { let mut template = if slot.last_pcm_frame.is_empty() { vec![0.0f32; MIX_FRAME_SIZE] } else { slot.last_pcm_frame.clone() }; let mut remaining = samples; while remaining > 0 { for sample in &mut template { *sample *= decay; } let take = remaining.min(template.len()); slot.pcm_buffer.extend(template.iter().take(take).copied()); remaining -= take; } } fn append_packet_loss_concealment(slot: &mut MixerLegSlot, samples: usize) { let mut remaining = samples.max(1); while remaining > 0 { let chunk = remaining.min(MIX_FRAME_SIZE); if slot.codec_pt == codec_lib::PT_OPUS { match slot.transcoder.opus_plc(chunk) { Ok(mut pcm) => { pcm.resize(chunk, 0.0); slot.pcm_buffer.extend(pcm); } Err(_) => fade_concealment_from_last_frame(slot, chunk, 0.8), } } else { fade_concealment_from_last_frame(slot, chunk, 0.85); } remaining -= chunk; } } fn decode_packet_to_mix_pcm(slot: &mut MixerLegSlot, pkt: &RtpPacket) -> Option> { let (pcm, rate) = slot .transcoder .decode_to_f32(&pkt.payload, pkt.payload_type) .ok()?; let pcm_48k = if rate == MIX_RATE { pcm } else { slot.transcoder .resample_f32(&pcm, rate, MIX_RATE) .unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE]) }; let processed = if slot.codec_pt != codec_lib::PT_OPUS { TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k) } else { pcm_48k }; Some(processed) } fn queue_inbound_packet(slot: &mut MixerLegSlot, pkt: RtpPacket) { if let Some(pcm_48k) = decode_packet_to_mix_pcm(slot, &pkt) { if pcm_48k.is_empty() { return; } if let Some(expected_ts) = slot.expected_rtp_timestamp { let gap_ts = pkt.timestamp.wrapping_sub(expected_ts); if is_forward_rtp_delta(gap_ts) { let gap_samples = rtp_ts_to_mix_samples(slot.codec_pt, gap_ts); if gap_samples <= MAX_GAP_FILL_SAMPLES { append_packet_loss_concealment(slot, gap_samples); } else { slot.pcm_buffer.clear(); } } } let packet_ts = mix_samples_to_rtp_ts(slot.codec_pt, pcm_48k.len()); if packet_ts > 0 { slot.estimated_packet_ts = packet_ts; slot.expected_rtp_timestamp = Some(pkt.timestamp.wrapping_add(packet_ts)); } slot.pcm_buffer.extend(pcm_48k); } } fn fill_leg_playout_buffer(slot: &mut MixerLegSlot) { let mut steps = 0usize; while slot.pcm_buffer.len() < MIX_FRAME_SIZE && steps < MAX_PACKET_STEPS_PER_TICK { steps += 1; match slot.jitter.consume() { JitterResult::Packet(pkt) => queue_inbound_packet(slot, pkt), JitterResult::Missing => { let conceal_ts = slot .estimated_packet_ts .max(rtp_clock_increment(slot.codec_pt)); let conceal_samples = rtp_ts_to_mix_samples(slot.codec_pt, conceal_ts).clamp(1, MAX_GAP_FILL_SAMPLES); append_packet_loss_concealment(slot, conceal_samples); if let Some(expected_ts) = slot.expected_rtp_timestamp { slot.expected_rtp_timestamp = Some(expected_ts.wrapping_add(conceal_ts)); } } JitterResult::Filling => break, } } } fn take_mix_frame(slot: &mut MixerLegSlot) -> Vec { let mut frame = Vec::with_capacity(MIX_FRAME_SIZE); while frame.len() < MIX_FRAME_SIZE { if let Some(sample) = slot.pcm_buffer.pop_front() { frame.push(sample); } else { frame.push(0.0); } } frame } fn soft_limit_sample(sample: f32) -> f32 { const KNEE: f32 = 0.85; let abs = sample.abs(); if abs <= KNEE { sample } else { let excess = abs - KNEE; let compressed = KNEE + (excess / (1.0 + (excess / (1.0 - KNEE)))); sample.signum() * compressed.min(1.0) } } fn try_send_leg_output( out_tx: &OutTx, call_id: &str, leg_id: &str, slot: &mut MixerLegSlot, rtp: Vec, stream: &str, ) { let reason = match slot.outbound_tx.try_send(rtp) { Ok(()) => return, Err(mpsc::error::TrySendError::Full(_)) => "full", Err(mpsc::error::TrySendError::Closed(_)) => "closed", }; slot.outbound_drops += 1; emit_output_drop_event( out_tx, call_id, Some(leg_id), None, stream, reason, slot.outbound_drops, ); } fn try_send_tool_output( out_tx: &OutTx, call_id: &str, tool_leg_id: &str, tool: &mut ToolLegSlot, batch: ToolAudioBatch, ) { let reason = match tool.audio_tx.try_send(batch) { Ok(()) => return, Err(mpsc::error::TrySendError::Full(_)) => "full", Err(mpsc::error::TrySendError::Closed(_)) => "closed", }; tool.dropped_batches += 1; emit_output_drop_event( out_tx, call_id, None, Some(tool_leg_id), "tool-batch", reason, tool.dropped_batches, ); } fn cancel_prompt_producer(state: &mut IsolationState) { if let Some(cancel_tx) = state.prompt_cancel_tx.take() { let _ = cancel_tx.send(true); } } fn cancel_isolated_interaction(state: &mut IsolationState) { cancel_prompt_producer(state); if let Some(tx) = state.result_tx.take() { let _ = tx.send(InteractionResult::Cancelled); } } fn drain_prompt_stream( out_tx: &OutTx, call_id: &str, leg_id: &str, state: &mut IsolationState, ) { loop { let Some(mut stream_rx) = state.prompt_stream_rx.take() else { return; }; match stream_rx.try_recv() { Ok(TtsStreamMessage::Frames(frames)) => { state.prompt_frames.extend(frames); state.prompt_stream_rx = Some(stream_rx); } Ok(TtsStreamMessage::Finished) => { state.prompt_stream_finished = true; return; } Ok(TtsStreamMessage::Failed(error)) => { emit_event( out_tx, "mixer_error", serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "error": format!("tts stream failed: {error}"), }), ); state.prompt_stream_finished = true; return; } Err(mpsc::error::TryRecvError::Empty) => { state.prompt_stream_rx = Some(stream_rx); return; } Err(mpsc::error::TryRecvError::Disconnected) => { state.prompt_stream_finished = true; return; } } } } /// Spawn the mixer task for a call. Returns the command sender and task handle. pub fn spawn_mixer(call_id: String, out_tx: OutTx) -> (mpsc::Sender, JoinHandle<()>) { let (cmd_tx, cmd_rx) = mpsc::channel::(32); let handle = tokio::spawn(async move { mixer_loop(call_id, cmd_rx, out_tx).await; }); (cmd_tx, handle) } /// The 20ms mixing loop. async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver, out_tx: OutTx) { let mut legs: HashMap = HashMap::new(); let mut tool_legs: HashMap = HashMap::new(); let mut interval = time::interval(Duration::from_millis(20)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { interval.tick().await; // ── 1. Process control commands (non-blocking). ───────────── loop { match cmd_rx.try_recv() { Ok(MixerCommand::AddLeg { leg_id, codec_pt, inbound_rx, outbound_tx, }) => { let transcoder = match TranscodeState::new() { Ok(t) => t, Err(e) => { emit_event( &out_tx, "mixer_error", serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "error": format!("codec init: {e}"), }), ); continue; } }; legs.insert( leg_id, MixerLegSlot { codec_pt, transcoder, denoiser: new_denoiser(), inbound_rx, outbound_tx, pcm_buffer: VecDeque::new(), last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE], expected_rtp_timestamp: None, estimated_packet_ts: rtp_clock_increment(codec_pt), silent_ticks: 0, rtp_seq: 0, rtp_ts: 0, rtp_ssrc: rand::random(), outbound_drops: 0, role: LegRole::Participant, jitter: JitterBuffer::new(), }, ); } Ok(MixerCommand::RemoveLeg { leg_id }) => { // If the leg is isolated, send Cancelled before dropping. if let Some(slot) = legs.get_mut(&leg_id) { if let LegRole::Isolated(ref mut state) = slot.role { cancel_isolated_interaction(state); } } legs.remove(&leg_id); // Channels drop → I/O tasks exit cleanly. } Ok(MixerCommand::Shutdown) => { // Cancel all outstanding interactions before shutting down. for slot in legs.values_mut() { if let LegRole::Isolated(ref mut state) = slot.role { cancel_isolated_interaction(state); } } return; } Ok(MixerCommand::StartInteraction { leg_id, prompt_pcm_frames, prompt_stream_rx, prompt_cancel_tx, expected_digits, timeout_ms, result_tx, }) => { if let Some(slot) = legs.get_mut(&leg_id) { // Cancel any existing interaction first. if let LegRole::Isolated(ref mut old_state) = slot.role { cancel_isolated_interaction(old_state); } let timeout_ticks = timeout_ms / 20; slot.role = LegRole::Isolated(IsolationState { prompt_frames: VecDeque::from(prompt_pcm_frames), prompt_stream_rx, prompt_cancel_tx, prompt_stream_finished: false, expected_digits, timeout_ticks_remaining: timeout_ticks, prompt_done: false, result_tx: Some(result_tx), }); } else { // Leg not found — immediately cancel. if let Some(cancel_tx) = prompt_cancel_tx { let _ = cancel_tx.send(true); } let _ = result_tx.send(InteractionResult::Cancelled); } } Ok(MixerCommand::AddToolLeg { leg_id, tool_type, audio_tx, }) => { tool_legs.insert( leg_id, ToolLegSlot { tool_type, audio_tx, dropped_batches: 0, }, ); } Ok(MixerCommand::RemoveToolLeg { leg_id }) => { tool_legs.remove(&leg_id); // Dropping the ToolLegSlot drops audio_tx → background task sees channel close. } Err(mpsc::error::TryRecvError::Empty) => break, Err(mpsc::error::TryRecvError::Disconnected) => return, } } if legs.is_empty() && tool_legs.is_empty() { continue; } // ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ──── // DTMF (PT 101) packets are collected separately. // Audio packets are sorted by sequence number and decoded // in order to maintain codec state (critical for G.722 ADPCM). let leg_ids: Vec = legs.keys().cloned().collect(); let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new(); for lid in &leg_ids { let slot = legs.get_mut(lid).unwrap(); // Step 2a: Drain all pending packets into the jitter buffer. let mut got_audio = false; loop { match slot.inbound_rx.try_recv() { Ok(pkt) => { if pkt.payload_type == 101 { dtmf_forward.push((lid.clone(), pkt)); } else { got_audio = true; slot.jitter.push(pkt); } } Err(_) => break, } } // Step 2b: Decode enough RTP to cover one 20ms playout frame. // Variable-duration packets (10ms, 20ms, 60ms, ...) accumulate in // the per-leg PCM FIFO; we pop exactly one 20ms frame below. fill_leg_playout_buffer(slot); slot.last_pcm_frame = take_mix_frame(slot); // Run jitter adaptation + prune stale packets. slot.jitter.adapt(); slot.jitter.prune_stale(); // Silent ticks: based on actual network reception, not jitter buffer state. if got_audio || dtmf_forward.iter().any(|(src, _)| src == lid) { slot.silent_ticks = 0; } else { slot.silent_ticks += 1; } if slot.silent_ticks > 150 { slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE]; slot.pcm_buffer.clear(); slot.expected_rtp_timestamp = None; slot.estimated_packet_ts = rtp_clock_increment(slot.codec_pt); } } // ── 3. Compute total mix from PARTICIPANT legs only. ──────── // Accumulate as f64 to prevent precision loss when summing f32. let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE]; for slot in legs.values() { if matches!(slot.role, LegRole::Participant) { for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) { total_mix[i] += s as f64; } } } // ── 4. Per-leg output. ────────────────────────────────────── // Collect interaction completions to apply after the loop // (can't mutate role while iterating mutably for encode). let mut completed_interactions: Vec<(String, InteractionResult)> = Vec::new(); for (lid, slot) in legs.iter_mut() { match &mut slot.role { LegRole::Participant => { // Mix-minus: total minus this leg's own contribution. // Apply a light soft limiter instead of hard clipping the sum. let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE); for i in 0..MIX_FRAME_SIZE { let sample = (total_mix[i] - slot.last_pcm_frame[i] as f64) as f32; mix_minus.push(soft_limit_sample(sample)); } // Resample from 48kHz to the leg's codec native rate. let target_rate = codec_sample_rate(slot.codec_pt); let resampled = if target_rate == MIX_RATE { mix_minus } else { slot.transcoder .resample_f32(&mix_minus, MIX_RATE, target_rate) .unwrap_or_default() }; // Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32). let encoded = match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) { Ok(e) if !e.is_empty() => e, _ => continue, }; // Build RTP packet with header. let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc); let mut rtp = header.to_vec(); rtp.extend_from_slice(&encoded); slot.rtp_seq = slot.rtp_seq.wrapping_add(1); slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); try_send_leg_output(&out_tx, &call_id, lid, slot, rtp, "participant-audio"); } LegRole::Isolated(state) => { drain_prompt_stream(&out_tx, &call_id, lid, state); // Check for DTMF digit from this leg. let mut matched_digit: Option = None; for (src_lid, dtmf_pkt) in &dtmf_forward { if src_lid == lid && dtmf_pkt.payload.len() >= 4 { let event_id = dtmf_pkt.payload[0]; let end_bit = (dtmf_pkt.payload[1] & 0x80) != 0; if end_bit { const EVENT_CHARS: &[char] = &[ '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '*', '#', 'A', 'B', 'C', 'D', ]; if let Some(&ch) = EVENT_CHARS.get(event_id as usize) { if state.expected_digits.contains(&ch) { matched_digit = Some(ch); break; } } } } } if let Some(digit) = matched_digit { // Interaction complete — digit matched. completed_interactions.push((lid.clone(), InteractionResult::Digit(digit))); } else { // Play prompt frame, wait for live TTS, or move to timeout once the // prompt stream has fully drained. let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() { frame } else if !state.prompt_stream_finished { vec![0.0f32; MIX_FRAME_SIZE] } else { state.prompt_done = true; vec![0.0f32; MIX_FRAME_SIZE] }; // Encode prompt frame to the leg's codec. let target_rate = codec_sample_rate(slot.codec_pt); let resampled = if target_rate == MIX_RATE { pcm_frame } else { slot.transcoder .resample_f32(&pcm_frame, MIX_RATE, target_rate) .unwrap_or_default() }; let mut prompt_rtp: Option> = None; if let Ok(encoded) = slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) { if !encoded.is_empty() { let header = build_rtp_header( slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc, ); let mut rtp = header.to_vec(); rtp.extend_from_slice(&encoded); slot.rtp_seq = slot.rtp_seq.wrapping_add(1); slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); prompt_rtp = Some(rtp); } } // Check timeout (only after prompt finishes). if state.prompt_done { if state.timeout_ticks_remaining == 0 { completed_interactions .push((lid.clone(), InteractionResult::Timeout)); } else { state.timeout_ticks_remaining -= 1; } } if let Some(rtp) = prompt_rtp { try_send_leg_output( &out_tx, &call_id, lid, slot, rtp, "isolated-prompt", ); } } } } } // Apply completed interactions — revert legs to Participant. for (lid, result) in completed_interactions { if let Some(slot) = legs.get_mut(&lid) { if let LegRole::Isolated(ref mut state) = slot.role { cancel_prompt_producer(state); if let Some(tx) = state.result_tx.take() { let _ = tx.send(result); } } slot.role = LegRole::Participant; } } // ── 5. Distribute per-source audio to tool legs. ──────────── if !tool_legs.is_empty() { // Collect participant PCM frames (computed in step 2). let sources: Vec = legs .iter() .filter(|(_, s)| matches!(s.role, LegRole::Participant)) .map(|(lid, s)| ToolAudioSource { leg_id: lid.clone(), pcm_48k: s.last_pcm_frame.clone(), }) .collect(); for (tool_leg_id, tool) in tool_legs.iter_mut() { let batch = ToolAudioBatch { sources: sources .iter() .map(|s| ToolAudioSource { leg_id: s.leg_id.clone(), pcm_48k: s.pcm_48k.clone(), }) .collect(), }; try_send_tool_output(&out_tx, &call_id, tool_leg_id, tool, batch); } } // ── 6. Forward DTMF packets between participant legs only. ── for (source_lid, dtmf_pkt) in &dtmf_forward { // Skip if the source is an isolated leg (its DTMF was handled in step 4). if let Some(src_slot) = legs.get(source_lid) { if matches!(src_slot.role, LegRole::Isolated(_)) { continue; } } for (target_lid, target_slot) in legs.iter_mut() { if target_lid == source_lid { continue; // Don't echo DTMF back to sender. } // Don't forward to isolated legs. if matches!(target_slot.role, LegRole::Isolated(_)) { continue; } let mut header = build_rtp_header( 101, target_slot.rtp_seq, target_slot.rtp_ts, target_slot.rtp_ssrc, ); if dtmf_pkt.marker { header[1] |= 0x80; // Set marker bit. } let mut rtp_out = header.to_vec(); rtp_out.extend_from_slice(&dtmf_pkt.payload); target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1); // Don't increment rtp_ts for DTMF — it shares timestamp context with audio. try_send_leg_output(&out_tx, &call_id, target_lid, target_slot, rtp_out, "dtmf"); } } } }