diff --git a/changelog.md b/changelog.md index 092a854..c20e031 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,12 @@ # Changelog +## 2026-04-10 - 1.19.0 - feat(proxy-engine,codec-lib) +add adaptive RTP jitter buffering with Opus packet loss concealment and stable 20ms resampling + +- introduces a per-leg adaptive jitter buffer in the mixer to reorder RTP packets, gate initial playout, and deliver one frame per 20ms tick +- adds Opus PLC support to synthesize missing audio frames when packets are lost, with fade-based fallback handling for non-Opus codecs +- updates i16 and f32 resamplers to use canonical 20ms chunks so cached resamplers preserve filter state and avoid variable-size cache thrashing + ## 2026-04-10 - 1.18.0 - feat(readme) expand documentation for voicemail, IVR, audio engine, and API capabilities diff --git a/rust/crates/codec-lib/src/lib.rs b/rust/crates/codec-lib/src/lib.rs index f18ba82..30e7c67 100644 --- a/rust/crates/codec-lib/src/lib.rs +++ b/rust/crates/codec-lib/src/lib.rs @@ -142,8 +142,10 @@ impl TranscodeState { } /// High-quality sample rate conversion using rubato FFT resampler. - /// Resamplers are cached by (from_rate, to_rate, chunk_size) and reused, - /// maintaining proper inter-frame state for continuous audio streams. + /// + /// To maintain continuous filter state, the resampler always processes at a + /// canonical chunk size (20ms at the source rate). This prevents cache + /// thrashing from variable input sizes and preserves inter-frame filter state. pub fn resample( &mut self, pcm: &[i16], @@ -154,28 +156,61 @@ impl TranscodeState { return Ok(pcm.to_vec()); } - let chunk = pcm.len(); - let key = (from_rate, to_rate, chunk); + let canonical_chunk = (from_rate as usize) / 50; // 20ms + let key = (from_rate, to_rate, canonical_chunk); if !self.resamplers.contains_key(&key) { - let r = - FftFixedIn::::new(from_rate as usize, to_rate as usize, chunk, 1, 1) - .map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?; + let r = FftFixedIn::::new( + from_rate as usize, + to_rate as usize, + canonical_chunk, + 1, + 1, + ) + .map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?; self.resamplers.insert(key, r); } let resampler = self.resamplers.get_mut(&key).unwrap(); - let float_in: Vec = pcm.iter().map(|&s| s as f64 / 32768.0).collect(); - let input = vec![float_in]; + let mut output = Vec::with_capacity( + (pcm.len() as f64 * to_rate as f64 / from_rate as f64).ceil() as usize + 16, + ); - let result = resampler - .process(&input, None) - .map_err(|e| format!("resample {from_rate}->{to_rate}: {e}"))?; + let mut offset = 0; + while offset < pcm.len() { + let remaining = pcm.len() - offset; + let copy_len = remaining.min(canonical_chunk); + let mut chunk = vec![0.0f64; canonical_chunk]; + for i in 0..copy_len { + chunk[i] = pcm[offset + i] as f64 / 32768.0; + } - Ok(result[0] - .iter() - .map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16) - .collect()) + let input = vec![chunk]; + let result = resampler + .process(&input, None) + .map_err(|e| format!("resample {from_rate}->{to_rate}: {e}"))?; + + if remaining < canonical_chunk { + let expected = + (copy_len as f64 * to_rate as f64 / from_rate as f64).round() as usize; + let take = expected.min(result[0].len()); + output.extend( + result[0][..take] + .iter() + .map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16), + ); + } else { + output.extend( + result[0] + .iter() + .map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16), + ); + } + + offset += canonical_chunk; + } + + Ok(output) } /// Apply RNNoise ML noise suppression to 48kHz PCM audio. @@ -329,6 +364,21 @@ impl TranscodeState { } } + /// Opus packet loss concealment — synthesize one frame to fill a gap. + /// Returns f32 PCM at 48kHz. `frame_size` should be 960 for 20ms. + pub fn opus_plc(&mut self, frame_size: usize) -> Result, String> { + let mut pcm = vec![0.0f32; frame_size]; + let out = MutSignals::try_from(&mut pcm[..]) + .map_err(|e| format!("opus plc signals: {e}"))?; + let n: usize = self + .opus_dec + .decode_float(None::>, out, false) + .map_err(|e| format!("opus plc: {e}"))? + .into(); + pcm.truncate(n); + Ok(pcm) + } + /// Encode f32 PCM samples ([-1.0, 1.0]) to an audio codec. /// /// For Opus, uses native float encode (no i16 quantization). @@ -357,7 +407,10 @@ impl TranscodeState { } /// High-quality sample rate conversion for f32 PCM using rubato FFT resampler. - /// Uses a separate cache from the i16 resampler. + /// + /// To maintain continuous filter state, the resampler always processes at a + /// canonical chunk size (20ms at the source rate). This prevents cache + /// thrashing from variable input sizes and preserves inter-frame filter state. pub fn resample_f32( &mut self, pcm: &[f32], @@ -368,23 +421,50 @@ impl TranscodeState { return Ok(pcm.to_vec()); } - let chunk = pcm.len(); - let key = (from_rate, to_rate, chunk); + let canonical_chunk = (from_rate as usize) / 50; // 20ms + let key = (from_rate, to_rate, canonical_chunk); if !self.resamplers_f32.contains_key(&key) { - let r = - FftFixedIn::::new(from_rate as usize, to_rate as usize, chunk, 1, 1) - .map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?; + let r = FftFixedIn::::new( + from_rate as usize, + to_rate as usize, + canonical_chunk, + 1, + 1, + ) + .map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?; self.resamplers_f32.insert(key, r); } let resampler = self.resamplers_f32.get_mut(&key).unwrap(); - let input = vec![pcm.to_vec()]; - let result = resampler - .process(&input, None) - .map_err(|e| format!("resample f32 {from_rate}->{to_rate}: {e}"))?; + let mut output = Vec::with_capacity( + (pcm.len() as f64 * to_rate as f64 / from_rate as f64).ceil() as usize + 16, + ); - Ok(result[0].clone()) + let mut offset = 0; + while offset < pcm.len() { + let remaining = pcm.len() - offset; + let mut chunk = vec![0.0f32; canonical_chunk]; + let copy_len = remaining.min(canonical_chunk); + chunk[..copy_len].copy_from_slice(&pcm[offset..offset + copy_len]); + + let input = vec![chunk]; + let result = resampler + .process(&input, None) + .map_err(|e| format!("resample f32 {from_rate}->{to_rate}: {e}"))?; + + if remaining < canonical_chunk { + let expected = + (copy_len as f64 * to_rate as f64 / from_rate as f64).round() as usize; + output.extend_from_slice(&result[0][..expected.min(result[0].len())]); + } else { + output.extend_from_slice(&result[0]); + } + + offset += canonical_chunk; + } + + Ok(output) } /// Apply RNNoise ML noise suppression to 48kHz f32 PCM audio. diff --git a/rust/crates/proxy-engine/src/jitter_buffer.rs b/rust/crates/proxy-engine/src/jitter_buffer.rs new file mode 100644 index 0000000..3923952 --- /dev/null +++ b/rust/crates/proxy-engine/src/jitter_buffer.rs @@ -0,0 +1,188 @@ +//! Per-leg adaptive jitter buffer for the audio mixer. +//! +//! Sits between inbound RTP packet reception and the mixer's decode step. +//! Reorders packets by sequence number and delivers exactly one frame per +//! 20ms mixer tick, smoothing out network jitter. When a packet is missing, +//! the mixer can invoke codec PLC to conceal the gap. + +use crate::mixer::RtpPacket; +use std::collections::BTreeMap; + +/// Per-leg jitter buffer. Collects RTP packets keyed by sequence number, +/// delivers one frame per 20ms tick in sequence order. +/// +/// Adaptive target depth: starts at 3 frames (60ms), adjusts between +/// 2–6 frames based on observed jitter. +pub struct JitterBuffer { + /// Packets waiting for playout, keyed by seq number. + buffer: BTreeMap, + /// Next expected sequence number for playout. + next_seq: Option, + /// Target buffer depth in frames (adaptive). + target_depth: u32, + /// Current fill level high-water mark (for adaptation). + max_fill_seen: u32, + /// Ticks since last adaptation adjustment. + adapt_counter: u32, + /// Consecutive ticks where buffer was empty (for ramp-up). + empty_streak: u32, + /// Consecutive ticks where buffer had excess (for ramp-down). + excess_streak: u32, + /// Whether we've started playout (initial fill complete). + playing: bool, + /// Number of frames consumed since start (for stats). + frames_consumed: u64, + /// Number of frames lost (gap in sequence). + frames_lost: u64, +} + +/// What the mixer gets back each tick. +pub enum JitterResult { + /// A packet is available for decoding. + Packet(RtpPacket), + /// Packet was expected but missing — invoke PLC. + Missing, + /// Buffer is in initial fill phase — output silence. + Filling, +} + +impl JitterBuffer { + pub fn new() -> Self { + Self { + buffer: BTreeMap::new(), + next_seq: None, + target_depth: 3, // 60ms initial target + max_fill_seen: 0, + adapt_counter: 0, + empty_streak: 0, + excess_streak: 0, + playing: false, + frames_consumed: 0, + frames_lost: 0, + } + } + + /// Push a received RTP packet into the buffer. + pub fn push(&mut self, pkt: RtpPacket) { + // Ignore duplicates. + if self.buffer.contains_key(&pkt.seq) { + return; + } + + // Detect large forward seq jump (hold/resume, SSRC change). + if let Some(next) = self.next_seq { + let jump = pkt.seq.wrapping_sub(next); + if jump > 1000 && jump < 0x8000 { + // Massive forward jump — reset buffer. + self.reset(); + self.next_seq = Some(pkt.seq); + } + } + + if self.next_seq.is_none() { + self.next_seq = Some(pkt.seq); + } + + self.buffer.insert(pkt.seq, pkt); + } + + /// Consume one frame for the current 20ms tick. + /// Called once per mixer tick per leg. + pub fn consume(&mut self) -> JitterResult { + // Track fill level for adaptation. + let fill = self.buffer.len() as u32; + if fill > self.max_fill_seen { + self.max_fill_seen = fill; + } + + // Initial fill phase: wait until we have target_depth packets. + if !self.playing { + if fill >= self.target_depth { + self.playing = true; + } else { + return JitterResult::Filling; + } + } + + let seq = match self.next_seq { + Some(s) => s, + None => return JitterResult::Filling, + }; + + // Advance next_seq (wrapping u16). + self.next_seq = Some(seq.wrapping_add(1)); + + // Try to pull the expected sequence number. + if let Some(pkt) = self.buffer.remove(&seq) { + self.frames_consumed += 1; + self.empty_streak = 0; + + // Adaptive: if buffer is consistently deep, we can tighten. + if fill > self.target_depth + 2 { + self.excess_streak += 1; + } else { + self.excess_streak = 0; + } + + JitterResult::Packet(pkt) + } else { + // Packet missing — PLC needed. + self.frames_lost += 1; + self.empty_streak += 1; + self.excess_streak = 0; + + JitterResult::Missing + } + } + + /// Run adaptation logic. Call every tick; internally gates to ~1s intervals. + pub fn adapt(&mut self) { + self.adapt_counter += 1; + if self.adapt_counter < 50 { + return; + } + self.adapt_counter = 0; + + // If we had many empty ticks, increase depth. + if self.empty_streak > 3 && self.target_depth < 6 { + self.target_depth += 1; + } + // If buffer consistently overfull, decrease depth. + else if self.excess_streak > 25 && self.target_depth > 2 { + self.target_depth -= 1; + } + + self.max_fill_seen = 0; + } + + /// Discard packets that are too old (seq far behind next_seq). + /// Prevents unbounded memory growth from reordered/late packets. + pub fn prune_stale(&mut self) { + if let Some(next) = self.next_seq { + // Remove anything more than 100 frames behind playout point. + // Use wrapping arithmetic: if (next - seq) > 100, it's stale. + let stale: Vec = self + .buffer + .keys() + .filter(|&&seq| { + let age = next.wrapping_sub(seq); + age > 100 && age < 0x8000 // < 0x8000 means it's actually behind, not ahead + }) + .copied() + .collect(); + for seq in stale { + self.buffer.remove(&seq); + } + } + } + + /// Reset the buffer (e.g., after re-INVITE / hold-resume). + pub fn reset(&mut self) { + self.buffer.clear(); + self.next_seq = None; + self.playing = false; + self.empty_streak = 0; + self.excess_streak = 0; + self.adapt_counter = 0; + } +} diff --git a/rust/crates/proxy-engine/src/main.rs b/rust/crates/proxy-engine/src/main.rs index baec084..b5fc4dc 100644 --- a/rust/crates/proxy-engine/src/main.rs +++ b/rust/crates/proxy-engine/src/main.rs @@ -12,6 +12,7 @@ mod call_manager; mod config; mod dtmf; mod ipc; +mod jitter_buffer; mod leg_io; mod mixer; mod provider; diff --git a/rust/crates/proxy-engine/src/mixer.rs b/rust/crates/proxy-engine/src/mixer.rs index 4d57dbb..725e212 100644 --- a/rust/crates/proxy-engine/src/mixer.rs +++ b/rust/crates/proxy-engine/src/mixer.rs @@ -15,6 +15,7 @@ //! 6. Forward DTMF between participant legs only use crate::ipc::{emit_event, OutTx}; +use crate::jitter_buffer::{JitterBuffer, JitterResult}; use crate::rtp::{build_rtp_header, rtp_clock_increment}; use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState}; use nnnoiseless::DenoiseState; @@ -164,6 +165,8 @@ struct MixerLegSlot { last_pcm_frame: Vec, /// Number of consecutive ticks with no inbound packet. silent_ticks: u32, + /// Per-leg jitter buffer for packet reordering and timing. + jitter: JitterBuffer, // RTP output state. rtp_seq: u16, rtp_ts: u32, @@ -238,6 +241,7 @@ async fn mixer_loop( rtp_ts: 0, rtp_ssrc: rand::random(), role: LegRole::Participant, + jitter: JitterBuffer::new(), }, ); } @@ -331,35 +335,27 @@ async fn mixer_loop( for lid in &leg_ids { let slot = legs.get_mut(lid).unwrap(); - // Drain channel — collect DTMF separately, collect ALL audio packets. - let mut audio_packets: Vec = Vec::new(); + // Step 2a: Drain all pending packets into the jitter buffer. + let mut got_audio = false; loop { match slot.inbound_rx.try_recv() { Ok(pkt) => { if pkt.payload_type == 101 { - // DTMF telephone-event: collect for processing. dtmf_forward.push((lid.clone(), pkt)); } else { - audio_packets.push(pkt); + got_audio = true; + slot.jitter.push(pkt); } } Err(_) => break, } } - if !audio_packets.is_empty() { - slot.silent_ticks = 0; - - // Sort by sequence number for correct codec state progression. - // This prevents G.722 ADPCM state corruption from out-of-order packets. - audio_packets.sort_by_key(|p| p.seq); - - // Decode ALL packets in order (maintains codec state), - // but only keep the last decoded frame for mixing. - for pkt in &audio_packets { + // Step 2b: Consume exactly one frame from the jitter buffer. + match slot.jitter.consume() { + JitterResult::Packet(pkt) => { match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) { Ok((pcm, rate)) => { - // Resample to 48kHz mixing rate if needed. let pcm_48k = if rate == MIX_RATE { pcm } else { @@ -367,15 +363,11 @@ async fn mixer_loop( .resample_f32(&pcm, rate, MIX_RATE) .unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE]) }; - // Per-leg inbound denoising at 48kHz. - // Only for SIP telephony legs — WebRTC browsers - // already apply noise suppression via getUserMedia. let processed = if slot.codec_pt != codec_lib::PT_OPUS { TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k) } else { pcm_48k }; - // Pad or truncate to exactly MIX_FRAME_SIZE. let mut frame = processed; frame.resize(MIX_FRAME_SIZE, 0.0); slot.last_pcm_frame = frame; @@ -383,15 +375,43 @@ async fn mixer_loop( Err(_) => {} } } - } else if dtmf_forward.iter().any(|(src, _)| src == lid) { - // Got DTMF but no audio — don't bump silent_ticks (DTMF counts as activity). + JitterResult::Missing => { + // Invoke Opus PLC or fade for non-Opus codecs. + if slot.codec_pt == codec_lib::PT_OPUS { + match slot.transcoder.opus_plc(MIX_FRAME_SIZE) { + Ok(pcm) => { + slot.last_pcm_frame = pcm; + } + Err(_) => { + for s in slot.last_pcm_frame.iter_mut() { + *s *= 0.8; + } + } + } + } else { + // Non-Opus: fade last frame toward silence. + for s in slot.last_pcm_frame.iter_mut() { + *s *= 0.85; + } + } + } + JitterResult::Filling => { + slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE]; + } + } + + // Run jitter adaptation + prune stale packets. + slot.jitter.adapt(); + slot.jitter.prune_stale(); + + // Silent ticks: based on actual network reception, not jitter buffer state. + if got_audio || dtmf_forward.iter().any(|(src, _)| src == lid) { slot.silent_ticks = 0; } else { slot.silent_ticks += 1; - // After 150 ticks (3 seconds) of silence, zero out to avoid stale audio. - if slot.silent_ticks > 150 { - slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE]; - } + } + if slot.silent_ticks > 150 { + slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE]; } } diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 3c59e1c..88c60d5 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.18.0', + version: '1.19.0', description: 'undefined' } diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 3c59e1c..88c60d5 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.18.0', + version: '1.19.0', description: 'undefined' }