//! RTP port pool and media forwarding. //! //! Manages a pool of even-numbered UDP ports for RTP media. //! Each port gets a bound tokio UdpSocket. Supports: //! - Direct forwarding (SIP-to-SIP, no transcoding) //! - Transcoding forwarding (via codec-lib, e.g. G.722 ↔ Opus) //! - Silence generation //! - NAT priming //! //! Ported from ts/call/rtp-port-pool.ts + sip-leg.ts RTP handling. use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::UdpSocket; /// A single RTP port allocation. pub struct RtpAllocation { pub port: u16, pub socket: Arc, } /// RTP port pool — allocates even-numbered UDP ports. pub struct RtpPortPool { min: u16, max: u16, allocated: HashMap>, } impl RtpPortPool { pub fn new(min: u16, max: u16) -> Self { let min = if min % 2 == 0 { min } else { min + 1 }; Self { min, max, allocated: HashMap::new(), } } /// Allocate an even-numbered port and bind a UDP socket. pub async fn allocate(&mut self) -> Option { let mut port = self.min; while port < self.max { if !self.allocated.contains_key(&port) { match UdpSocket::bind(format!("0.0.0.0:{port}")).await { Ok(sock) => { let sock = Arc::new(sock); self.allocated.insert(port, sock.clone()); return Some(RtpAllocation { port, socket: sock }); } Err(_) => { // Port in use, try next. } } } port += 2; } None // Pool exhausted. } /// Release a port back to the pool. pub fn release(&mut self, port: u16) { self.allocated.remove(&port); // Socket is dropped when the last Arc reference goes away. } pub fn size(&self) -> usize { self.allocated.len() } pub fn capacity(&self) -> usize { ((self.max - self.min) / 2) as usize } } /// An active RTP relay between two endpoints. /// Receives on `local_socket` and forwards to `remote_addr`. pub struct RtpRelay { pub local_port: u16, pub local_socket: Arc, pub remote_addr: Option, /// If set, transcode packets using this codec session before forwarding. pub transcode: Option, /// Packets received counter. pub pkt_received: u64, /// Packets sent counter. pub pkt_sent: u64, } pub struct TranscodeConfig { pub from_pt: u8, pub to_pt: u8, pub session_id: String, } impl RtpRelay { pub fn new(port: u16, socket: Arc) -> Self { Self { local_port: port, local_socket: socket, remote_addr: None, transcode: None, pkt_received: 0, pkt_sent: 0, } } pub fn set_remote(&mut self, addr: SocketAddr) { self.remote_addr = Some(addr); } } /// Send a 1-byte NAT priming packet to open a pinhole. pub async fn prime_nat(socket: &UdpSocket, remote: SocketAddr) { let _ = socket.send_to(&[0u8], remote).await; } /// Build an RTP silence frame for PCMU (payload type 0). pub fn silence_frame_pcmu() -> Vec { // 12-byte RTP header + 160 bytes of µ-law silence (0xFF) let mut frame = vec![0u8; 172]; frame[0] = 0x80; // V=2 frame[1] = 0; // PT=0 (PCMU) // seq, timestamp, ssrc left as 0 — caller should set these frame[12..].fill(0xFF); // µ-law silence frame } /// Build an RTP silence frame for G.722 (payload type 9). pub fn silence_frame_g722() -> Vec { // 12-byte RTP header + 160 bytes of G.722 silence let mut frame = vec![0u8; 172]; frame[0] = 0x80; // V=2 frame[1] = 9; // PT=9 (G.722) // G.722 silence: all zeros is valid silence frame } /// Build an RTP header with the given parameters. pub fn build_rtp_header(pt: u8, seq: u16, timestamp: u32, ssrc: u32) -> [u8; 12] { let mut header = [0u8; 12]; header[0] = 0x80; // V=2 header[1] = pt & 0x7F; header[2..4].copy_from_slice(&seq.to_be_bytes()); header[4..8].copy_from_slice(×tamp.to_be_bytes()); header[8..12].copy_from_slice(&ssrc.to_be_bytes()); header } /// Get the RTP clock increment per 20ms frame for a payload type. pub fn rtp_clock_increment(pt: u8) -> u32 { match pt { 9 => 160, // G.722: 8000 Hz clock rate (despite 16kHz audio) × 0.02s 0 | 8 => 160, // PCMU/PCMA: 8000 × 0.02 111 => 960, // Opus: 48000 × 0.02 _ => 160, } }