Files
siprouter/rust/crates/proxy-engine/src/rtp.rs

92 lines
3.2 KiB
Rust
Raw Normal View History

//! RTP port pool for media sockets.
//!
//! Manages a pool of even-numbered UDP ports for RTP media. `allocate()`
//! hands back an `Arc<UdpSocket>` to the caller (stored on the owning
//! `LegInfo`), while the pool itself keeps only a `Weak<UdpSocket>`. When
//! the call terminates and `LegInfo` is dropped, the strong refcount
//! reaches zero, the socket is closed, and `allocate()` prunes the dead
//! weak ref the next time it scans that slot — so the port automatically
//! becomes available for reuse without any explicit `release()` plumbing.
//!
//! This fixes the previous leak where the pool held `Arc<UdpSocket>` and
//! `release()` was never called, eventually exhausting the port range and
//! causing "503 Service Unavailable" on new calls.
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use tokio::net::UdpSocket;
/// A single RTP port allocation: the port number that was bound together
/// with a shared handle to the UDP socket bound on it.
pub struct RtpAllocation {
/// UDP port number the socket was bound to.
pub port: u16,
/// Strong handle to the bound socket. The pool that produced this
/// allocation only records a `Weak` reference to the same socket, so it
/// treats the port as free again once every strong handle is dropped.
pub socket: Arc<UdpSocket>,
}
/// RTP port pool — allocates even-numbered UDP ports.
///
/// The pool tracks which ports it has handed out through weak references
/// only; it never keeps a socket alive by itself.
pub struct RtpPortPool {
// First candidate port; `new()` rounds an odd value up to the next even one.
min: u16,
// Upper bound of the scan; `allocate()` only tries ports strictly below it.
max: u16,
// Ports handed out so far, mapped to a weak handle on their socket. An
// entry whose strong count has dropped to zero marks a reusable port.
allocated: HashMap<u16, Weak<UdpSocket>>,
}
impl RtpPortPool {
    /// Create a pool that hands out even-numbered ports in `[min, max)`.
    ///
    /// An odd `min` is rounded up to the next even port. `max` is exclusive:
    /// `allocate()` only tries ports strictly below it.
    pub fn new(min: u16, max: u16) -> Self {
        // `saturating_add` instead of `+ 1`: with `min == u16::MAX` a plain add
        // overflows (panic in debug builds, wrap to port 0 in release builds).
        // Saturating leaves `min` at 65535, which is never below `max`, so the
        // pool is simply empty.
        let min = if min % 2 == 0 {
            min
        } else {
            min.saturating_add(1)
        };
        Self {
            min,
            max,
            allocated: HashMap::new(),
        }
    }

    /// Allocate an even-numbered port and bind a UDP socket on it.
    ///
    /// Scans the candidate ports in ascending order. A port is skipped while
    /// a socket previously handed out for it is still alive; a port whose
    /// socket has been dropped is pruned from the bookkeeping and tried again.
    /// Ports that fail to bind (e.g. in use by another process) are skipped.
    ///
    /// Returns `None` when no port in the range could be bound.
    pub async fn allocate(&mut self) -> Option<RtpAllocation> {
        // A stepped range cannot overflow, unlike a manual `port += 2`: with
        // `max == u16::MAX` that addition wrapped past 65534 to port 0 in
        // release builds (binding an OS-chosen ephemeral port while reporting
        // port 0) and panicked in debug builds.
        for port in (self.min..self.max).step_by(2) {
            if let Some(weak) = self.allocated.get(&port) {
                if weak.strong_count() > 0 {
                    // Socket still owned by a live call; slot is taken.
                    continue;
                }
                // The last strong `Arc` was dropped, so the socket is already
                // closed; forget the stale entry and reuse the slot.
                self.allocated.remove(&port);
            }

            match UdpSocket::bind(format!("0.0.0.0:{port}")).await {
                Ok(sock) => {
                    let sock = Arc::new(sock);
                    // Keep only a weak handle so the pool never keeps the
                    // socket alive on its own.
                    self.allocated.insert(port, Arc::downgrade(&sock));
                    return Some(RtpAllocation { port, socket: sock });
                }
                Err(_) => {
                    // Port in use, try next.
                }
            }
        }
        None // Pool exhausted.
    }
}
/// Assemble the 12-byte fixed RTP header for a packet.
///
/// Layout of the returned array:
/// - byte 0: `0x80` (version 2; padding, extension and CSRC count all zero)
/// - byte 1: payload type, masked to its low 7 bits (marker bit cleared)
/// - bytes 2..4: sequence number, big-endian
/// - bytes 4..8: timestamp, big-endian
/// - bytes 8..12: SSRC, big-endian
pub fn build_rtp_header(pt: u8, seq: u16, timestamp: u32, ssrc: u32) -> [u8; 12] {
    // Split each multi-byte field into network-order bytes up front so the
    // header can be written as a single array literal.
    let [seq_hi, seq_lo] = seq.to_be_bytes();
    let [ts0, ts1, ts2, ts3] = timestamp.to_be_bytes();
    let [ssrc0, ssrc1, ssrc2, ssrc3] = ssrc.to_be_bytes();

    [
        0x80, // V=2
        pt & 0x7F,
        seq_hi,
        seq_lo,
        ts0,
        ts1,
        ts2,
        ts3,
        ssrc0,
        ssrc1,
        ssrc2,
        ssrc3,
    ]
}
/// RTP timestamp increment for one 20 ms frame of the given payload type.
///
/// Payload type 111 (treated as Opus, 48 kHz clock) advances by 960 ticks.
/// Every other payload type advances by 160 ticks — this covers PCMU (0),
/// PCMA (8) and G.722 (9, whose RTP clock is 8000 Hz despite 16 kHz audio),
/// and is also the fallback for unrecognised payload types.
pub fn rtp_clock_increment(pt: u8) -> u32 {
    const OPUS_PT: u8 = 111;
    const TICKS_PER_FRAME_8KHZ: u32 = 160; // 8000 × 0.02
    const TICKS_PER_FRAME_48KHZ: u32 = 960; // 48000 × 0.02

    if pt == OPUS_PT {
        TICKS_PER_FRAME_48KHZ
    } else {
        TICKS_PER_FRAME_8KHZ
    }
}