189 lines
6.0 KiB
Rust
189 lines
6.0 KiB
Rust
//! Per-leg adaptive jitter buffer for the audio mixer.
//!
//! Sits between inbound RTP packet reception and the mixer's decode step.
//! Reorders packets by sequence number and delivers exactly one frame per
//! 20ms mixer tick, smoothing out network jitter. When a packet is missing,
//! the mixer can invoke the codec's packet loss concealment (PLC) to hide the gap.
|
||
use crate::mixer::RtpPacket;
|
||
use std::collections::BTreeMap;
|
||
|
||
/// Per-leg jitter buffer. Collects RTP packets keyed by sequence number,
|
||
/// delivers one frame per 20ms tick in sequence order.
|
||
///
|
||
/// Adaptive target depth: starts at 3 frames (60ms), adjusts between
|
||
/// 2–6 frames based on observed jitter.
|
||
pub struct JitterBuffer {
|
||
/// Packets waiting for playout, keyed by seq number.
|
||
buffer: BTreeMap<u16, RtpPacket>,
|
||
/// Next expected sequence number for playout.
|
||
next_seq: Option<u16>,
|
||
/// Target buffer depth in frames (adaptive).
|
||
target_depth: u32,
|
||
/// Current fill level high-water mark (for adaptation).
|
||
max_fill_seen: u32,
|
||
/// Ticks since last adaptation adjustment.
|
||
adapt_counter: u32,
|
||
/// Consecutive ticks where buffer was empty (for ramp-up).
|
||
empty_streak: u32,
|
||
/// Consecutive ticks where buffer had excess (for ramp-down).
|
||
excess_streak: u32,
|
||
/// Whether we've started playout (initial fill complete).
|
||
playing: bool,
|
||
/// Number of frames consumed since start (for stats).
|
||
frames_consumed: u64,
|
||
/// Number of frames lost (gap in sequence).
|
||
frames_lost: u64,
|
||
}
|
||
|
||
/// What the mixer gets back each tick.
|
||
pub enum JitterResult {
|
||
/// A packet is available for decoding.
|
||
Packet(RtpPacket),
|
||
/// Packet was expected but missing — invoke PLC.
|
||
Missing,
|
||
/// Buffer is in initial fill phase — output silence.
|
||
Filling,
|
||
}
|
||
|
||
impl JitterBuffer {
|
||
pub fn new() -> Self {
|
||
Self {
|
||
buffer: BTreeMap::new(),
|
||
next_seq: None,
|
||
target_depth: 3, // 60ms initial target
|
||
max_fill_seen: 0,
|
||
adapt_counter: 0,
|
||
empty_streak: 0,
|
||
excess_streak: 0,
|
||
playing: false,
|
||
frames_consumed: 0,
|
||
frames_lost: 0,
|
||
}
|
||
}
|
||
|
||
/// Push a received RTP packet into the buffer.
|
||
pub fn push(&mut self, pkt: RtpPacket) {
|
||
// Ignore duplicates.
|
||
if self.buffer.contains_key(&pkt.seq) {
|
||
return;
|
||
}
|
||
|
||
// Detect large forward seq jump (hold/resume, SSRC change).
|
||
if let Some(next) = self.next_seq {
|
||
let jump = pkt.seq.wrapping_sub(next);
|
||
if jump > 1000 && jump < 0x8000 {
|
||
// Massive forward jump — reset buffer.
|
||
self.reset();
|
||
self.next_seq = Some(pkt.seq);
|
||
}
|
||
}
|
||
|
||
if self.next_seq.is_none() {
|
||
self.next_seq = Some(pkt.seq);
|
||
}
|
||
|
||
self.buffer.insert(pkt.seq, pkt);
|
||
}
|
||
|
||
/// Consume one frame for the current 20ms tick.
|
||
/// Called once per mixer tick per leg.
|
||
pub fn consume(&mut self) -> JitterResult {
|
||
// Track fill level for adaptation.
|
||
let fill = self.buffer.len() as u32;
|
||
if fill > self.max_fill_seen {
|
||
self.max_fill_seen = fill;
|
||
}
|
||
|
||
// Initial fill phase: wait until we have target_depth packets.
|
||
if !self.playing {
|
||
if fill >= self.target_depth {
|
||
self.playing = true;
|
||
} else {
|
||
return JitterResult::Filling;
|
||
}
|
||
}
|
||
|
||
let seq = match self.next_seq {
|
||
Some(s) => s,
|
||
None => return JitterResult::Filling,
|
||
};
|
||
|
||
// Advance next_seq (wrapping u16).
|
||
self.next_seq = Some(seq.wrapping_add(1));
|
||
|
||
// Try to pull the expected sequence number.
|
||
if let Some(pkt) = self.buffer.remove(&seq) {
|
||
self.frames_consumed += 1;
|
||
self.empty_streak = 0;
|
||
|
||
// Adaptive: if buffer is consistently deep, we can tighten.
|
||
if fill > self.target_depth + 2 {
|
||
self.excess_streak += 1;
|
||
} else {
|
||
self.excess_streak = 0;
|
||
}
|
||
|
||
JitterResult::Packet(pkt)
|
||
} else {
|
||
// Packet missing — PLC needed.
|
||
self.frames_lost += 1;
|
||
self.empty_streak += 1;
|
||
self.excess_streak = 0;
|
||
|
||
JitterResult::Missing
|
||
}
|
||
}
|
||
|
||
/// Run adaptation logic. Call every tick; internally gates to ~1s intervals.
|
||
pub fn adapt(&mut self) {
|
||
self.adapt_counter += 1;
|
||
if self.adapt_counter < 50 {
|
||
return;
|
||
}
|
||
self.adapt_counter = 0;
|
||
|
||
// If we had many empty ticks, increase depth.
|
||
if self.empty_streak > 3 && self.target_depth < 6 {
|
||
self.target_depth += 1;
|
||
}
|
||
// If buffer consistently overfull, decrease depth.
|
||
else if self.excess_streak > 25 && self.target_depth > 2 {
|
||
self.target_depth -= 1;
|
||
}
|
||
|
||
self.max_fill_seen = 0;
|
||
}
|
||
|
||
/// Discard packets that are too old (seq far behind next_seq).
|
||
/// Prevents unbounded memory growth from reordered/late packets.
|
||
pub fn prune_stale(&mut self) {
|
||
if let Some(next) = self.next_seq {
|
||
// Remove anything more than 100 frames behind playout point.
|
||
// Use wrapping arithmetic: if (next - seq) > 100, it's stale.
|
||
let stale: Vec<u16> = self
|
||
.buffer
|
||
.keys()
|
||
.filter(|&&seq| {
|
||
let age = next.wrapping_sub(seq);
|
||
age > 100 && age < 0x8000 // < 0x8000 means it's actually behind, not ahead
|
||
})
|
||
.copied()
|
||
.collect();
|
||
for seq in stale {
|
||
self.buffer.remove(&seq);
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Reset the buffer (e.g., after re-INVITE / hold-resume).
|
||
pub fn reset(&mut self) {
|
||
self.buffer.clear();
|
||
self.next_seq = None;
|
||
self.playing = false;
|
||
self.empty_streak = 0;
|
||
self.excess_streak = 0;
|
||
self.adapt_counter = 0;
|
||
}
|
||
}
|