feat(proxy-engine,codec-lib): add adaptive RTP jitter buffering with Opus packet loss concealment and stable 20ms resampling
This commit is contained in:
@@ -1,5 +1,12 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## 2026-04-10 - 1.19.0 - feat(proxy-engine,codec-lib)
|
||||||
|
add adaptive RTP jitter buffering with Opus packet loss concealment and stable 20ms resampling
|
||||||
|
|
||||||
|
- introduces a per-leg adaptive jitter buffer in the mixer to reorder RTP packets, gate initial playout, and deliver one frame per 20ms tick
|
||||||
|
- adds Opus PLC support to synthesize missing audio frames when packets are lost, with fade-based fallback handling for non-Opus codecs
|
||||||
|
- updates i16 and f32 resamplers to use canonical 20ms chunks so cached resamplers preserve filter state and avoid variable-size cache thrashing
|
||||||
|
|
||||||
## 2026-04-10 - 1.18.0 - feat(readme)
|
## 2026-04-10 - 1.18.0 - feat(readme)
|
||||||
expand documentation for voicemail, IVR, audio engine, and API capabilities
|
expand documentation for voicemail, IVR, audio engine, and API capabilities
|
||||||
|
|
||||||
|
|||||||
@@ -142,8 +142,10 @@ impl TranscodeState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// High-quality sample rate conversion using rubato FFT resampler.
|
/// High-quality sample rate conversion using rubato FFT resampler.
|
||||||
/// Resamplers are cached by (from_rate, to_rate, chunk_size) and reused,
|
///
|
||||||
/// maintaining proper inter-frame state for continuous audio streams.
|
/// To maintain continuous filter state, the resampler always processes at a
|
||||||
|
/// canonical chunk size (20ms at the source rate). This prevents cache
|
||||||
|
/// thrashing from variable input sizes and preserves inter-frame filter state.
|
||||||
pub fn resample(
|
pub fn resample(
|
||||||
&mut self,
|
&mut self,
|
||||||
pcm: &[i16],
|
pcm: &[i16],
|
||||||
@@ -154,28 +156,61 @@ impl TranscodeState {
|
|||||||
return Ok(pcm.to_vec());
|
return Ok(pcm.to_vec());
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk = pcm.len();
|
let canonical_chunk = (from_rate as usize) / 50; // 20ms
|
||||||
let key = (from_rate, to_rate, chunk);
|
let key = (from_rate, to_rate, canonical_chunk);
|
||||||
|
|
||||||
if !self.resamplers.contains_key(&key) {
|
if !self.resamplers.contains_key(&key) {
|
||||||
let r =
|
let r = FftFixedIn::<f64>::new(
|
||||||
FftFixedIn::<f64>::new(from_rate as usize, to_rate as usize, chunk, 1, 1)
|
from_rate as usize,
|
||||||
|
to_rate as usize,
|
||||||
|
canonical_chunk,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
)
|
||||||
.map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?;
|
.map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?;
|
||||||
self.resamplers.insert(key, r);
|
self.resamplers.insert(key, r);
|
||||||
}
|
}
|
||||||
let resampler = self.resamplers.get_mut(&key).unwrap();
|
let resampler = self.resamplers.get_mut(&key).unwrap();
|
||||||
|
|
||||||
let float_in: Vec<f64> = pcm.iter().map(|&s| s as f64 / 32768.0).collect();
|
let mut output = Vec::with_capacity(
|
||||||
let input = vec![float_in];
|
(pcm.len() as f64 * to_rate as f64 / from_rate as f64).ceil() as usize + 16,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut offset = 0;
|
||||||
|
while offset < pcm.len() {
|
||||||
|
let remaining = pcm.len() - offset;
|
||||||
|
let copy_len = remaining.min(canonical_chunk);
|
||||||
|
let mut chunk = vec![0.0f64; canonical_chunk];
|
||||||
|
for i in 0..copy_len {
|
||||||
|
chunk[i] = pcm[offset + i] as f64 / 32768.0;
|
||||||
|
}
|
||||||
|
|
||||||
|
let input = vec![chunk];
|
||||||
let result = resampler
|
let result = resampler
|
||||||
.process(&input, None)
|
.process(&input, None)
|
||||||
.map_err(|e| format!("resample {from_rate}->{to_rate}: {e}"))?;
|
.map_err(|e| format!("resample {from_rate}->{to_rate}: {e}"))?;
|
||||||
|
|
||||||
Ok(result[0]
|
if remaining < canonical_chunk {
|
||||||
|
let expected =
|
||||||
|
(copy_len as f64 * to_rate as f64 / from_rate as f64).round() as usize;
|
||||||
|
let take = expected.min(result[0].len());
|
||||||
|
output.extend(
|
||||||
|
result[0][..take]
|
||||||
.iter()
|
.iter()
|
||||||
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
|
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16),
|
||||||
.collect())
|
);
|
||||||
|
} else {
|
||||||
|
output.extend(
|
||||||
|
result[0]
|
||||||
|
.iter()
|
||||||
|
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += canonical_chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(output)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Apply RNNoise ML noise suppression to 48kHz PCM audio.
|
/// Apply RNNoise ML noise suppression to 48kHz PCM audio.
|
||||||
@@ -329,6 +364,21 @@ impl TranscodeState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Opus packet loss concealment — synthesize one frame to fill a gap.
|
||||||
|
/// Returns f32 PCM at 48kHz. `frame_size` should be 960 for 20ms.
|
||||||
|
pub fn opus_plc(&mut self, frame_size: usize) -> Result<Vec<f32>, String> {
|
||||||
|
let mut pcm = vec![0.0f32; frame_size];
|
||||||
|
let out = MutSignals::try_from(&mut pcm[..])
|
||||||
|
.map_err(|e| format!("opus plc signals: {e}"))?;
|
||||||
|
let n: usize = self
|
||||||
|
.opus_dec
|
||||||
|
.decode_float(None::<OpusPacket<'_>>, out, false)
|
||||||
|
.map_err(|e| format!("opus plc: {e}"))?
|
||||||
|
.into();
|
||||||
|
pcm.truncate(n);
|
||||||
|
Ok(pcm)
|
||||||
|
}
|
||||||
|
|
||||||
/// Encode f32 PCM samples ([-1.0, 1.0]) to an audio codec.
|
/// Encode f32 PCM samples ([-1.0, 1.0]) to an audio codec.
|
||||||
///
|
///
|
||||||
/// For Opus, uses native float encode (no i16 quantization).
|
/// For Opus, uses native float encode (no i16 quantization).
|
||||||
@@ -357,7 +407,10 @@ impl TranscodeState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// High-quality sample rate conversion for f32 PCM using rubato FFT resampler.
|
/// High-quality sample rate conversion for f32 PCM using rubato FFT resampler.
|
||||||
/// Uses a separate cache from the i16 resampler.
|
///
|
||||||
|
/// To maintain continuous filter state, the resampler always processes at a
|
||||||
|
/// canonical chunk size (20ms at the source rate). This prevents cache
|
||||||
|
/// thrashing from variable input sizes and preserves inter-frame filter state.
|
||||||
pub fn resample_f32(
|
pub fn resample_f32(
|
||||||
&mut self,
|
&mut self,
|
||||||
pcm: &[f32],
|
pcm: &[f32],
|
||||||
@@ -368,23 +421,50 @@ impl TranscodeState {
|
|||||||
return Ok(pcm.to_vec());
|
return Ok(pcm.to_vec());
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunk = pcm.len();
|
let canonical_chunk = (from_rate as usize) / 50; // 20ms
|
||||||
let key = (from_rate, to_rate, chunk);
|
let key = (from_rate, to_rate, canonical_chunk);
|
||||||
|
|
||||||
if !self.resamplers_f32.contains_key(&key) {
|
if !self.resamplers_f32.contains_key(&key) {
|
||||||
let r =
|
let r = FftFixedIn::<f32>::new(
|
||||||
FftFixedIn::<f32>::new(from_rate as usize, to_rate as usize, chunk, 1, 1)
|
from_rate as usize,
|
||||||
|
to_rate as usize,
|
||||||
|
canonical_chunk,
|
||||||
|
1,
|
||||||
|
1,
|
||||||
|
)
|
||||||
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
|
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
|
||||||
self.resamplers_f32.insert(key, r);
|
self.resamplers_f32.insert(key, r);
|
||||||
}
|
}
|
||||||
let resampler = self.resamplers_f32.get_mut(&key).unwrap();
|
let resampler = self.resamplers_f32.get_mut(&key).unwrap();
|
||||||
|
|
||||||
let input = vec![pcm.to_vec()];
|
let mut output = Vec::with_capacity(
|
||||||
|
(pcm.len() as f64 * to_rate as f64 / from_rate as f64).ceil() as usize + 16,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut offset = 0;
|
||||||
|
while offset < pcm.len() {
|
||||||
|
let remaining = pcm.len() - offset;
|
||||||
|
let mut chunk = vec![0.0f32; canonical_chunk];
|
||||||
|
let copy_len = remaining.min(canonical_chunk);
|
||||||
|
chunk[..copy_len].copy_from_slice(&pcm[offset..offset + copy_len]);
|
||||||
|
|
||||||
|
let input = vec![chunk];
|
||||||
let result = resampler
|
let result = resampler
|
||||||
.process(&input, None)
|
.process(&input, None)
|
||||||
.map_err(|e| format!("resample f32 {from_rate}->{to_rate}: {e}"))?;
|
.map_err(|e| format!("resample f32 {from_rate}->{to_rate}: {e}"))?;
|
||||||
|
|
||||||
Ok(result[0].clone())
|
if remaining < canonical_chunk {
|
||||||
|
let expected =
|
||||||
|
(copy_len as f64 * to_rate as f64 / from_rate as f64).round() as usize;
|
||||||
|
output.extend_from_slice(&result[0][..expected.min(result[0].len())]);
|
||||||
|
} else {
|
||||||
|
output.extend_from_slice(&result[0]);
|
||||||
|
}
|
||||||
|
|
||||||
|
offset += canonical_chunk;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(output)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Apply RNNoise ML noise suppression to 48kHz f32 PCM audio.
|
/// Apply RNNoise ML noise suppression to 48kHz f32 PCM audio.
|
||||||
|
|||||||
188
rust/crates/proxy-engine/src/jitter_buffer.rs
Normal file
188
rust/crates/proxy-engine/src/jitter_buffer.rs
Normal file
@@ -0,0 +1,188 @@
|
|||||||
|
//! Per-leg adaptive jitter buffer for the audio mixer.
|
||||||
|
//!
|
||||||
|
//! Sits between inbound RTP packet reception and the mixer's decode step.
|
||||||
|
//! Reorders packets by sequence number and delivers exactly one frame per
|
||||||
|
//! 20ms mixer tick, smoothing out network jitter. When a packet is missing,
|
||||||
|
//! the mixer can invoke codec PLC to conceal the gap.
|
||||||
|
|
||||||
|
use crate::mixer::RtpPacket;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
/// Per-leg jitter buffer. Collects RTP packets keyed by sequence number,
|
||||||
|
/// delivers one frame per 20ms tick in sequence order.
|
||||||
|
///
|
||||||
|
/// Adaptive target depth: starts at 3 frames (60ms), adjusts between
|
||||||
|
/// 2–6 frames based on observed jitter.
|
||||||
|
pub struct JitterBuffer {
|
||||||
|
/// Packets waiting for playout, keyed by seq number.
|
||||||
|
buffer: BTreeMap<u16, RtpPacket>,
|
||||||
|
/// Next expected sequence number for playout.
|
||||||
|
next_seq: Option<u16>,
|
||||||
|
/// Target buffer depth in frames (adaptive).
|
||||||
|
target_depth: u32,
|
||||||
|
/// Current fill level high-water mark (for adaptation).
|
||||||
|
max_fill_seen: u32,
|
||||||
|
/// Ticks since last adaptation adjustment.
|
||||||
|
adapt_counter: u32,
|
||||||
|
/// Consecutive ticks where buffer was empty (for ramp-up).
|
||||||
|
empty_streak: u32,
|
||||||
|
/// Consecutive ticks where buffer had excess (for ramp-down).
|
||||||
|
excess_streak: u32,
|
||||||
|
/// Whether we've started playout (initial fill complete).
|
||||||
|
playing: bool,
|
||||||
|
/// Number of frames consumed since start (for stats).
|
||||||
|
frames_consumed: u64,
|
||||||
|
/// Number of frames lost (gap in sequence).
|
||||||
|
frames_lost: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// What the mixer gets back each tick.
|
||||||
|
pub enum JitterResult {
|
||||||
|
/// A packet is available for decoding.
|
||||||
|
Packet(RtpPacket),
|
||||||
|
/// Packet was expected but missing — invoke PLC.
|
||||||
|
Missing,
|
||||||
|
/// Buffer is in initial fill phase — output silence.
|
||||||
|
Filling,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JitterBuffer {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
buffer: BTreeMap::new(),
|
||||||
|
next_seq: None,
|
||||||
|
target_depth: 3, // 60ms initial target
|
||||||
|
max_fill_seen: 0,
|
||||||
|
adapt_counter: 0,
|
||||||
|
empty_streak: 0,
|
||||||
|
excess_streak: 0,
|
||||||
|
playing: false,
|
||||||
|
frames_consumed: 0,
|
||||||
|
frames_lost: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Push a received RTP packet into the buffer.
|
||||||
|
pub fn push(&mut self, pkt: RtpPacket) {
|
||||||
|
// Ignore duplicates.
|
||||||
|
if self.buffer.contains_key(&pkt.seq) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Detect large forward seq jump (hold/resume, SSRC change).
|
||||||
|
if let Some(next) = self.next_seq {
|
||||||
|
let jump = pkt.seq.wrapping_sub(next);
|
||||||
|
if jump > 1000 && jump < 0x8000 {
|
||||||
|
// Massive forward jump — reset buffer.
|
||||||
|
self.reset();
|
||||||
|
self.next_seq = Some(pkt.seq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.next_seq.is_none() {
|
||||||
|
self.next_seq = Some(pkt.seq);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.buffer.insert(pkt.seq, pkt);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Consume one frame for the current 20ms tick.
|
||||||
|
/// Called once per mixer tick per leg.
|
||||||
|
pub fn consume(&mut self) -> JitterResult {
|
||||||
|
// Track fill level for adaptation.
|
||||||
|
let fill = self.buffer.len() as u32;
|
||||||
|
if fill > self.max_fill_seen {
|
||||||
|
self.max_fill_seen = fill;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initial fill phase: wait until we have target_depth packets.
|
||||||
|
if !self.playing {
|
||||||
|
if fill >= self.target_depth {
|
||||||
|
self.playing = true;
|
||||||
|
} else {
|
||||||
|
return JitterResult::Filling;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let seq = match self.next_seq {
|
||||||
|
Some(s) => s,
|
||||||
|
None => return JitterResult::Filling,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Advance next_seq (wrapping u16).
|
||||||
|
self.next_seq = Some(seq.wrapping_add(1));
|
||||||
|
|
||||||
|
// Try to pull the expected sequence number.
|
||||||
|
if let Some(pkt) = self.buffer.remove(&seq) {
|
||||||
|
self.frames_consumed += 1;
|
||||||
|
self.empty_streak = 0;
|
||||||
|
|
||||||
|
// Adaptive: if buffer is consistently deep, we can tighten.
|
||||||
|
if fill > self.target_depth + 2 {
|
||||||
|
self.excess_streak += 1;
|
||||||
|
} else {
|
||||||
|
self.excess_streak = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
JitterResult::Packet(pkt)
|
||||||
|
} else {
|
||||||
|
// Packet missing — PLC needed.
|
||||||
|
self.frames_lost += 1;
|
||||||
|
self.empty_streak += 1;
|
||||||
|
self.excess_streak = 0;
|
||||||
|
|
||||||
|
JitterResult::Missing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Run adaptation logic. Call every tick; internally gates to ~1s intervals.
|
||||||
|
pub fn adapt(&mut self) {
|
||||||
|
self.adapt_counter += 1;
|
||||||
|
if self.adapt_counter < 50 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.adapt_counter = 0;
|
||||||
|
|
||||||
|
// If we had many empty ticks, increase depth.
|
||||||
|
if self.empty_streak > 3 && self.target_depth < 6 {
|
||||||
|
self.target_depth += 1;
|
||||||
|
}
|
||||||
|
// If buffer consistently overfull, decrease depth.
|
||||||
|
else if self.excess_streak > 25 && self.target_depth > 2 {
|
||||||
|
self.target_depth -= 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.max_fill_seen = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Discard packets that are too old (seq far behind next_seq).
|
||||||
|
/// Prevents unbounded memory growth from reordered/late packets.
|
||||||
|
pub fn prune_stale(&mut self) {
|
||||||
|
if let Some(next) = self.next_seq {
|
||||||
|
// Remove anything more than 100 frames behind playout point.
|
||||||
|
// Use wrapping arithmetic: if (next - seq) > 100, it's stale.
|
||||||
|
let stale: Vec<u16> = self
|
||||||
|
.buffer
|
||||||
|
.keys()
|
||||||
|
.filter(|&&seq| {
|
||||||
|
let age = next.wrapping_sub(seq);
|
||||||
|
age > 100 && age < 0x8000 // < 0x8000 means it's actually behind, not ahead
|
||||||
|
})
|
||||||
|
.copied()
|
||||||
|
.collect();
|
||||||
|
for seq in stale {
|
||||||
|
self.buffer.remove(&seq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reset the buffer (e.g., after re-INVITE / hold-resume).
|
||||||
|
pub fn reset(&mut self) {
|
||||||
|
self.buffer.clear();
|
||||||
|
self.next_seq = None;
|
||||||
|
self.playing = false;
|
||||||
|
self.empty_streak = 0;
|
||||||
|
self.excess_streak = 0;
|
||||||
|
self.adapt_counter = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -12,6 +12,7 @@ mod call_manager;
|
|||||||
mod config;
|
mod config;
|
||||||
mod dtmf;
|
mod dtmf;
|
||||||
mod ipc;
|
mod ipc;
|
||||||
|
mod jitter_buffer;
|
||||||
mod leg_io;
|
mod leg_io;
|
||||||
mod mixer;
|
mod mixer;
|
||||||
mod provider;
|
mod provider;
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
//! 6. Forward DTMF between participant legs only
|
//! 6. Forward DTMF between participant legs only
|
||||||
|
|
||||||
use crate::ipc::{emit_event, OutTx};
|
use crate::ipc::{emit_event, OutTx};
|
||||||
|
use crate::jitter_buffer::{JitterBuffer, JitterResult};
|
||||||
use crate::rtp::{build_rtp_header, rtp_clock_increment};
|
use crate::rtp::{build_rtp_header, rtp_clock_increment};
|
||||||
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
|
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
|
||||||
use nnnoiseless::DenoiseState;
|
use nnnoiseless::DenoiseState;
|
||||||
@@ -164,6 +165,8 @@ struct MixerLegSlot {
|
|||||||
last_pcm_frame: Vec<f32>,
|
last_pcm_frame: Vec<f32>,
|
||||||
/// Number of consecutive ticks with no inbound packet.
|
/// Number of consecutive ticks with no inbound packet.
|
||||||
silent_ticks: u32,
|
silent_ticks: u32,
|
||||||
|
/// Per-leg jitter buffer for packet reordering and timing.
|
||||||
|
jitter: JitterBuffer,
|
||||||
// RTP output state.
|
// RTP output state.
|
||||||
rtp_seq: u16,
|
rtp_seq: u16,
|
||||||
rtp_ts: u32,
|
rtp_ts: u32,
|
||||||
@@ -238,6 +241,7 @@ async fn mixer_loop(
|
|||||||
rtp_ts: 0,
|
rtp_ts: 0,
|
||||||
rtp_ssrc: rand::random(),
|
rtp_ssrc: rand::random(),
|
||||||
role: LegRole::Participant,
|
role: LegRole::Participant,
|
||||||
|
jitter: JitterBuffer::new(),
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -331,35 +335,27 @@ async fn mixer_loop(
|
|||||||
for lid in &leg_ids {
|
for lid in &leg_ids {
|
||||||
let slot = legs.get_mut(lid).unwrap();
|
let slot = legs.get_mut(lid).unwrap();
|
||||||
|
|
||||||
// Drain channel — collect DTMF separately, collect ALL audio packets.
|
// Step 2a: Drain all pending packets into the jitter buffer.
|
||||||
let mut audio_packets: Vec<RtpPacket> = Vec::new();
|
let mut got_audio = false;
|
||||||
loop {
|
loop {
|
||||||
match slot.inbound_rx.try_recv() {
|
match slot.inbound_rx.try_recv() {
|
||||||
Ok(pkt) => {
|
Ok(pkt) => {
|
||||||
if pkt.payload_type == 101 {
|
if pkt.payload_type == 101 {
|
||||||
// DTMF telephone-event: collect for processing.
|
|
||||||
dtmf_forward.push((lid.clone(), pkt));
|
dtmf_forward.push((lid.clone(), pkt));
|
||||||
} else {
|
} else {
|
||||||
audio_packets.push(pkt);
|
got_audio = true;
|
||||||
|
slot.jitter.push(pkt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => break,
|
Err(_) => break,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !audio_packets.is_empty() {
|
// Step 2b: Consume exactly one frame from the jitter buffer.
|
||||||
slot.silent_ticks = 0;
|
match slot.jitter.consume() {
|
||||||
|
JitterResult::Packet(pkt) => {
|
||||||
// Sort by sequence number for correct codec state progression.
|
|
||||||
// This prevents G.722 ADPCM state corruption from out-of-order packets.
|
|
||||||
audio_packets.sort_by_key(|p| p.seq);
|
|
||||||
|
|
||||||
// Decode ALL packets in order (maintains codec state),
|
|
||||||
// but only keep the last decoded frame for mixing.
|
|
||||||
for pkt in &audio_packets {
|
|
||||||
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
|
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
|
||||||
Ok((pcm, rate)) => {
|
Ok((pcm, rate)) => {
|
||||||
// Resample to 48kHz mixing rate if needed.
|
|
||||||
let pcm_48k = if rate == MIX_RATE {
|
let pcm_48k = if rate == MIX_RATE {
|
||||||
pcm
|
pcm
|
||||||
} else {
|
} else {
|
||||||
@@ -367,15 +363,11 @@ async fn mixer_loop(
|
|||||||
.resample_f32(&pcm, rate, MIX_RATE)
|
.resample_f32(&pcm, rate, MIX_RATE)
|
||||||
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
|
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
|
||||||
};
|
};
|
||||||
// Per-leg inbound denoising at 48kHz.
|
|
||||||
// Only for SIP telephony legs — WebRTC browsers
|
|
||||||
// already apply noise suppression via getUserMedia.
|
|
||||||
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
|
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
|
||||||
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
|
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
|
||||||
} else {
|
} else {
|
||||||
pcm_48k
|
pcm_48k
|
||||||
};
|
};
|
||||||
// Pad or truncate to exactly MIX_FRAME_SIZE.
|
|
||||||
let mut frame = processed;
|
let mut frame = processed;
|
||||||
frame.resize(MIX_FRAME_SIZE, 0.0);
|
frame.resize(MIX_FRAME_SIZE, 0.0);
|
||||||
slot.last_pcm_frame = frame;
|
slot.last_pcm_frame = frame;
|
||||||
@@ -383,17 +375,45 @@ async fn mixer_loop(
|
|||||||
Err(_) => {}
|
Err(_) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if dtmf_forward.iter().any(|(src, _)| src == lid) {
|
JitterResult::Missing => {
|
||||||
// Got DTMF but no audio — don't bump silent_ticks (DTMF counts as activity).
|
// Invoke Opus PLC or fade for non-Opus codecs.
|
||||||
|
if slot.codec_pt == codec_lib::PT_OPUS {
|
||||||
|
match slot.transcoder.opus_plc(MIX_FRAME_SIZE) {
|
||||||
|
Ok(pcm) => {
|
||||||
|
slot.last_pcm_frame = pcm;
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
for s in slot.last_pcm_frame.iter_mut() {
|
||||||
|
*s *= 0.8;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Non-Opus: fade last frame toward silence.
|
||||||
|
for s in slot.last_pcm_frame.iter_mut() {
|
||||||
|
*s *= 0.85;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
JitterResult::Filling => {
|
||||||
|
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run jitter adaptation + prune stale packets.
|
||||||
|
slot.jitter.adapt();
|
||||||
|
slot.jitter.prune_stale();
|
||||||
|
|
||||||
|
// Silent ticks: based on actual network reception, not jitter buffer state.
|
||||||
|
if got_audio || dtmf_forward.iter().any(|(src, _)| src == lid) {
|
||||||
slot.silent_ticks = 0;
|
slot.silent_ticks = 0;
|
||||||
} else {
|
} else {
|
||||||
slot.silent_ticks += 1;
|
slot.silent_ticks += 1;
|
||||||
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
|
}
|
||||||
if slot.silent_ticks > 150 {
|
if slot.silent_ticks > 150 {
|
||||||
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
|
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
|
||||||
// Accumulate as f64 to prevent precision loss when summing f32.
|
// Accumulate as f64 to prevent precision loss when summing f32.
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: 'siprouter',
|
name: 'siprouter',
|
||||||
version: '1.18.0',
|
version: '1.19.0',
|
||||||
description: 'undefined'
|
description: 'undefined'
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,6 @@
|
|||||||
*/
|
*/
|
||||||
export const commitinfo = {
|
export const commitinfo = {
|
||||||
name: 'siprouter',
|
name: 'siprouter',
|
||||||
version: '1.18.0',
|
version: '1.19.0',
|
||||||
description: 'undefined'
|
description: 'undefined'
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user