Files
siprouter/rust/crates/proxy-engine/src/leg_io.rs

112 lines
4.0 KiB
Rust

//! Leg I/O task spawners.
//!
//! Each SIP leg gets two tasks:
//! - Inbound: recv_from on RTP socket → strip header → send RtpPacket to mixer channel
//! - Outbound: recv encoded RTP from mixer channel → send_to remote media endpoint
//!
//! WebRTC leg I/O is handled inside webrtc_engine.rs (on_track + track.write).
use crate::mixer::RtpPacket;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::mpsc;
/// Both ends of the two bounded `mpsc` queues that connect one leg to the mixer.
///
/// One queue carries parsed [`RtpPacket`]s in the leg → mixer direction, the
/// other carries raw byte buffers in the mixer → leg direction. All four
/// endpoints are stored here so the owner can hand each one to the task that
/// needs it.
pub struct LegChannels {
/// Leg → mixer: sending half, used by the leg's inbound I/O task.
// NOTE(review): the previous comment called these packets "decoded"; whether
// any codec decoding happens before this point is not visible here — confirm.
pub inbound_tx: mpsc::Sender<RtpPacket>,
/// Leg → mixer: receiving half of the same queue.
pub inbound_rx: mpsc::Receiver<RtpPacket>,
/// Mixer → leg: sending half; each `Vec<u8>` is one encoded RTP packet.
pub outbound_tx: mpsc::Sender<Vec<u8>>,
/// Mixer → leg: receiving half, drained by the leg's outbound I/O task.
pub outbound_rx: mpsc::Receiver<Vec<u8>>,
}
/// Build the inbound and outbound queues for a single leg.
///
/// Returns a [`LegChannels`] holding the sender and receiver of both queues.
pub fn create_leg_channels() -> LegChannels {
    /// Capacity, in packets, of the leg → mixer queue.
    const INBOUND_CAPACITY: usize = 64;
    /// Capacity, in packets, of the mixer → leg queue.
    const OUTBOUND_CAPACITY: usize = 8;

    let inbound = mpsc::channel::<RtpPacket>(INBOUND_CAPACITY);
    let outbound = mpsc::channel::<Vec<u8>>(OUTBOUND_CAPACITY);

    LegChannels {
        inbound_tx: inbound.0,
        inbound_rx: inbound.1,
        outbound_tx: outbound.0,
        outbound_rx: outbound.1,
    }
}
/// Spawn the inbound I/O task for a SIP leg.
///
/// The task loops on `recv_from`, parses each datagram as an RTP packet
/// (RFC 3550 §5.1) and forwards the header fields plus the bare payload to the
/// mixer through `inbound_tx`. Datagrams that are not well-formed RTP version 2
/// packets, or that carry no payload, are silently dropped.
///
/// # Parameters
/// - `rtp_socket`: UDP socket the leg receives RTP on.
/// - `inbound_tx`: sending half of the leg → mixer queue.
///
/// # Returns
/// The `JoinHandle` of the spawned task. The task ends when `recv_from`
/// returns an error, or when a send fails because the receiving half of the
/// queue has been closed or dropped. A closed queue is only noticed on the
/// next send, i.e. after another packet has arrived.
pub fn spawn_sip_inbound(
    rtp_socket: Arc<UdpSocket>,
    inbound_tx: mpsc::Sender<RtpPacket>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // One receive buffer reused for every datagram (1500 bytes —
        // presumably chosen to match the Ethernet MTU).
        let mut buf = vec![0u8; 1500];
        loop {
            let n = match rtp_socket.recv_from(&mut buf).await {
                Ok((n, _from)) => n,
                Err(_) => break, // Socket error.
            };
            if let Some(packet) = parse_rtp_datagram(&buf[..n]) {
                if inbound_tx.send(packet).await.is_err() {
                    break; // Channel closed — leg removed.
                }
            }
        }
    })
}

/// Parse one UDP datagram as an RTP packet (RFC 3550 §5.1).
///
/// Returns `None` when the datagram is shorter than the fixed header, is not
/// RTP version 2, has a truncated CSRC list or header extension, has an
/// invalid padding count, or has no payload left once header and padding are
/// removed.
fn parse_rtp_datagram(datagram: &[u8]) -> Option<RtpPacket> {
    /// Size of the fixed RTP header, before CSRC entries and extensions.
    const FIXED_HEADER_LEN: usize = 12;

    if datagram.len() < FIXED_HEADER_LEN {
        return None; // Too small for RTP header.
    }

    // First octet: V(2 bits) | P | X | CC(4 bits).
    let first = datagram[0];
    if first >> 6 != 2 {
        return None; // Not RTP version 2 (stray or non-RTP traffic).
    }
    let has_padding = (first & 0x20) != 0;
    let has_extension = (first & 0x10) != 0;
    let csrc_count = usize::from(first & 0x0F);

    // Header length = 12 + (CC * 4) + optional extension.
    let mut payload_start = FIXED_HEADER_LEN + csrc_count * 4;
    if has_extension {
        // The extension starts with a 16-bit profile id followed by a 16-bit
        // length counted in 32-bit words (the 4-byte preamble is not included).
        // `get` yields `None` when the preamble itself is truncated.
        let len_field = datagram.get(payload_start + 2..payload_start + 4)?;
        let ext_words = usize::from(u16::from_be_bytes([len_field[0], len_field[1]]));
        payload_start += 4 + ext_words * 4;
    }

    let mut payload_end = datagram.len();
    if has_padding {
        // The last octet holds the number of padding octets, itself included.
        let pad_len = usize::from(datagram[payload_end - 1]);
        let available = payload_end.saturating_sub(payload_start);
        if pad_len == 0 || pad_len > available {
            return None; // Malformed padding count.
        }
        payload_end -= pad_len;
    }

    if payload_start >= payload_end {
        return None; // No payload after header (or padding-only packet).
    }

    Some(RtpPacket {
        payload: datagram[payload_start..payload_end].to_vec(),
        payload_type: datagram[1] & 0x7F,
        marker: (datagram[1] & 0x80) != 0,
        seq: u16::from_be_bytes([datagram[2], datagram[3]]),
        timestamp: u32::from_be_bytes([datagram[4], datagram[5], datagram[6], datagram[7]]),
    })
}
/// Spawn the outbound I/O task for a SIP leg.
///
/// Every buffer received on `outbound_rx` is sent unchanged as one UDP
/// datagram to `remote_media` through `rtp_socket`.
///
/// Returns the `JoinHandle` of the spawned task, which finishes once
/// `outbound_rx.recv()` yields `None`.
pub fn spawn_sip_outbound(
    rtp_socket: Arc<UdpSocket>,
    remote_media: SocketAddr,
    mut outbound_rx: mpsc::Receiver<Vec<u8>>,
) -> tokio::task::JoinHandle<()> {
    /// Drain the mixer → leg queue onto the socket until the queue ends.
    async fn forward_to_remote(
        socket: Arc<UdpSocket>,
        destination: SocketAddr,
        queue: &mut mpsc::Receiver<Vec<u8>>,
    ) {
        while let Some(datagram) = queue.recv().await {
            // Best-effort delivery: a failed send is ignored and the loop
            // moves on to the next packet.
            let _ = socket.send_to(&datagram, destination).await;
        }
    }

    tokio::spawn(async move {
        forward_to_remote(rtp_socket, remote_media, &mut outbound_rx).await;
    })
}