//! RTP port pool for media sockets. //! //! Manages a pool of even-numbered UDP ports for RTP media. `allocate()` //! hands back an `Arc` to the caller (stored on the owning //! `LegInfo`), while the pool itself keeps only a `Weak`. When //! the call terminates and `LegInfo` is dropped, the strong refcount //! reaches zero, the socket is closed, and `allocate()` prunes the dead //! weak ref the next time it scans that slot — so the port automatically //! becomes available for reuse without any explicit `release()` plumbing. //! //! This fixes the previous leak where the pool held `Arc` and //! `release()` was never called, eventually exhausting the port range and //! causing "503 Service Unavailable" on new calls. use std::collections::HashMap; use std::sync::{Arc, Weak}; use tokio::net::UdpSocket; /// A single RTP port allocation. pub struct RtpAllocation { pub port: u16, pub socket: Arc, } /// RTP port pool — allocates even-numbered UDP ports. pub struct RtpPortPool { min: u16, max: u16, allocated: HashMap>, } impl RtpPortPool { pub fn new(min: u16, max: u16) -> Self { let min = if min % 2 == 0 { min } else { min + 1 }; Self { min, max, allocated: HashMap::new(), } } /// Allocate an even-numbered port and bind a UDP socket. pub async fn allocate(&mut self) -> Option { let mut port = self.min; while port < self.max { // Prune a dead weak ref at this slot: if the last strong Arc // (held by the owning LegInfo) was dropped when the call ended, // the socket is already closed and the slot is free again. if let Some(weak) = self.allocated.get(&port) { if weak.strong_count() == 0 { self.allocated.remove(&port); } } if !self.allocated.contains_key(&port) { match UdpSocket::bind(format!("0.0.0.0:{port}")).await { Ok(sock) => { let sock = Arc::new(sock); self.allocated.insert(port, Arc::downgrade(&sock)); return Some(RtpAllocation { port, socket: sock }); } Err(_) => { // Port in use, try next. } } } port += 2; } None // Pool exhausted. } } /// Build an RTP header with the given parameters. pub fn build_rtp_header(pt: u8, seq: u16, timestamp: u32, ssrc: u32) -> [u8; 12] { let mut header = [0u8; 12]; header[0] = 0x80; // V=2 header[1] = pt & 0x7F; header[2..4].copy_from_slice(&seq.to_be_bytes()); header[4..8].copy_from_slice(×tamp.to_be_bytes()); header[8..12].copy_from_slice(&ssrc.to_be_bytes()); header } /// Get the RTP clock increment per 20ms frame for a payload type. pub fn rtp_clock_increment(pt: u8) -> u32 { match pt { 9 => 160, // G.722: 8000 Hz clock rate (despite 16kHz audio) × 0.02s 0 | 8 => 160, // PCMU/PCMA: 8000 × 0.02 111 => 960, // Opus: 48000 × 0.02 _ => 160, } }