Files
siprouter/rust/crates/proxy-engine/src/sip_leg.rs

476 lines
15 KiB
Rust

//! SipLeg — manages one side of a B2BUA call.
//!
//! Handles the full INVITE lifecycle:
//! - Send INVITE with SDP
//! - Handle 407 Proxy Authentication (digest auth retry)
//! - Handle 200 OK (ACK, learn media endpoint)
//! - Handle BYE/CANCEL (teardown)
//! - Track SIP dialog state (early → confirmed → terminated)
//!
//! Ported from ts/call/sip-leg.ts.
use sip_proto::dialog::{DialogState, SipDialog};
use sip_proto::helpers::{
build_sdp, compute_digest_auth, generate_branch, generate_tag, parse_digest_challenge,
parse_sdp_endpoint, SdpOptions,
};
use sip_proto::message::{RequestOptions, SipMessage};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
/// State of a SIP leg.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegState {
Inviting,
Ringing,
Connected,
Terminating,
Terminated,
}
/// Configuration for creating a SIP leg.
pub struct SipLegConfig {
/// Proxy LAN IP (for Via, Contact, SDP).
pub lan_ip: String,
/// Proxy LAN port.
pub lan_port: u16,
/// Public IP (for provider-facing legs).
pub public_ip: Option<String>,
/// SIP target endpoint (provider outbound proxy or device address).
pub sip_target: SocketAddr,
/// Provider credentials (for 407 auth).
pub username: Option<String>,
pub password: Option<String>,
pub registered_aor: Option<String>,
/// Codec payload types to offer.
pub codecs: Vec<u8>,
/// Our RTP port for SDP.
pub rtp_port: u16,
}
/// A SIP leg with full dialog management.
pub struct SipLeg {
pub id: String,
pub state: LegState,
pub config: SipLegConfig,
pub dialog: Option<SipDialog>,
/// The INVITE we sent (needed for CANCEL and 407 ACK).
invite: Option<SipMessage>,
/// Original unauthenticated INVITE (for re-ACKing retransmitted 407s).
orig_invite: Option<SipMessage>,
/// Whether we've attempted digest auth.
auth_attempted: bool,
/// Remote media endpoint (learned from SDP in 200 OK).
pub remote_media: Option<SocketAddr>,
}
impl SipLeg {
pub fn new(id: String, config: SipLegConfig) -> Self {
Self {
id,
state: LegState::Inviting,
config,
dialog: None,
invite: None,
orig_invite: None,
auth_attempted: false,
remote_media: None,
}
}
/// Build and send an INVITE to establish this leg.
pub async fn send_invite(
&mut self,
from_uri: &str,
to_uri: &str,
sip_call_id: &str,
socket: &UdpSocket,
) {
let ip = self
.config
.public_ip
.as_deref()
.unwrap_or(&self.config.lan_ip);
let sdp = build_sdp(&SdpOptions {
ip,
port: self.config.rtp_port,
payload_types: &self.config.codecs,
..Default::default()
});
let invite = SipMessage::create_request(
"INVITE",
to_uri,
RequestOptions {
via_host: ip.to_string(),
via_port: self.config.lan_port,
via_transport: None,
via_branch: Some(generate_branch()),
from_uri: from_uri.to_string(),
from_display_name: None,
from_tag: Some(generate_tag()),
to_uri: to_uri.to_string(),
to_display_name: None,
to_tag: None,
call_id: Some(sip_call_id.to_string()),
cseq: Some(1),
contact: Some(format!("<sip:{ip}:{}>", self.config.lan_port)),
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
]),
},
);
self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port));
self.invite = Some(invite.clone());
self.state = LegState::Inviting;
let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await;
}
/// Handle an incoming SIP message routed to this leg.
/// Returns an optional reply to send (e.g. ACK, auth retry INVITE).
pub fn handle_message(&mut self, msg: &SipMessage) -> SipLegAction {
if msg.is_response() {
self.handle_response(msg)
} else {
self.handle_request(msg)
}
}
fn handle_response(&mut self, msg: &SipMessage) -> SipLegAction {
let code = msg.status_code().unwrap_or(0);
let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase();
if cseq_method != "INVITE" {
return SipLegAction::None;
}
// Handle retransmitted 407 for the original unauthenticated INVITE.
if self.auth_attempted {
if let Some(dialog) = &self.dialog {
let response_cseq: u32 = msg
.get_header("CSeq")
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse().ok())
.unwrap_or(0);
if response_cseq < dialog.local_cseq && code >= 400 {
// ACK the retransmitted error response.
if let Some(orig) = &self.orig_invite {
let ack = build_non_2xx_ack(orig, msg);
return SipLegAction::Send(ack.serialize());
}
return SipLegAction::None;
}
}
}
// Handle 407 Proxy Authentication Required.
if code == 407 {
return self.handle_auth_challenge(msg);
}
// Update dialog state.
if let Some(dialog) = &mut self.dialog {
dialog.process_response(msg);
}
if code == 180 || code == 183 {
self.state = LegState::Ringing;
SipLegAction::StateChange(LegState::Ringing)
} else if code >= 200 && code < 300 {
// ACK the 200 OK.
let ack_buf = if let Some(dialog) = &self.dialog {
let ack = dialog.create_ack();
Some(ack.serialize())
} else {
None
};
// If already connected (200 retransmit), just re-ACK.
if self.state == LegState::Connected {
return match ack_buf {
Some(buf) => SipLegAction::Send(buf),
None => SipLegAction::None,
};
}
// Learn media endpoint from SDP.
if msg.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
self.remote_media = Some(addr);
}
}
}
self.state = LegState::Connected;
match ack_buf {
Some(buf) => SipLegAction::ConnectedWithAck(buf),
None => SipLegAction::StateChange(LegState::Connected),
}
} else if code >= 300 {
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
SipLegAction::Terminated(format!("rejected_{code}"))
} else {
SipLegAction::None // 1xx provisional
}
}
fn handle_auth_challenge(&mut self, msg: &SipMessage) -> SipLegAction {
if self.auth_attempted {
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
return SipLegAction::Terminated("auth_rejected".to_string());
}
self.auth_attempted = true;
let challenge_header = match msg.get_header("Proxy-Authenticate") {
Some(h) => h,
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_challenge".to_string());
}
};
let challenge = match parse_digest_challenge(challenge_header) {
Some(c) => c,
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_bad_challenge".to_string());
}
};
let password = match &self.config.password {
Some(p) => p.clone(),
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_password".to_string());
}
};
let aor = match &self.config.registered_aor {
Some(a) => a.clone(),
None => {
self.state = LegState::Terminated;
return SipLegAction::Terminated("407_no_aor".to_string());
}
};
let username = aor
.trim_start_matches("sip:")
.trim_start_matches("sips:")
.split('@')
.next()
.unwrap_or("")
.to_string();
let dest_uri = self
.invite
.as_ref()
.and_then(|i| i.request_uri())
.unwrap_or("")
.to_string();
let auth_value = compute_digest_auth(
&username,
&password,
&challenge.realm,
&challenge.nonce,
"INVITE",
&dest_uri,
challenge.algorithm.as_deref(),
challenge.opaque.as_deref(),
);
// ACK the 407.
let mut ack_buf = None;
if let Some(invite) = &self.invite {
let ack = build_non_2xx_ack(invite, msg);
ack_buf = Some(ack.serialize());
}
// Save original INVITE for retransmission handling.
self.orig_invite = self.invite.clone();
// Build authenticated INVITE with same From tag, CSeq=2.
let ip = self
.config
.public_ip
.as_deref()
.unwrap_or(&self.config.lan_ip);
let from_tag = self
.dialog
.as_ref()
.map(|d| d.local_tag.clone())
.unwrap_or_else(generate_tag);
let sdp = build_sdp(&SdpOptions {
ip,
port: self.config.rtp_port,
payload_types: &self.config.codecs,
..Default::default()
});
let call_id = self
.dialog
.as_ref()
.map(|d| d.call_id.clone())
.unwrap_or_default();
let invite_auth = SipMessage::create_request(
"INVITE",
&dest_uri,
RequestOptions {
via_host: ip.to_string(),
via_port: self.config.lan_port,
via_transport: None,
via_branch: Some(generate_branch()),
from_uri: aor,
from_display_name: None,
from_tag: Some(from_tag),
to_uri: dest_uri.clone(),
to_display_name: None,
to_tag: None,
call_id: Some(call_id),
cseq: Some(2),
contact: Some(format!("<sip:{ip}:{}>", self.config.lan_port)),
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("Proxy-Authorization".to_string(), auth_value),
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
]),
},
);
self.invite = Some(invite_auth.clone());
if let Some(dialog) = &mut self.dialog {
dialog.local_cseq = 2;
}
// Return both the ACK for the 407 and the new authenticated INVITE.
let invite_buf = invite_auth.serialize();
SipLegAction::AuthRetry {
ack_407: ack_buf,
invite_with_auth: invite_buf,
}
}
fn handle_request(&mut self, msg: &SipMessage) -> SipLegAction {
let method = msg.method().unwrap_or("");
if method == "BYE" {
let ok = SipMessage::create_response(200, "OK", msg, None);
self.state = LegState::Terminated;
if let Some(dialog) = &mut self.dialog {
dialog.terminate();
}
return SipLegAction::SendAndTerminate(ok.serialize(), "bye".to_string());
}
if method == "INFO" {
let ok = SipMessage::create_response(200, "OK", msg, None);
return SipLegAction::Send(ok.serialize());
}
SipLegAction::None
}
/// Build a BYE or CANCEL to tear down this leg.
pub fn build_hangup(&mut self) -> Option<Vec<u8>> {
let dialog = self.dialog.as_mut()?;
let msg = if dialog.state == DialogState::Confirmed {
dialog.create_request("BYE", None, None, None)
} else if dialog.state == DialogState::Early {
if let Some(invite) = &self.invite {
dialog.create_cancel(invite)
} else {
return None;
}
} else {
return None;
};
self.state = LegState::Terminating;
dialog.terminate();
Some(msg.serialize())
}
/// Get the SIP Call-ID for routing.
pub fn sip_call_id(&self) -> Option<&str> {
self.dialog.as_ref().map(|d| d.call_id.as_str())
}
}
/// Actions produced by the SipLeg message handler.
pub enum SipLegAction {
/// No action needed.
None,
/// Send a SIP message (ACK, 200 OK to INFO, etc.).
Send(Vec<u8>),
/// Leg state changed.
StateChange(LegState),
/// Connected — send this ACK.
ConnectedWithAck(Vec<u8>),
/// Terminated with a reason.
Terminated(String),
/// Send 200 OK and terminate.
SendAndTerminate(Vec<u8>, String),
/// 407 auth retry — send ACK for 407, then send new INVITE with auth.
AuthRetry {
ack_407: Option<Vec<u8>>,
invite_with_auth: Vec<u8>,
},
}
/// Build an ACK for a non-2xx response (same transaction as the INVITE).
fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage {
let via = original_invite.get_header("Via").unwrap_or("").to_string();
let from = original_invite
.get_header("From")
.unwrap_or("")
.to_string();
let to = response.get_header("To").unwrap_or("").to_string();
let call_id = original_invite.call_id().to_string();
let cseq_num: u32 = original_invite
.get_header("CSeq")
.and_then(|s| s.split_whitespace().next())
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let ruri = original_invite
.request_uri()
.unwrap_or("sip:unknown")
.to_string();
SipMessage::new(
format!("ACK {ruri} SIP/2.0"),
vec![
("Via".to_string(), via),
("From".to_string(), from),
("To".to_string(), to),
("Call-ID".to_string(), call_id),
("CSeq".to_string(), format!("{cseq_num} ACK")),
("Max-Forwards".to_string(), "70".to_string()),
("Content-Length".to_string(), "0".to_string()),
],
String::new(),
)
}