fix(proxy-engine): improve inbound SIP routing diagnostics and enrich leg media state reporting

This commit is contained in:
2026-04-14 20:19:34 +00:00
parent 0d82a626b5
commit 88768f0586
46 changed files with 555689 additions and 107 deletions

View File

@@ -5,7 +5,7 @@
//! The mixer provides mix-minus audio to all participants.
use crate::call::{Call, CallDirection, CallState, LegId, LegInfo, LegKind, LegState};
use crate::config::{normalize_routing_identity, AppConfig, ProviderConfig};
use crate::config::{extract_inbound_called_number, normalize_routing_identity, AppConfig, ProviderConfig};
use crate::ipc::{emit_event, OutTx};
use crate::leg_io::{create_leg_channels, spawn_sip_inbound, spawn_sip_outbound};
use crate::mixer::spawn_mixer;
@@ -25,6 +25,32 @@ use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
/// Emit a `sip_unhandled` event that summarises an inbound INVITE.
///
/// The event's `method_or_status` field is a single human-readable line
/// containing the caller-supplied `label`, the provider id, the resolved
/// called/caller numbers, and the raw request URI, `To` and
/// `P-Called-Party-ID` values taken from `invite` (empty when absent).
/// The SIP Call-ID and the source IP/port of the packet are attached as
/// separate fields.
fn emit_inbound_diagnostic(
    out_tx: &OutTx,
    label: &str,
    invite: &SipMessage,
    from_addr: SocketAddr,
    provider_id: &str,
    called_number: &str,
    caller_number: &str,
) {
    // Raw (un-normalised) routing fields, so the log shows exactly what the
    // remote side sent.
    let request_uri = invite.request_uri().unwrap_or("");
    let to_header = invite.get_header("To").unwrap_or("");
    let p_called_party = invite.get_header("P-Called-Party-ID").unwrap_or("");

    let summary = format!(
        "INVITE {label} provider={provider_id} called={called_number} caller={caller_number} ruri={} to={} pcalled={}",
        request_uri,
        to_header,
        p_called_party,
    );

    let payload = serde_json::json!({
        "method_or_status": summary,
        "call_id": invite.call_id(),
        "from_addr": from_addr.ip().to_string(),
        "from_port": from_addr.port(),
    });

    emit_event(out_tx, "sip_unhandled", payload);
}
/// Result of creating an inbound call — carries both the call id and
/// whether browsers should be notified (flows from the matched inbound
/// route's `ring_browsers` flag).
@@ -35,7 +61,17 @@ pub struct InboundCallCreated {
/// Emit a `leg_added` event with full leg information.
/// Free function (not a method) to avoid `&self` borrow conflicts when `self.calls` is borrowed.
fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
/// Map an RTP payload type number to a display label.
///
/// Payload types 0, 8, 9 and 111 are reported as `PCMU`, `PCMA`, `G.722`
/// and `Opus` respectively; any other value is rendered as `PT<number>`.
fn codec_label(codec_pt: u8) -> String {
    let well_known = match codec_pt {
        0 => Some("PCMU"),
        8 => Some("PCMA"),
        9 => Some("G.722"),
        111 => Some("Opus"),
        _ => None,
    };
    // Unknown payload types still get a stable, readable label.
    well_known.map_or_else(|| format!("PT{codec_pt}"), str::to_owned)
}
fn leg_metadata_json(leg: &LegInfo) -> serde_json::Value {
let metadata: serde_json::Value = if leg.metadata.is_empty() {
serde_json::json!({})
} else {
@@ -46,22 +82,35 @@ fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
.collect(),
)
};
metadata
}
fn leg_event_payload(call_id: &str, leg: &LegInfo) -> serde_json::Value {
serde_json::json!({
"call_id": call_id,
"leg_id": leg.id,
"kind": leg.kind.as_str(),
"state": leg.state.as_str(),
"codec": codec_label(leg.codec_pt),
"rtpPort": leg.rtp_port,
"remoteMedia": leg.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
"metadata": leg_metadata_json(leg),
})
}
fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
emit_event(
tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg.id,
"kind": leg.kind.as_str(),
"state": leg.state.as_str(),
"codec": sip_proto::helpers::codec_name(leg.codec_pt),
"rtpPort": leg.rtp_port,
"remoteMedia": leg.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
"metadata": metadata,
}),
leg_event_payload(call_id, leg),
);
}
/// Emit a `leg_state_changed` event carrying the full leg payload.
/// Free function (not a method) so it can be called while `self.calls` is borrowed.
fn emit_leg_state_changed_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
    let payload = leg_event_payload(call_id, leg);
    emit_event(tx, "leg_state_changed", payload);
}
pub struct CallManager {
/// All active calls, keyed by internal call ID.
pub calls: HashMap<String, Call>,
@@ -232,11 +281,11 @@ impl CallManager {
"call_ringing",
serde_json::json!({ "call_id": call_id }),
);
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(leg_id) {
emit_leg_state_changed_event(&self.out_tx, call_id, leg);
}
}
}
SipLegAction::ConnectedWithAck(ack_buf) => {
let _ = socket.send_to(&ack_buf, target).await;
@@ -248,6 +297,7 @@ impl CallManager {
let sip_leg = leg.sip_leg.as_ref().unwrap();
let remote = sip_leg.remote_media;
leg.state = LegState::Connected;
leg.codec_pt = sip_pt;
leg.remote_media = remote;
call.state = CallState::Connected;
remote
@@ -298,8 +348,17 @@ impl CallManager {
dev_rtp_socket,
dev_remote,
dev_leg_id,
)) = device_leg_info
)) = device_leg_info
{
// Use the device's preferred codec from its INVITE SDP,
// not the provider's negotiated codec.
let dev_pt = device_invite
.has_sdp_body()
.then(|| parse_sdp_endpoint(&device_invite.body))
.flatten()
.and_then(|ep| ep.codec_pt)
.unwrap_or(sip_pt);
// Build SDP pointing device to our device_rtp port.
// Use LAN IP for the device (it's on the local network).
let call_ref = self.calls.get(call_id).unwrap();
@@ -336,23 +395,16 @@ impl CallManager {
if let Some(call) = self.calls.get_mut(call_id) {
if let Some(dev_leg) = call.legs.get_mut(&dev_leg_id) {
dev_leg.state = LegState::Connected;
dev_leg.codec_pt = dev_pt;
}
}
if let Some(call) = self.calls.get(call_id) {
if let Some(dev_leg) = call.legs.get(&dev_leg_id) {
emit_leg_state_changed_event(&self.out_tx, call_id, dev_leg);
}
}
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": dev_leg_id, "state": "connected" }),
);
// Wire device leg to mixer.
// Use the device's preferred codec from its INVITE SDP,
// not the provider's negotiated codec.
let dev_pt = device_invite
.has_sdp_body()
.then(|| parse_sdp_endpoint(&device_invite.body))
.flatten()
.and_then(|ep| ep.codec_pt)
.unwrap_or(sip_pt);
if let Some(dev_remote_addr) = dev_remote {
let dev_channels = create_leg_channels();
spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx);
@@ -385,11 +437,11 @@ impl CallManager {
"sip_pt": sip_pt,
}),
);
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(leg_id) {
emit_leg_state_changed_event(&self.out_tx, call_id, leg);
}
}
}
SipLegAction::Terminated(reason) => {
let duration = self
@@ -436,11 +488,11 @@ impl CallManager {
leg.state = LegState::Terminated;
}
}
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(leg_id) {
emit_leg_state_changed_event(&self.out_tx, call_id, leg);
}
}
emit_event(
&self.out_tx,
"call_ended",
@@ -684,11 +736,9 @@ impl CallManager {
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Ringing;
}
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }),
);
if let Some(leg) = call.legs.get(this_leg_id) {
emit_leg_state_changed_event(&self.out_tx, call_id, leg);
}
} else if code >= 200 && code < 300 {
let mut needs_wiring = false;
if let Some(leg) = call.legs.get_mut(this_leg_id) {
@@ -708,11 +758,9 @@ impl CallManager {
needs_wiring = true;
}
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }),
);
if let Some(leg) = call.legs.get(this_leg_id) {
emit_leg_state_changed_event(&self.out_tx, call_id, leg);
}
if call.state != CallState::Connected {
call.state = CallState::Connected;
@@ -811,7 +859,7 @@ impl CallManager {
// Extract caller/callee info.
let from_header = invite.get_header("From").unwrap_or("");
let caller_number = normalize_routing_identity(from_header);
let called_number = normalize_routing_identity(invite.request_uri().unwrap_or(""));
let called_number = extract_inbound_called_number(invite);
// Resolve via the configured inbound routing table. The matched route
// is the source of truth for which external numbers this provider is
@@ -826,6 +874,15 @@ impl CallManager {
{
Some(route) => route,
None => {
emit_inbound_diagnostic(
&self.out_tx,
"route_miss",
invite,
from_addr,
provider_id,
&called_number,
&caller_number,
);
let resp = SipMessage::create_response(404, "Not Found", invite, None);
let _ = socket.send_to(&resp.serialize(), from_addr).await;
return None;
@@ -940,6 +997,15 @@ impl CallManager {
let provider_rtp = match rtp_pool.allocate().await {
Some(a) => a,
None => {
emit_inbound_diagnostic(
&self.out_tx,
"provider_rtp_unavailable",
invite,
from_addr,
provider_id,
&called_number,
&caller_number,
);
let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
let _ = socket.send_to(&resp.serialize(), from_addr).await;
return None;
@@ -948,6 +1014,15 @@ impl CallManager {
let device_rtp = match rtp_pool.allocate().await {
Some(a) => a,
None => {
emit_inbound_diagnostic(
&self.out_tx,
"device_rtp_unavailable",
invite,
from_addr,
provider_id,
&called_number,
&caller_number,
);
let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
let _ = socket.send_to(&resp.serialize(), from_addr).await;
return None;
@@ -1707,11 +1782,7 @@ impl CallManager {
}
}
leg.state = LegState::Terminated;
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg.id, "state": "terminated" }),
);
emit_leg_state_changed_event(&self.out_tx, call_id, leg);
}
emit_event(
@@ -1760,6 +1831,16 @@ impl CallManager {
let rtp_alloc = match rtp_pool.allocate().await {
Some(a) => a,
None => {
let called_number = extract_inbound_called_number(invite);
emit_inbound_diagnostic(
&self.out_tx,
"voicemail_rtp_unavailable",
invite,
from_addr,
provider_id,
&called_number,
caller_number,
);
let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
let _ = socket.send_to(&resp.serialize(), from_addr).await;
return None;
@@ -1901,6 +1982,16 @@ impl CallManager {
let rtp_alloc = match rtp_pool.allocate().await {
Some(a) => a,
None => {
let called_number = extract_inbound_called_number(invite);
emit_inbound_diagnostic(
&self.out_tx,
"ivr_rtp_unavailable",
invite,
from_addr,
provider_id,
&called_number,
caller_number,
);
let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
let _ = socket.send_to(&resp.serialize(), from_addr).await;
return None;

View File

@@ -273,6 +273,38 @@ pub fn normalize_routing_identity(value: &str) -> String {
digits
}
/// Report whether `value` has the shape of a dialable phone number.
///
/// A value qualifies when it is an optional single leading `+` followed by
/// at least six ASCII digits and nothing else. A `+` anywhere other than the
/// first position (for example `"123+456"` or `"++123456"`) is rejected,
/// because `+` is only meaningful as the international prefix.
fn looks_like_phone_identity(value: &str) -> bool {
    // Drop at most one leading '+'; everything that remains must be digits.
    let digits = value.strip_prefix('+').unwrap_or(value);
    // Once every byte is an ASCII digit, the byte length is the digit count.
    digits.len() >= 6 && digits.bytes().all(|b| b.is_ascii_digit())
}
/// Determine the called number of an inbound SIP request.
///
/// Some providers deliver the DID in `To` / `P-Called-Party-ID` while the
/// request URI only contains an account username. Resolution order:
///
/// 1. The normalized request URI, if it looks like a phone number.
/// 2. Otherwise the first of `P-Called-Party-ID`, `X-Called-Party-ID`,
///    `Diversion`, `History-Info`, `To` whose normalized value looks like a
///    phone number.
/// 3. Otherwise the normalized request URI, whatever it contains.
pub fn extract_inbound_called_number(msg: &SipMessage) -> String {
    // Headers consulted, in priority order, when the request URI is not phone-like.
    const FALLBACK_HEADERS: [&str; 5] = [
        "P-Called-Party-ID",
        "X-Called-Party-ID",
        "Diversion",
        "History-Info",
        "To",
    ];

    let ruri_identity = normalize_routing_identity(msg.request_uri().unwrap_or(""));
    if looks_like_phone_identity(&ruri_identity) {
        return ruri_identity;
    }

    FALLBACK_HEADERS
        .iter()
        .copied()
        .map(|name| normalize_routing_identity(msg.get_header(name).unwrap_or("")))
        .find(|candidate| looks_like_phone_identity(candidate))
        .unwrap_or(ruri_identity)
}
fn parse_numeric_range_value(value: &str) -> Option<(bool, &str)> {
let trimmed = value.trim();
if trimmed.is_empty() {
@@ -636,6 +668,20 @@ mod tests {
assert!(!support.ring_browsers);
}
#[test]
fn extract_inbound_called_number_prefers_did_headers_over_username_ruri() {
    // Request URI carries an account username; the DID is only in `To`.
    let invite_bytes = b"INVITE sip:2830573e1@proxy.example SIP/2.0\r\nTo: <sip:+4942116767548@proxy.example>\r\nFrom: <sip:+491701234567@provider.example>;tag=abc\r\nCall-ID: test-1\r\nCSeq: 1 INVITE\r\nContent-Length: 0\r\n\r\n";
    let invite = SipMessage::parse(invite_bytes).expect("invite should parse");
    let called = extract_inbound_called_number(&invite);
    assert_eq!(called, "+4942116767548");
}
#[test]
fn extract_inbound_called_number_keeps_phone_ruri_when_already_present() {
    // Request URI is already phone-like, so the username in `To` must not win.
    let invite_bytes = b"INVITE sip:042116767548@proxy.example SIP/2.0\r\nTo: <sip:2830573e1@proxy.example>\r\nFrom: <sip:+491701234567@provider.example>;tag=abc\r\nCall-ID: test-2\r\nCSeq: 1 INVITE\r\nContent-Length: 0\r\n\r\n";
    let invite = SipMessage::parse(invite_bytes).expect("invite should parse");
    let called = extract_inbound_called_number(&invite);
    assert_eq!(called, "042116767548");
}
#[test]
fn matches_pattern_supports_numeric_ranges() {
assert!(matches_pattern(

View File

@@ -25,7 +25,7 @@ mod voicemail;
mod webrtc_engine;
use crate::call_manager::CallManager;
use crate::config::{normalize_routing_identity, AppConfig};
use crate::config::{extract_inbound_called_number, normalize_routing_identity, AppConfig};
use crate::ipc::{emit_event, respond_err, respond_ok, Command, OutTx};
use crate::provider::ProviderManager;
use crate::registrar::Registrar;
@@ -346,7 +346,7 @@ async fn handle_sip_packet(
// Emit event so TypeScript knows about the call (for dashboard, IVR routing, etc).
let from_header = msg.get_header("From").unwrap_or("");
let from_uri = normalize_routing_identity(from_header);
let called_number = normalize_routing_identity(msg.request_uri().unwrap_or(""));
let called_number = extract_inbound_called_number(&msg);
emit_event(
&eng.out_tx,
@@ -369,6 +369,20 @@ async fn handle_sip_packet(
let dialed_number = normalize_routing_identity(msg.request_uri().unwrap_or(""));
let device = eng.registrar.find_by_address(&from_addr);
if device.is_none() {
emit_event(
&eng.out_tx,
"sip_unhandled",
serde_json::json!({
"method_or_status": "INVITE",
"call_id": msg.call_id(),
"from_addr": from_addr.ip().to_string(),
"from_port": from_addr.port(),
"is_from_provider": false,
}),
);
return;
}
let device_id = device.map(|d| d.device_id.clone());
// Find provider via routing rules.

View File

@@ -313,6 +313,23 @@ impl ProviderManager {
if ps.config.outbound_proxy.address == addr.ip().to_string() {
return Some(ps_arc.clone());
}
// Hostname-based providers (e.g. sipgate.de) often deliver inbound
// INVITEs from resolved IPs rather than the literal configured host.
// Resolve the proxy host and accept any matching IP/port variant.
use std::net::ToSocketAddrs;
if let Ok(resolved) = format!(
"{}:{}",
ps.config.outbound_proxy.address, ps.config.outbound_proxy.port
)
.to_socket_addrs()
{
for resolved_addr in resolved {
if resolved_addr == *addr || resolved_addr.ip() == addr.ip() {
return Some(ps_arc.clone());
}
}
}
}
None
}

View File

@@ -13,6 +13,7 @@ use crate::audio_player::pcm_to_mix_frames;
use kokoro_tts::{KokoroTts, Voice};
use std::path::Path;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, watch};
pub const DEFAULT_MODEL_PATH: &str = ".nogit/tts/kokoro-v1.0.onnx";
@@ -47,6 +48,10 @@ pub struct TtsEngine {
/// Path that was used to load the current model (for cache invalidation).
loaded_model_path: String,
loaded_voices_path: String,
/// On-disk TTS WAVs are cacheable only within a single engine lifetime.
/// Every restart gets a new generation token, so prior process outputs are
/// treated as stale and regenerated on first use.
cache_generation: String,
}
impl TtsEngine {
@@ -55,6 +60,10 @@ impl TtsEngine {
tts: None,
loaded_model_path: String::new(),
loaded_voices_path: String::new(),
cache_generation: SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_nanos().to_string())
.unwrap_or_else(|_| "0".to_string()),
}
}
@@ -228,7 +237,7 @@ impl TtsEngine {
return false;
}
match std::fs::read_to_string(&meta_path) {
Ok(contents) => contents == Self::cache_key(text, voice),
Ok(contents) => contents == self.cache_key(text, voice),
Err(_) => false,
}
}
@@ -236,12 +245,12 @@ impl TtsEngine {
/// Write the sidecar `.meta` file next to the WAV.
fn write_cache_meta(&self, output_path: &str, text: &str, voice: &str) {
let meta_path = format!("{output_path}.meta");
let _ = std::fs::write(&meta_path, Self::cache_key(text, voice));
let _ = std::fs::write(&meta_path, self.cache_key(text, voice));
}
/// Build the cache key from text + voice.
fn cache_key(text: &str, voice: &str) -> String {
format!("{}\0{}", text, voice)
/// Build the cache key from process generation + text + voice.
fn cache_key(&self, text: &str, voice: &str) -> String {
format!("{}\0{}\0{}", self.cache_generation, text, voice)
}
}