feat(proxy-engine): upgrade the internal audio bus to 48kHz f32 with per-leg denoising and improve SIP leg routing
This commit is contained in:
@@ -3,9 +3,12 @@
|
||||
//! Each Call spawns one mixer task. Legs communicate with the mixer via
|
||||
//! tokio mpsc channels — no shared mutable state, no lock contention.
|
||||
//!
|
||||
//! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame).
|
||||
//! All encoding/decoding happens at leg boundaries; inbound audio is denoised per leg at 48kHz.
|
||||
//!
|
||||
//! The mixer runs a 20ms tick loop:
|
||||
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
|
||||
//! 2. Compute total mix (sum of all **participant** legs' PCM as i32)
|
||||
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
|
||||
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
|
||||
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
|
||||
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
|
||||
//! 5. For each tool leg: send per-source unmerged audio batch
|
||||
@@ -13,16 +16,18 @@
|
||||
|
||||
use crate::ipc::{emit_event, OutTx};
|
||||
use crate::rtp::{build_rtp_header, rtp_clock_increment};
|
||||
use codec_lib::{codec_sample_rate, TranscodeState};
|
||||
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
|
||||
use nnnoiseless::DenoiseState;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{self, Duration, MissedTickBehavior};
|
||||
|
||||
/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample.
|
||||
const MIX_RATE: u32 = 16000;
|
||||
/// Mixing sample rate — 48kHz. Opus is native, G.722 needs 3× upsample, G.711 needs 6× upsample.
|
||||
/// Legs are denoised as f32 at this rate; the mix is accumulated in f64 and emitted as f32, for maximum quality.
|
||||
const MIX_RATE: u32 = 48000;
|
||||
/// Samples per 20ms frame at the mixing rate.
|
||||
const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
|
||||
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
|
||||
|
||||
/// A raw RTP payload received from a leg (no RTP header).
|
||||
pub struct RtpPacket {
|
||||
@@ -47,8 +52,8 @@ enum LegRole {
|
||||
}
|
||||
|
||||
struct IsolationState {
|
||||
/// PCM frames at MIX_RATE (320 samples each) queued for playback.
|
||||
prompt_frames: VecDeque<Vec<i16>>,
|
||||
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
|
||||
prompt_frames: VecDeque<Vec<f32>>,
|
||||
/// Digits that complete the interaction (e.g., ['1', '2']).
|
||||
expected_digits: Vec<char>,
|
||||
/// Ticks remaining before timeout (decremented each tick after prompt ends).
|
||||
@@ -88,8 +93,8 @@ pub struct ToolAudioBatch {
|
||||
/// One participant's 20ms audio frame.
|
||||
pub struct ToolAudioSource {
|
||||
pub leg_id: String,
|
||||
/// PCM at 16kHz, MIX_FRAME_SIZE (320) samples.
|
||||
pub pcm_16k: Vec<i16>,
|
||||
/// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples.
|
||||
pub pcm_48k: Vec<f32>,
|
||||
}
|
||||
|
||||
/// Internal storage for a tool leg inside the mixer.
|
||||
@@ -122,8 +127,8 @@ pub enum MixerCommand {
|
||||
/// DTMF from the leg is checked against expected_digits.
|
||||
StartInteraction {
|
||||
leg_id: String,
|
||||
/// PCM frames at MIX_RATE (16kHz), each 320 samples.
|
||||
prompt_pcm_frames: Vec<Vec<i16>>,
|
||||
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
|
||||
prompt_pcm_frames: Vec<Vec<f32>>,
|
||||
expected_digits: Vec<char>,
|
||||
timeout_ms: u32,
|
||||
result_tx: oneshot::Sender<InteractionResult>,
|
||||
@@ -149,10 +154,12 @@ pub enum MixerCommand {
|
||||
struct MixerLegSlot {
|
||||
codec_pt: u8,
|
||||
transcoder: TranscodeState,
|
||||
/// Per-leg inbound denoiser (48kHz, 480-sample frames).
|
||||
denoiser: Box<DenoiseState<'static>>,
|
||||
inbound_rx: mpsc::Receiver<RtpPacket>,
|
||||
outbound_tx: mpsc::Sender<Vec<u8>>,
|
||||
/// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus.
|
||||
last_pcm_frame: Vec<i16>,
|
||||
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
|
||||
last_pcm_frame: Vec<f32>,
|
||||
/// Number of consecutive ticks with no inbound packet.
|
||||
silent_ticks: u32,
|
||||
// RTP output state.
|
||||
@@ -220,9 +227,10 @@ async fn mixer_loop(
|
||||
MixerLegSlot {
|
||||
codec_pt,
|
||||
transcoder,
|
||||
denoiser: new_denoiser(),
|
||||
inbound_rx,
|
||||
outbound_tx,
|
||||
last_pcm_frame: vec![0i16; MIX_FRAME_SIZE],
|
||||
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
|
||||
silent_ticks: 0,
|
||||
rtp_seq: 0,
|
||||
rtp_ts: 0,
|
||||
@@ -337,24 +345,26 @@ async fn mixer_loop(
|
||||
|
||||
if let Some(pkt) = latest_audio {
|
||||
slot.silent_ticks = 0;
|
||||
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
|
||||
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
|
||||
Ok((pcm, rate)) => {
|
||||
// Resample to mixing rate if needed.
|
||||
let pcm_mix = if rate == MIX_RATE {
|
||||
// Resample to 48kHz mixing rate if needed.
|
||||
let pcm_48k = if rate == MIX_RATE {
|
||||
pcm
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample(&pcm, rate, MIX_RATE)
|
||||
.unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE])
|
||||
.resample_f32(&pcm, rate, MIX_RATE)
|
||||
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
|
||||
};
|
||||
// Per-leg inbound denoising at 48kHz.
|
||||
let denoised = TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k);
|
||||
// Pad or truncate to exactly MIX_FRAME_SIZE.
|
||||
let mut frame = pcm_mix;
|
||||
frame.resize(MIX_FRAME_SIZE, 0);
|
||||
let mut frame = denoised;
|
||||
frame.resize(MIX_FRAME_SIZE, 0.0);
|
||||
slot.last_pcm_frame = frame;
|
||||
}
|
||||
Err(_) => {
|
||||
// Decode failed — use silence.
|
||||
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
|
||||
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
||||
}
|
||||
}
|
||||
} else if dtmf_forward.iter().any(|(src, _)| src == lid) {
|
||||
@@ -364,17 +374,18 @@ async fn mixer_loop(
|
||||
slot.silent_ticks += 1;
|
||||
// Once more than 150 consecutive silent ticks (3 seconds at 20ms per tick) have passed, zero out to avoid stale audio.
|
||||
if slot.silent_ticks > 150 {
|
||||
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
|
||||
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
|
||||
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
|
||||
// Accumulate as f64 to prevent precision loss when summing f32.
|
||||
let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE];
|
||||
for slot in legs.values() {
|
||||
if matches!(slot.role, LegRole::Participant) {
|
||||
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
|
||||
total_mix[i] += s as i32;
|
||||
total_mix[i] += s as f64;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -387,27 +398,27 @@ async fn mixer_loop(
|
||||
for (lid, slot) in legs.iter_mut() {
|
||||
match &mut slot.role {
|
||||
LegRole::Participant => {
|
||||
// Mix-minus: total minus this leg's own contribution.
|
||||
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
|
||||
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
|
||||
for i in 0..MIX_FRAME_SIZE {
|
||||
let sample = (total_mix[i] - slot.last_pcm_frame[i] as i32)
|
||||
.clamp(-32768, 32767) as i16;
|
||||
mix_minus.push(sample);
|
||||
let sample =
|
||||
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
|
||||
mix_minus.push(sample.clamp(-1.0, 1.0));
|
||||
}
|
||||
|
||||
// Resample from 16kHz to the leg's codec native rate.
|
||||
// Resample from 48kHz to the leg's codec native rate.
|
||||
let target_rate = codec_sample_rate(slot.codec_pt);
|
||||
let resampled = if target_rate == MIX_RATE {
|
||||
mix_minus
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample(&mix_minus, MIX_RATE, target_rate)
|
||||
.resample_f32(&mix_minus, MIX_RATE, target_rate)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
// Encode to the leg's codec.
|
||||
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
|
||||
let encoded =
|
||||
match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
|
||||
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
|
||||
Ok(e) if !e.is_empty() => e,
|
||||
_ => continue,
|
||||
};
|
||||
@@ -456,21 +467,21 @@ async fn mixer_loop(
|
||||
frame
|
||||
} else {
|
||||
state.prompt_done = true;
|
||||
vec![0i16; MIX_FRAME_SIZE]
|
||||
vec![0.0f32; MIX_FRAME_SIZE]
|
||||
};
|
||||
|
||||
// Encode prompt frame to the leg's codec (reuses existing encode path).
|
||||
// Encode prompt frame to the leg's codec.
|
||||
let target_rate = codec_sample_rate(slot.codec_pt);
|
||||
let resampled = if target_rate == MIX_RATE {
|
||||
pcm_frame
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample(&pcm_frame, MIX_RATE, target_rate)
|
||||
.resample_f32(&pcm_frame, MIX_RATE, target_rate)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
if let Ok(encoded) =
|
||||
slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt)
|
||||
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
|
||||
{
|
||||
if !encoded.is_empty() {
|
||||
let header = build_rtp_header(
|
||||
@@ -523,7 +534,7 @@ async fn mixer_loop(
|
||||
.filter(|(_, s)| matches!(s.role, LegRole::Participant))
|
||||
.map(|(lid, s)| ToolAudioSource {
|
||||
leg_id: lid.clone(),
|
||||
pcm_16k: s.last_pcm_frame.clone(),
|
||||
pcm_48k: s.last_pcm_frame.clone(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -533,7 +544,7 @@ async fn mixer_loop(
|
||||
.iter()
|
||||
.map(|s| ToolAudioSource {
|
||||
leg_id: s.leg_id.clone(),
|
||||
pcm_16k: s.pcm_16k.clone(),
|
||||
pcm_48k: s.pcm_48k.clone(),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user