diff --git a/changelog.md b/changelog.md index 8ff9886..ed7e489 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-04-14 - 1.24.0 - feat(routing) +require explicit inbound DID routes and normalize SIP identities for provider-based number matching + +- Inbound route resolution now returns no match unless a configured inbound route explicitly matches the provider and called number. +- Normalized routing identities were added for SIP/TEL URIs so inbound DIDs and outbound dialed numbers match consistently across provider-specific formats. +- Call handling and incoming call events now use normalized numbers, improving routing accuracy for shared trunk providers. +- Route configuration docs and the web route editor were updated to support explicit inbound DID ownership, voicemail fallback, and IVR selection. +- Mixer RTP handling was enhanced to better support variable packet durations, timestamp-based gap fill, and non-blocking output drop reporting. + ## 2026-04-14 - 1.23.0 - feat(runtime) refactor runtime state and proxy event handling for typed WebRTC linking and shared status models diff --git a/readme.md b/readme.md index f8ef986..1c2e494 100644 --- a/readme.md +++ b/readme.md @@ -148,24 +148,41 @@ Create `.nogit/config.json`: "routing": { "routes": [ { - "id": "inbound-default", - "name": "Ring all devices", - "priority": 100, - "direction": "inbound", - "match": {}, + "id": "inbound-main-did", + "name": "Main DID", + "priority": 200, + "enabled": true, + "match": { + "direction": "inbound", + "sourceProvider": "my-trunk", + "numberPattern": "+49421219694" + }, "action": { "targets": ["desk-phone"], "ringBrowsers": true, - "voicemailBox": "main", - "noAnswerTimeout": 25 + "voicemailBox": "main" + } + }, + { + "id": "inbound-support-did", + "name": "Support DID", + "priority": 190, + "enabled": true, + "match": { + "direction": "inbound", + "sourceProvider": "my-trunk", + "numberPattern": "+49421219695" + }, + "action": { + "ivrMenuId": "support-menu" } }, { "id": "outbound-default", "name": "Route via trunk", "priority": 100, - "direction": "outbound", - "match": {}, + "enabled": true, + "match": { "direction": "outbound" }, "action": { "provider": "my-trunk" } } ] @@ -187,6 +204,8 @@ Create `.nogit/config.json`: } ``` +Inbound number ownership is explicit: add one inbound route per DID (or DID prefix) and scope it with `sourceProvider` when a provider delivers multiple external numbers. + ### TTS Setup (Optional) For neural voicemail greetings and IVR prompts, download the Kokoro TTS model: diff --git a/rust/crates/codec-lib/src/lib.rs b/rust/crates/codec-lib/src/lib.rs index 30e7c67..22c1255 100644 --- a/rust/crates/codec-lib/src/lib.rs +++ b/rust/crates/codec-lib/src/lib.rs @@ -115,9 +115,8 @@ pub struct TranscodeState { impl TranscodeState { /// Create a new transcoding session with fresh codec state. pub fn new() -> Result { - let mut opus_enc = - OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip) - .map_err(|e| format!("opus encoder: {e}"))?; + let mut opus_enc = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip) + .map_err(|e| format!("opus encoder: {e}"))?; opus_enc .set_complexity(5) .map_err(|e| format!("opus set_complexity: {e}"))?; @@ -160,14 +159,9 @@ impl TranscodeState { let key = (from_rate, to_rate, canonical_chunk); if !self.resamplers.contains_key(&key) { - let r = FftFixedIn::::new( - from_rate as usize, - to_rate as usize, - canonical_chunk, - 1, - 1, - ) - .map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?; + let r = + FftFixedIn::::new(from_rate as usize, to_rate as usize, canonical_chunk, 1, 1) + .map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?; self.resamplers.insert(key, r); } let resampler = self.resamplers.get_mut(&key).unwrap(); @@ -284,8 +278,7 @@ impl TranscodeState { match pt { PT_OPUS => { let mut pcm = vec![0i16; 5760]; // up to 120ms at 48kHz - let packet = - OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?; + let packet = OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?; let out = MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?; let n: usize = self @@ -343,8 +336,7 @@ impl TranscodeState { match pt { PT_OPUS => { let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz - let packet = - OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?; + let packet = OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?; let out = MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?; let n: usize = self @@ -368,8 +360,8 @@ impl TranscodeState { /// Returns f32 PCM at 48kHz. `frame_size` should be 960 for 20ms. pub fn opus_plc(&mut self, frame_size: usize) -> Result, String> { let mut pcm = vec![0.0f32; frame_size]; - let out = MutSignals::try_from(&mut pcm[..]) - .map_err(|e| format!("opus plc signals: {e}"))?; + let out = + MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus plc signals: {e}"))?; let n: usize = self .opus_dec .decode_float(None::>, out, false) @@ -425,14 +417,9 @@ impl TranscodeState { let key = (from_rate, to_rate, canonical_chunk); if !self.resamplers_f32.contains_key(&key) { - let r = FftFixedIn::::new( - from_rate as usize, - to_rate as usize, - canonical_chunk, - 1, - 1, - ) - .map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?; + let r = + FftFixedIn::::new(from_rate as usize, to_rate as usize, canonical_chunk, 1, 1) + .map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?; self.resamplers_f32.insert(key, r); } let resampler = self.resamplers_f32.get_mut(&key).unwrap(); @@ -508,8 +495,10 @@ mod tests { let encoded = mulaw_encode(sample); let decoded = mulaw_decode(encoded); // µ-law is lossy; verify the decoded value is close. - assert!((sample as i32 - decoded as i32).abs() < 1000, - "µ-law roundtrip failed for {sample}: got {decoded}"); + assert!( + (sample as i32 - decoded as i32).abs() < 1000, + "µ-law roundtrip failed for {sample}: got {decoded}" + ); } } @@ -518,8 +507,10 @@ mod tests { for sample in [-32768i16, -1000, -1, 0, 1, 1000, 32767] { let encoded = alaw_encode(sample); let decoded = alaw_decode(encoded); - assert!((sample as i32 - decoded as i32).abs() < 1000, - "A-law roundtrip failed for {sample}: got {decoded}"); + assert!( + (sample as i32 - decoded as i32).abs() < 1000, + "A-law roundtrip failed for {sample}: got {decoded}" + ); } } @@ -543,7 +534,9 @@ mod tests { fn pcmu_to_pcma_roundtrip() { let mut st = TranscodeState::new().unwrap(); // 160 bytes = 20ms of PCMU at 8kHz - let pcmu_data: Vec = (0..160).map(|i| mulaw_encode((i as i16 * 200) - 16000)).collect(); + let pcmu_data: Vec = (0..160) + .map(|i| mulaw_encode((i as i16 * 200) - 16000)) + .collect(); let pcma = st.transcode(&pcmu_data, PT_PCMU, PT_PCMA, None).unwrap(); assert_eq!(pcma.len(), 160); // Same frame size let back = st.transcode(&pcma, PT_PCMA, PT_PCMU, None).unwrap(); diff --git a/rust/crates/proxy-engine/src/audio_player.rs b/rust/crates/proxy-engine/src/audio_player.rs index 4fce73d..6e08a53 100644 --- a/rust/crates/proxy-engine/src/audio_player.rs +++ b/rust/crates/proxy-engine/src/audio_player.rs @@ -36,10 +36,7 @@ pub async fn play_wav_file( // Read all samples as i16. let samples: Vec = if spec.bits_per_sample == 16 { - reader - .samples::() - .filter_map(|s| s.ok()) - .collect() + reader.samples::().filter_map(|s| s.ok()).collect() } else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float { reader .samples::() @@ -199,10 +196,7 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result>, String> { .map(|s| s as f32 / 32768.0) .collect() } else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float { - reader - .samples::() - .filter_map(|s| s.ok()) - .collect() + reader.samples::().filter_map(|s| s.ok()).collect() } else { return Err(format!( "unsupported WAV format: {}bit {:?}", diff --git a/rust/crates/proxy-engine/src/call_manager.rs b/rust/crates/proxy-engine/src/call_manager.rs index 806121c..29dbed1 100644 --- a/rust/crates/proxy-engine/src/call_manager.rs +++ b/rust/crates/proxy-engine/src/call_manager.rs @@ -5,7 +5,7 @@ //! The mixer provides mix-minus audio to all participants. use crate::call::{Call, CallDirection, CallState, LegId, LegInfo, LegKind, LegState}; -use crate::config::{AppConfig, ProviderConfig}; +use crate::config::{normalize_routing_identity, AppConfig, ProviderConfig}; use crate::ipc::{emit_event, OutTx}; use crate::leg_io::{create_leg_channels, spawn_sip_inbound, spawn_sip_outbound}; use crate::mixer::spawn_mixer; @@ -13,7 +13,9 @@ use crate::registrar::Registrar; use crate::rtp::RtpPortPool; use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig}; use crate::tts::TtsEngine; -use sip_proto::helpers::{build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions}; +use sip_proto::helpers::{ + build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions, +}; use sip_proto::message::{ResponseOptions, SipMessage}; use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri}; use std::collections::HashMap; @@ -25,7 +27,7 @@ use tokio::sync::Mutex; /// Result of creating an inbound call — carries both the call id and /// whether browsers should be notified (flows from the matched inbound -/// route's `ring_browsers` flag, or the fallback default). +/// route's `ring_browsers` flag). pub struct InboundCallCreated { pub call_id: String, pub ring_browsers: bool, @@ -214,15 +216,27 @@ impl CallManager { let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice); if let Some(dev) = device_leg { if let Some(dev_addr) = dev.signaling_addr { - let ringing = SipMessage::create_response(180, "Ringing", device_invite, None); + let ringing = SipMessage::create_response( + 180, + "Ringing", + device_invite, + None, + ); let _ = socket.send_to(&ringing.serialize(), dev_addr).await; } } } } - emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); - emit_event(&self.out_tx, "leg_state_changed", - serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" })); + emit_event( + &self.out_tx, + "call_ringing", + serde_json::json!({ "call_id": call_id }), + ); + emit_event( + &self.out_tx, + "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }), + ); } SipLegAction::ConnectedWithAck(ack_buf) => { let _ = socket.send_to(&ack_buf, target).await; @@ -245,16 +259,30 @@ impl CallManager { spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx); spawn_sip_outbound(rtp_socket, remote_addr, channels.outbound_rx); if let Some(call) = self.calls.get(call_id) { - call.add_leg_to_mixer(leg_id, sip_pt, channels.inbound_rx, channels.outbound_tx) - .await; + call.add_leg_to_mixer( + leg_id, + sip_pt, + channels.inbound_rx, + channels.outbound_tx, + ) + .await; } } // For device-originated calls: send 200 OK to device and wire device leg. if let Some(call) = self.calls.get(call_id) { if let Some(device_invite) = call.device_invite.clone() { - let device_leg_info: Option<(SocketAddr, u16, Arc, Option, String)> = - call.legs.values().find(|l| l.kind == LegKind::SipDevice).and_then(|dev| { + let device_leg_info: Option<( + SocketAddr, + u16, + Arc, + Option, + String, + )> = call + .legs + .values() + .find(|l| l.kind == LegKind::SipDevice) + .and_then(|dev| { Some(( dev.signaling_addr?, dev.rtp_port, @@ -264,11 +292,21 @@ impl CallManager { )) }); - if let Some((dev_addr, dev_rtp_port, dev_rtp_socket, dev_remote, dev_leg_id)) = device_leg_info { + if let Some(( + dev_addr, + dev_rtp_port, + dev_rtp_socket, + dev_remote, + dev_leg_id, + )) = device_leg_info + { // Build SDP pointing device to our device_rtp port. // Use LAN IP for the device (it's on the local network). let call_ref = self.calls.get(call_id).unwrap(); - let prov_leg = call_ref.legs.values().find(|l| l.kind == LegKind::SipProvider); + let prov_leg = call_ref + .legs + .values() + .find(|l| l.kind == LegKind::SipProvider); let lan_ip_str = prov_leg .and_then(|l| l.sip_leg.as_ref()) .map(|sl| sl.config.lan_ip.clone()) @@ -280,13 +318,18 @@ impl CallManager { ..Default::default() }); - let ok = SipMessage::create_response(200, "OK", &device_invite, Some(ResponseOptions { - to_tag: Some(generate_tag()), - contact: Some(format!("", lan_ip_str, 5060)), - body: Some(sdp), - content_type: Some("application/sdp".to_string()), - extra_headers: None, - })); + let ok = SipMessage::create_response( + 200, + "OK", + &device_invite, + Some(ResponseOptions { + to_tag: Some(generate_tag()), + contact: Some(format!("", lan_ip_str, 5060)), + body: Some(sdp), + content_type: Some("application/sdp".to_string()), + extra_headers: None, + }), + ); let _ = socket.send_to(&ok.serialize(), dev_addr).await; // Update device leg state. @@ -313,27 +356,47 @@ impl CallManager { if let Some(dev_remote_addr) = dev_remote { let dev_channels = create_leg_channels(); spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx); - spawn_sip_outbound(dev_rtp_socket, dev_remote_addr, dev_channels.outbound_rx); + spawn_sip_outbound( + dev_rtp_socket, + dev_remote_addr, + dev_channels.outbound_rx, + ); if let Some(call) = self.calls.get(call_id) { - call.add_leg_to_mixer(&dev_leg_id, dev_pt, dev_channels.inbound_rx, dev_channels.outbound_tx) - .await; + call.add_leg_to_mixer( + &dev_leg_id, + dev_pt, + dev_channels.inbound_rx, + dev_channels.outbound_tx, + ) + .await; } } } } } - emit_event(&self.out_tx, "call_answered", serde_json::json!({ - "call_id": call_id, - "provider_media_addr": remote.map(|a| a.ip().to_string()), - "provider_media_port": remote.map(|a| a.port()), - "sip_pt": sip_pt, - })); - emit_event(&self.out_tx, "leg_state_changed", - serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" })); + emit_event( + &self.out_tx, + "call_answered", + serde_json::json!({ + "call_id": call_id, + "provider_media_addr": remote.map(|a| a.ip().to_string()), + "provider_media_port": remote.map(|a| a.port()), + "sip_pt": sip_pt, + }), + ); + emit_event( + &self.out_tx, + "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }), + ); } SipLegAction::Terminated(reason) => { - let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0); + let duration = self + .calls + .get(call_id) + .map(|c| c.duration_secs()) + .unwrap_or(0); // Notify device if this is a device-originated outbound call. if let Some(call) = self.calls.get(call_id) { @@ -343,7 +406,8 @@ impl CallManager { if let Some(dev_addr) = dev.signaling_addr { // Map reason to SIP response code. let code: u16 = if reason.starts_with("rejected_") { - reason.strip_prefix("rejected_") + reason + .strip_prefix("rejected_") .and_then(|s| s.parse().ok()) .unwrap_or(503) } else if reason == "bye" { @@ -354,7 +418,12 @@ impl CallManager { 503 }; if code > 0 && dev.state != LegState::Connected { - let resp = SipMessage::create_response(code, "Service Unavailable", device_invite, None); + let resp = SipMessage::create_response( + code, + "Service Unavailable", + device_invite, + None, + ); let _ = socket.send_to(&resp.serialize(), dev_addr).await; } } @@ -367,22 +436,38 @@ impl CallManager { leg.state = LegState::Terminated; } } - emit_event(&self.out_tx, "leg_state_changed", - serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" })); - emit_event(&self.out_tx, "call_ended", - serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration })); + emit_event( + &self.out_tx, + "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }), + ); + emit_event( + &self.out_tx, + "call_ended", + serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }), + ); self.terminate_call(call_id).await; return true; } SipLegAction::SendAndTerminate(buf, reason) => { let _ = socket.send_to(&buf, from_addr).await; - let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0); - emit_event(&self.out_tx, "call_ended", - serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration })); + let duration = self + .calls + .get(call_id) + .map(|c| c.duration_secs()) + .unwrap_or(0); + emit_event( + &self.out_tx, + "call_ended", + serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }), + ); self.terminate_call(call_id).await; return true; } - SipLegAction::AuthRetry { ack_407, invite_with_auth } => { + SipLegAction::AuthRetry { + ack_407, + invite_with_auth, + } => { if let Some(ack) = ack_407 { let _ = socket.send_to(&ack, target).await; } @@ -416,11 +501,21 @@ impl CallManager { let this_kind = this_leg.map(|l| l.kind).unwrap_or(LegKind::SipProvider); // Find the counterpart leg. - let other_leg = call.legs.values().find(|l| l.id != this_leg_id && l.state != LegState::Terminated); - let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) = match other_leg { - Some(l) => (l.signaling_addr, l.rtp_port, l.id.clone(), l.kind, l.public_ip.clone()), - None => return false, - }; + let other_leg = call + .legs + .values() + .find(|l| l.id != this_leg_id && l.state != LegState::Terminated); + let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) = + match other_leg { + Some(l) => ( + l.signaling_addr, + l.rtp_port, + l.id.clone(), + l.kind, + l.public_ip.clone(), + ), + None => return false, + }; let forward_to = match other_addr { Some(a) => a, None => return false, @@ -439,7 +534,9 @@ impl CallManager { }; // Check if the other leg is a B2BUA leg (has SipLeg for proper dialog mgmt). - let other_has_sip_leg = call.legs.get(&other_leg_id) + let other_has_sip_leg = call + .legs + .get(&other_leg_id) .map(|l| l.sip_leg.is_some()) .unwrap_or(false); @@ -473,7 +570,8 @@ impl CallManager { if let Some(other) = call.legs.get_mut(&other_leg_id) { if let Some(sip_leg) = &mut other.sip_leg { if let Some(hangup_buf) = sip_leg.build_hangup() { - let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await; + let _ = + socket.send_to(&hangup_buf, sip_leg.config.sip_target).await; } } } @@ -504,7 +602,8 @@ impl CallManager { if let Some(other) = call.legs.get_mut(&other_leg_id) { if let Some(sip_leg) = &mut other.sip_leg { if let Some(hangup_buf) = sip_leg.build_hangup() { - let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await; + let _ = + socket.send_to(&hangup_buf, sip_leg.config.sip_target).await; } } } @@ -541,13 +640,17 @@ impl CallManager { if this_kind == LegKind::SipProvider { // From provider → forward to device: rewrite request URI. if let Some(ruri) = fwd.request_uri().map(|s| s.to_string()) { - let new_ruri = rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port()); + let new_ruri = + rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port()); fwd.set_request_uri(&new_ruri); } } if fwd.is_dialog_establishing() { // Record-Route must also be routable from the destination leg. - fwd.prepend_header("Record-Route", &format!("")); + fwd.prepend_header( + "Record-Route", + &format!(""), + ); } let _ = socket.send_to(&fwd.serialize(), forward_to).await; return true; @@ -572,13 +675,20 @@ impl CallManager { if code == 180 || code == 183 { if call.state == CallState::SettingUp { call.state = CallState::Ringing; - emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); + emit_event( + &self.out_tx, + "call_ringing", + serde_json::json!({ "call_id": call_id }), + ); } if let Some(leg) = call.legs.get_mut(this_leg_id) { leg.state = LegState::Ringing; } - emit_event(&self.out_tx, "leg_state_changed", - serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" })); + emit_event( + &self.out_tx, + "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }), + ); } else if code >= 200 && code < 300 { let mut needs_wiring = false; if let Some(leg) = call.legs.get_mut(this_leg_id) { @@ -598,12 +708,19 @@ impl CallManager { needs_wiring = true; } - emit_event(&self.out_tx, "leg_state_changed", - serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" })); + emit_event( + &self.out_tx, + "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }), + ); if call.state != CallState::Connected { call.state = CallState::Connected; - emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id })); + emit_event( + &self.out_tx, + "call_answered", + serde_json::json!({ "call_id": call_id }), + ); } // Forward the response before wiring (drop call borrow). @@ -693,28 +810,27 @@ impl CallManager { // Extract caller/callee info. let from_header = invite.get_header("From").unwrap_or(""); - let caller_number = SipMessage::extract_uri(from_header) - .unwrap_or("Unknown") - .to_string(); - let called_number = invite - .request_uri() - .and_then(|uri| SipMessage::extract_uri(uri)) - .unwrap_or("") - .to_string(); + let caller_number = normalize_routing_identity(from_header); + let called_number = normalize_routing_identity(invite.request_uri().unwrap_or("")); - // Resolve via the configured inbound routing table. This honors - // user-defined routes from the UI (numberPattern, callerPattern, - // sourceProvider, targets, ringBrowsers). If no route matches, the - // fallback returns an empty `device_ids` and `ring_browsers: true`, - // which preserves pre-routing behavior via the `resolve_first_device` - // fallback below. + // Resolve via the configured inbound routing table. The matched route + // is the source of truth for which external numbers this provider is + // allowed to deliver to us. // // TODO: Multi-target inbound fork is not yet implemented. // - `route.device_ids` beyond the first registered target are ignored. // - `ring_browsers` is informational only — browsers see a toast but // do not race the SIP device. First-to-answer-wins requires a // multi-leg fork + per-leg CANCEL, which is not built yet. - let route = config.resolve_inbound_route(provider_id, &called_number, &caller_number); + let route = match config.resolve_inbound_route(provider_id, &called_number, &caller_number) + { + Some(route) => route, + None => { + let resp = SipMessage::create_response(404, "Not Found", invite, None); + let _ = socket.send_to(&resp.serialize(), from_addr).await; + return None; + } + }; let ring_browsers = route.ring_browsers; // IVR routing: if the route targets an IVR menu, go there directly. @@ -724,12 +840,24 @@ impl CallManager { if let Some(menu) = ivr.menus.iter().find(|m| m.id == *ivr_menu_id) { let call_id = self .route_to_ivr( - &call_id, invite, from_addr, &caller_number, - provider_id, provider_config, config, rtp_pool, socket, - public_ip, menu, &tts_engine, + &call_id, + invite, + from_addr, + &caller_number, + provider_id, + provider_config, + config, + rtp_pool, + socket, + public_ip, + menu, + &tts_engine, ) .await?; - return Some(InboundCallCreated { call_id, ring_browsers }); + return Some(InboundCallCreated { + call_id, + ring_browsers, + }); } } } @@ -748,19 +876,27 @@ impl CallManager { None => { // No device registered → voicemail. // Resolve greeting WAV on-demand (may trigger TTS generation). - let greeting_wav = resolve_greeting_wav( - config, - route.voicemail_box.as_deref(), - &tts_engine, - ).await; + let greeting_wav = + resolve_greeting_wav(config, route.voicemail_box.as_deref(), &tts_engine).await; let call_id = self .route_to_voicemail( - &call_id, invite, from_addr, &caller_number, - provider_id, provider_config, config, rtp_pool, socket, public_ip, + &call_id, + invite, + from_addr, + &caller_number, + provider_id, + provider_config, + config, + rtp_pool, + socket, + public_ip, greeting_wav, ) .await?; - return Some(InboundCallCreated { call_id, ring_browsers }); + return Some(InboundCallCreated { + call_id, + ring_browsers, + }); } }; @@ -855,8 +991,10 @@ impl CallManager { // Register SIP Call-ID → both legs (provider leg handles provider messages). // For passthrough, both legs share the same SIP Call-ID. // We route based on source address in route_passthrough_message. - self.sip_index - .insert(sip_call_id.clone(), (call_id.clone(), provider_leg_id.clone())); + self.sip_index.insert( + sip_call_id.clone(), + (call_id.clone(), provider_leg_id.clone()), + ); // Rewrite and forward INVITE to device. let mut fwd_invite = invite.clone(); @@ -889,7 +1027,10 @@ impl CallManager { } } - Some(InboundCallCreated { call_id, ring_browsers }) + Some(InboundCallCreated { + call_id, + ring_browsers, + }) } /// Initiate an outbound B2BUA call from the dashboard. @@ -938,7 +1079,9 @@ impl CallManager { // Send INVITE. let to_uri = format!("sip:{number}@{}", provider_config.domain); - sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await; + sip_leg + .send_invite(registered_aor, &to_uri, &sip_call_id, socket) + .await; // Create call with mixer. let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone()); @@ -1109,7 +1252,9 @@ impl CallManager { // Build proper To URI and send INVITE. let to_uri = format!("sip:{}@{}", dialed_number, provider_config.domain); - sip_leg.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket).await; + sip_leg + .send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket) + .await; call.legs.insert( provider_leg_id.clone(), @@ -1185,7 +1330,9 @@ impl CallManager { let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config); let to_uri = format!("sip:{number}@{}", provider_config.domain); - sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await; + sip_leg + .send_invite(registered_aor, &to_uri, &sip_call_id, socket) + .await; let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); @@ -1256,9 +1403,16 @@ impl CallManager { }; let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config); - let to_uri = format!("sip:{}@{}:{}", device_id, device_addr.ip(), device_addr.port()); + let to_uri = format!( + "sip:{}@{}:{}", + device_id, + device_addr.ip(), + device_addr.port() + ); let from_uri = format!("sip:sipproxy@{lan_ip}:{lan_port}"); - sip_leg.send_invite(&from_uri, &to_uri, &sip_call_id, socket).await; + sip_leg + .send_invite(&from_uri, &to_uri, &sip_call_id, socket) + .await; let leg_info = LegInfo { id: leg_id.clone(), @@ -1292,12 +1446,7 @@ impl CallManager { } /// Remove a leg from a call. - pub async fn remove_leg( - &mut self, - call_id: &str, - leg_id: &str, - socket: &UdpSocket, - ) -> bool { + pub async fn remove_leg(&mut self, call_id: &str, leg_id: &str, socket: &UdpSocket) -> bool { let call = match self.calls.get_mut(call_id) { Some(c) => c, None => return false, @@ -1310,7 +1459,9 @@ impl CallManager { if let Some(leg) = call.legs.get_mut(leg_id) { if let Some(sip_leg) = &mut leg.sip_leg { if let Some(hangup_bytes) = sip_leg.build_hangup() { - let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await; + let _ = socket + .send_to(&hangup_bytes, sip_leg.config.sip_target) + .await; } } leg.state = LegState::Terminated; @@ -1356,9 +1507,7 @@ impl CallManager { target_call_id: &str, ) -> bool { // Validate both calls exist and the leg is in the source call. - if !self.calls.contains_key(source_call_id) - || !self.calls.contains_key(target_call_id) - { + if !self.calls.contains_key(source_call_id) || !self.calls.contains_key(target_call_id) { return false; } @@ -1503,7 +1652,9 @@ impl CallManager { } if let Some(sip_leg) = &mut leg.sip_leg { if let Some(hangup_bytes) = sip_leg.build_hangup() { - let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await; + let _ = socket + .send_to(&hangup_bytes, sip_leg.config.sip_target) + .await; } } else if let Some(addr) = leg.signaling_addr { // Passthrough leg — send a simple BYE. @@ -1588,7 +1739,9 @@ impl CallManager { }); let response = SipMessage::create_response( - 200, "OK", invite, + 200, + "OK", + invite, Some(sip_proto::message::ResponseOptions { to_tag: Some(sip_proto::helpers::generate_tag()), contact: Some(format!("", lan_ip, config.proxy.lan_port)), @@ -1665,9 +1818,15 @@ impl CallManager { let rtp_socket = rtp_alloc.socket; tokio::spawn(async move { crate::voicemail::run_voicemail_session( - rtp_socket, provider_media, codec_pt, - greeting_wav, recording_path, 120_000, - call_id_owned, caller_owned, out_tx, + rtp_socket, + provider_media, + codec_pt, + greeting_wav, + recording_path, + 120_000, + call_id_owned, + caller_owned, + out_tx, ) .await; }); @@ -1717,7 +1876,9 @@ impl CallManager { }); let response = SipMessage::create_response( - 200, "OK", invite, + 200, + "OK", + invite, Some(sip_proto::message::ResponseOptions { to_tag: Some(sip_proto::helpers::generate_tag()), contact: Some(format!("", lan_ip, config.proxy.lan_port)), @@ -1816,9 +1977,9 @@ impl CallManager { tokio::spawn(async move { // Load prompt PCM frames if available. - let prompt_frames = prompt_wav.as_ref().and_then(|wav| { - crate::audio_player::load_prompt_pcm_frames(wav).ok() - }); + let prompt_frames = prompt_wav + .as_ref() + .and_then(|wav| crate::audio_player::load_prompt_pcm_frames(wav).ok()); if let Some(frames) = prompt_frames { let (result_tx, result_rx) = tokio::sync::oneshot::channel(); @@ -1884,7 +2045,11 @@ impl CallManager { // Internal helpers // ----------------------------------------------------------------------- - fn resolve_first_device(&self, config: &AppConfig, registrar: &Registrar) -> Option { + fn resolve_first_device( + &self, + config: &AppConfig, + registrar: &Registrar, + ) -> Option { for device in &config.devices { if let Some(addr) = registrar.get_device_contact(&device.id) { return Some(addr); @@ -1907,8 +2072,7 @@ async fn resolve_greeting_wav( tts_engine: &Arc>, ) -> Option { // 1. Look up voicebox config. - let vb = voicebox_id - .and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled)); + let vb = voicebox_id.and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled)); if let Some(vb) = vb { // 2. Pre-recorded WAV takes priority. diff --git a/rust/crates/proxy-engine/src/config.rs b/rust/crates/proxy-engine/src/config.rs index f9c1dd1..06fa2e4 100644 --- a/rust/crates/proxy-engine/src/config.rs +++ b/rust/crates/proxy-engine/src/config.rs @@ -4,6 +4,7 @@ //! proxy engine via the `configure` command. These types mirror the TS interfaces. use serde::Deserialize; +use sip_proto::message::SipMessage; use std::net::SocketAddr; /// Network endpoint. @@ -227,6 +228,51 @@ pub struct IvrMenuEntry { // Pattern matching (ported from ts/config.ts) // --------------------------------------------------------------------------- +/// Extract the URI user part and normalize phone-like identities for routing. +/// +/// This keeps inbound route matching stable across provider-specific URI shapes, +/// e.g. `sip:+49 421 219694@trunk.example` and `sip:0049421219694@trunk.example` +/// both normalize to `+49421219694`. +pub fn normalize_routing_identity(value: &str) -> String { + let extracted = SipMessage::extract_uri_user(value).unwrap_or(value).trim(); + if extracted.is_empty() { + return String::new(); + } + + let mut digits = String::new(); + let mut saw_plus = false; + + for (idx, ch) in extracted.chars().enumerate() { + if ch.is_ascii_digit() { + digits.push(ch); + continue; + } + + if ch == '+' && idx == 0 { + saw_plus = true; + continue; + } + + if matches!(ch, ' ' | '\t' | '-' | '.' | '/' | '(' | ')') { + continue; + } + + return extracted.to_string(); + } + + if digits.is_empty() { + return extracted.to_string(); + } + if saw_plus { + return format!("+{digits}"); + } + if digits.starts_with("00") && digits.len() > 2 { + return format!("+{}", &digits[2..]); + } + + digits +} + /// Test a value against a pattern string. /// - None/empty: matches everything (wildcard) /// - Trailing '*': prefix match @@ -363,7 +409,7 @@ impl AppConfig { provider_id: &str, called_number: &str, caller_number: &str, - ) -> InboundRouteResult { + ) -> Option { let mut routes: Vec<&Route> = self .routing .routes @@ -387,22 +433,150 @@ impl AppConfig { continue; } - return InboundRouteResult { + return Some(InboundRouteResult { device_ids: route.action.targets.clone().unwrap_or_default(), ring_browsers: route.action.ring_browsers.unwrap_or(false), voicemail_box: route.action.voicemail_box.clone(), ivr_menu_id: route.action.ivr_menu_id.clone(), no_answer_timeout: route.action.no_answer_timeout, - }; + }); } - // Fallback: ring all devices + browsers. - InboundRouteResult { - device_ids: vec![], - ring_browsers: true, - voicemail_box: None, - ivr_menu_id: None, - no_answer_timeout: None, - } + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_app_config(routes: Vec) -> AppConfig { + AppConfig { + proxy: ProxyConfig { + lan_ip: "127.0.0.1".to_string(), + lan_port: 5070, + public_ip_seed: None, + rtp_port_range: RtpPortRange { + min: 20_000, + max: 20_100, + }, + }, + providers: vec![ProviderConfig { + id: "provider-a".to_string(), + display_name: "Provider A".to_string(), + domain: "example.com".to_string(), + outbound_proxy: Endpoint { + address: "example.com".to_string(), + port: 5060, + }, + username: "user".to_string(), + password: "pass".to_string(), + register_interval_sec: 300, + codecs: vec![9], + quirks: Quirks { + early_media_silence: false, + silence_payload_type: None, + silence_max_packets: None, + }, + }], + devices: vec![DeviceConfig { + id: "desk".to_string(), + display_name: "Desk".to_string(), + expected_address: "127.0.0.1".to_string(), + extension: "100".to_string(), + }], + routing: RoutingConfig { routes }, + voiceboxes: vec![], + ivr: None, + } + } + + #[test] + fn normalize_routing_identity_extracts_uri_user_and_phone_number() { + assert_eq!( + normalize_routing_identity("sip:0049 421 219694@voip.easybell.de"), + "+49421219694" + ); + assert_eq!( + normalize_routing_identity(""), + "+49421219694" + ); + assert_eq!(normalize_routing_identity("sip:100@pbx.local"), "100"); + assert_eq!(normalize_routing_identity("sip:alice@pbx.local"), "alice"); + } + + #[test] + fn resolve_inbound_route_requires_explicit_match() { + let cfg = test_app_config(vec![]); + assert!(cfg + .resolve_inbound_route("provider-a", "+49421219694", "+491701234567") + .is_none()); + } + + #[test] + fn resolve_inbound_route_matches_per_number_on_shared_provider() { + let cfg = test_app_config(vec![ + Route { + id: "main".to_string(), + name: "Main DID".to_string(), + priority: 200, + enabled: true, + match_criteria: RouteMatch { + direction: "inbound".to_string(), + number_pattern: Some("+49421219694".to_string()), + caller_pattern: None, + source_provider: Some("provider-a".to_string()), + source_device: None, + }, + action: RouteAction { + targets: Some(vec!["desk".to_string()]), + ring_browsers: Some(true), + voicemail_box: None, + ivr_menu_id: None, + no_answer_timeout: None, + provider: None, + failover_providers: None, + strip_prefix: None, + prepend_prefix: None, + }, + }, + Route { + id: "support".to_string(), + name: "Support DID".to_string(), + priority: 100, + enabled: true, + match_criteria: RouteMatch { + direction: "inbound".to_string(), + number_pattern: Some("+49421219695".to_string()), + caller_pattern: None, + source_provider: Some("provider-a".to_string()), + source_device: None, + }, + action: RouteAction { + targets: None, + ring_browsers: Some(false), + voicemail_box: Some("support-box".to_string()), + ivr_menu_id: None, + no_answer_timeout: Some(20), + provider: None, + failover_providers: None, + strip_prefix: None, + prepend_prefix: None, + }, + }, + ]); + + let main = cfg + .resolve_inbound_route("provider-a", "+49421219694", "+491701234567") + .expect("main DID should match"); + assert_eq!(main.device_ids, vec!["desk".to_string()]); + assert!(main.ring_browsers); + + let support = cfg + .resolve_inbound_route("provider-a", "+49421219695", "+491701234567") + .expect("support DID should match"); + assert_eq!(support.voicemail_box.as_deref(), Some("support-box")); + assert_eq!(support.no_answer_timeout, Some(20)); + assert!(!support.ring_browsers); } } diff --git a/rust/crates/proxy-engine/src/ipc.rs b/rust/crates/proxy-engine/src/ipc.rs index 720e5cd..20d0cc5 100644 --- a/rust/crates/proxy-engine/src/ipc.rs +++ b/rust/crates/proxy-engine/src/ipc.rs @@ -19,7 +19,13 @@ pub struct Command { } /// Send a response to a command. -pub fn respond(tx: &OutTx, id: &str, success: bool, result: Option, error: Option<&str>) { +pub fn respond( + tx: &OutTx, + id: &str, + success: bool, + result: Option, + error: Option<&str>, +) { let mut resp = serde_json::json!({ "id": id, "success": success }); if let Some(r) = result { resp["result"] = r; diff --git a/rust/crates/proxy-engine/src/leg_io.rs b/rust/crates/proxy-engine/src/leg_io.rs index dc516e4..6341daf 100644 --- a/rust/crates/proxy-engine/src/leg_io.rs +++ b/rust/crates/proxy-engine/src/leg_io.rs @@ -63,7 +63,8 @@ pub fn spawn_sip_inbound( if offset + 4 > n { continue; // Malformed: extension header truncated. } - let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize; + let ext_len = + u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize; offset += 4 + ext_len * 4; } if offset >= n { @@ -74,7 +75,17 @@ pub fn spawn_sip_inbound( if payload.is_empty() { continue; } - if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() { + if inbound_tx + .send(RtpPacket { + payload, + payload_type: pt, + marker, + seq, + timestamp, + }) + .await + .is_err() + { break; // Channel closed — leg removed. } } diff --git a/rust/crates/proxy-engine/src/main.rs b/rust/crates/proxy-engine/src/main.rs index 4fb4b3e..7f7d57b 100644 --- a/rust/crates/proxy-engine/src/main.rs +++ b/rust/crates/proxy-engine/src/main.rs @@ -5,7 +5,6 @@ /// (incoming calls, registration state). /// /// No raw SIP ever touches TypeScript. - mod audio_player; mod call; mod call_manager; @@ -26,7 +25,7 @@ mod voicemail; mod webrtc_engine; use crate::call_manager::CallManager; -use crate::config::AppConfig; +use crate::config::{normalize_routing_identity, AppConfig}; use crate::ipc::{emit_event, respond_err, respond_ok, Command, OutTx}; use crate::provider::ProviderManager; use crate::registrar::Registrar; @@ -266,11 +265,7 @@ async fn handle_sip_packet( } // 2. Device REGISTER — handled by registrar. - let is_from_provider = eng - .provider_mgr - .find_by_address(&from_addr) - .await - .is_some(); + let is_from_provider = eng.provider_mgr.find_by_address(&from_addr).await.is_some(); if !is_from_provider && msg.method() == Some("REGISTER") { if let Some(response_buf) = eng.registrar.handle_register(&msg, from_addr) { @@ -349,11 +344,8 @@ async fn handle_sip_packet( if let Some(inbound) = inbound { // Emit event so TypeScript knows about the call (for dashboard, IVR routing, etc). let from_header = msg.get_header("From").unwrap_or(""); - let from_uri = SipMessage::extract_uri(from_header).unwrap_or("Unknown"); - let called_number = msg - .request_uri() - .and_then(|uri| SipMessage::extract_uri(uri)) - .unwrap_or(""); + let from_uri = normalize_routing_identity(from_header); + let called_number = normalize_routing_identity(msg.request_uri().unwrap_or("")); emit_event( &eng.out_tx, @@ -373,11 +365,7 @@ async fn handle_sip_packet( // 5. New outbound INVITE from device. if !is_from_provider && msg.is_request() && msg.method() == Some("INVITE") { // Resolve outbound route. - let dialed_number = msg - .request_uri() - .and_then(|uri| SipMessage::extract_uri(uri)) - .unwrap_or(msg.request_uri().unwrap_or("")) - .to_string(); + let dialed_number = normalize_routing_identity(msg.request_uri().unwrap_or("")); let device = eng.registrar.find_by_address(&from_addr); let device_id = device.map(|d| d.device_id.clone()); @@ -395,13 +383,18 @@ async fn handle_sip_packet( if let Some(route) = route_result { // Look up provider state by config ID (not by device address). - let (public_ip, registered_aor) = if let Some(ps_arc) = - eng.provider_mgr.find_by_provider_id(&route.provider.id).await + let (public_ip, registered_aor) = if let Some(ps_arc) = eng + .provider_mgr + .find_by_provider_id(&route.provider.id) + .await { let ps = ps_arc.lock().await; (ps.public_ip.clone(), ps.registered_aor.clone()) } else { - (None, format!("sip:{}@{}", route.provider.username, route.provider.domain)) + ( + None, + format!("sip:{}@{}", route.provider.username, route.provider.domain), + ) }; let ProxyEngine { @@ -461,14 +454,20 @@ async fn handle_sip_packet( async fn handle_make_call(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let number = match cmd.params.get("number").and_then(|v| v.as_str()) { Some(n) => n.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing number"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing number"); + return; + } }; let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str()); let mut eng = engine.lock().await; let config_ref = match &eng.config { Some(c) => c.clone(), - None => { respond_err(out_tx, &cmd.id, "not configured"); return; } + None => { + respond_err(out_tx, &cmd.id, "not configured"); + return; + } }; // Resolve provider. @@ -482,49 +481,82 @@ async fn handle_make_call(engine: Arc>, out_tx: &OutTx, cmd: let provider_config = match provider_config { Some(p) => p, - None => { respond_err(out_tx, &cmd.id, "no provider available"); return; } + None => { + respond_err(out_tx, &cmd.id, "no provider available"); + return; + } }; // Get public IP and registered AOR from provider state. - let (public_ip, registered_aor) = if let Some(ps_arc) = eng.provider_mgr.find_by_address( - &provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()) - ).await { + let (public_ip, registered_aor) = if let Some(ps_arc) = eng + .provider_mgr + .find_by_address( + &provider_config + .outbound_proxy + .to_socket_addr() + .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()), + ) + .await + { let ps = ps_arc.lock().await; (ps.public_ip.clone(), ps.registered_aor.clone()) } else { // Fallback — construct AOR from config. - (None, format!("sip:{}@{}", provider_config.username, provider_config.domain)) + ( + None, + format!( + "sip:{}@{}", + provider_config.username, provider_config.domain + ), + ) }; let socket = match &eng.transport { Some(t) => t.socket(), - None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + None => { + respond_err(out_tx, &cmd.id, "not initialized"); + return; + } }; - let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng; + let ProxyEngine { + ref mut call_mgr, + ref mut rtp_pool, + .. + } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); - let call_id = call_mgr.make_outbound_call( - &number, - &provider_config, - &config_ref, - rtp_pool, - &socket, - public_ip.as_deref(), - ®istered_aor, - ).await; + let call_id = call_mgr + .make_outbound_call( + &number, + &provider_config, + &config_ref, + rtp_pool, + &socket, + public_ip.as_deref(), + ®istered_aor, + ) + .await; match call_id { Some(id) => { - emit_event(out_tx, "outbound_call_started", serde_json::json!({ - "call_id": id, - "number": number, - "provider_id": provider_config.id, - })); + emit_event( + out_tx, + "outbound_call_started", + serde_json::json!({ + "call_id": id, + "number": number, + "provider_id": provider_config.id, + }), + ); respond_ok(out_tx, &cmd.id, serde_json::json!({ "call_id": id })); } None => { - respond_err(out_tx, &cmd.id, "call origination failed — provider not registered or no ports available"); + respond_err( + out_tx, + &cmd.id, + "call origination failed — provider not registered or no ports available", + ); } } } @@ -560,20 +592,30 @@ async fn handle_hangup(engine: Arc>, out_tx: &OutTx, cmd: &Co async fn handle_webrtc_offer(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing session_id"); + return; + } }; let offer_sdp = match cmd.params.get("sdp").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing sdp"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing sdp"); + return; + } }; let mut wrtc = webrtc.lock().await; match wrtc.handle_offer(&session_id, &offer_sdp).await { Ok(answer_sdp) => { - respond_ok(out_tx, &cmd.id, serde_json::json!({ - "session_id": session_id, - "sdp": answer_sdp, - })); + respond_ok( + out_tx, + &cmd.id, + serde_json::json!({ + "session_id": session_id, + "sdp": answer_sdp, + }), + ); } Err(e) => respond_err(out_tx, &cmd.id, &e), } @@ -584,14 +626,28 @@ async fn handle_webrtc_offer(webrtc: Arc>, out_tx: &OutTx, c async fn handle_webrtc_ice(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing session_id"); + return; + } }; - let candidate = cmd.params.get("candidate").and_then(|v| v.as_str()).unwrap_or(""); + let candidate = cmd + .params + .get("candidate") + .and_then(|v| v.as_str()) + .unwrap_or(""); let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str()); - let sdp_mline_index = cmd.params.get("sdp_mline_index").and_then(|v| v.as_u64()).map(|v| v as u16); + let sdp_mline_index = cmd + .params + .get("sdp_mline_index") + .and_then(|v| v.as_u64()) + .map(|v| v as u16); let wrtc = webrtc.lock().await; - match wrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await { + match wrtc + .add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index) + .await + { Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})), Err(e) => respond_err(out_tx, &cmd.id, &e), } @@ -608,11 +664,17 @@ async fn handle_webrtc_link( ) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing session_id"); + return; + } }; let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; // Create channels for the WebRTC leg. @@ -641,7 +703,12 @@ async fn handle_webrtc_link( // Lock webrtc to wire the channels. let mut wrtc = webrtc.lock().await; if wrtc - .link_to_mixer(&session_id, &call_id, channels.inbound_tx, channels.outbound_rx) + .link_to_mixer( + &session_id, + &call_id, + channels.inbound_tx, + channels.outbound_rx, + ) .await { // Also store the WebRTC leg info in the call. @@ -670,22 +737,30 @@ async fn handle_webrtc_link( } } - emit_event(out_tx, "leg_added", serde_json::json!({ - "call_id": call_id, - "leg_id": session_id, - "kind": "webrtc", - "state": "connected", - "codec": "Opus", - "rtpPort": 0, - "remoteMedia": null, - "metadata": {}, - })); + emit_event( + out_tx, + "leg_added", + serde_json::json!({ + "call_id": call_id, + "leg_id": session_id, + "kind": "webrtc", + "state": "connected", + "codec": "Opus", + "rtpPort": 0, + "remoteMedia": null, + "metadata": {}, + }), + ); - respond_ok(out_tx, &cmd.id, serde_json::json!({ - "session_id": session_id, - "call_id": call_id, - "bridged": true, - })); + respond_ok( + out_tx, + &cmd.id, + serde_json::json!({ + "session_id": session_id, + "call_id": call_id, + "bridged": true, + }), + ); } else { respond_err(out_tx, &cmd.id, &format!("session {session_id} not found")); } @@ -695,45 +770,76 @@ async fn handle_webrtc_link( async fn handle_add_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; let number = match cmd.params.get("number").and_then(|v| v.as_str()) { Some(n) => n.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing number"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing number"); + return; + } }; let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str()); let mut eng = engine.lock().await; let config_ref = match &eng.config { Some(c) => c.clone(), - None => { respond_err(out_tx, &cmd.id, "not configured"); return; } + None => { + respond_err(out_tx, &cmd.id, "not configured"); + return; + } }; // Resolve provider. let provider_config = if let Some(pid) = provider_id { config_ref.providers.iter().find(|p| p.id == pid).cloned() } else { - config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider) + config_ref + .resolve_outbound_route(&number, None, &|_| true) + .map(|r| r.provider) }; let provider_config = match provider_config { Some(p) => p, - None => { respond_err(out_tx, &cmd.id, "no provider available"); return; } + None => { + respond_err(out_tx, &cmd.id, "no provider available"); + return; + } }; // Get registered AOR. - let registered_aor = if let Some(ps_arc) = eng.provider_mgr.find_by_address( - &provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()) - ).await { + let registered_aor = if let Some(ps_arc) = eng + .provider_mgr + .find_by_address( + &provider_config + .outbound_proxy + .to_socket_addr() + .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()), + ) + .await + { let ps = ps_arc.lock().await; ps.registered_aor.clone() } else { - format!("sip:{}@{}", provider_config.username, provider_config.domain) + format!( + "sip:{}@{}", + provider_config.username, provider_config.domain + ) }; - let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address( - &provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()) - ).await { + let public_ip = if let Some(ps_arc) = eng + .provider_mgr + .find_by_address( + &provider_config + .outbound_proxy + .to_socket_addr() + .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()), + ) + .await + { let ps = ps_arc.lock().await; ps.public_ip.clone() } else { @@ -742,16 +848,31 @@ async fn handle_add_leg(engine: Arc>, out_tx: &OutTx, cmd: &C let socket = match &eng.transport { Some(t) => t.socket(), - None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + None => { + respond_err(out_tx, &cmd.id, "not initialized"); + return; + } }; - let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng; + let ProxyEngine { + ref mut call_mgr, + ref mut rtp_pool, + .. + } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); - let leg_id = call_mgr.add_external_leg( - &call_id, &number, &provider_config, &config_ref, - rtp_pool, &socket, public_ip.as_deref(), ®istered_aor, - ).await; + let leg_id = call_mgr + .add_external_leg( + &call_id, + &number, + &provider_config, + &config_ref, + rtp_pool, + &socket, + public_ip.as_deref(), + ®istered_aor, + ) + .await; match leg_id { Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })), @@ -763,33 +884,61 @@ async fn handle_add_leg(engine: Arc>, out_tx: &OutTx, cmd: &C async fn handle_add_device_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; let device_id = match cmd.params.get("device_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing device_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing device_id"); + return; + } }; let mut eng = engine.lock().await; let config_ref = match &eng.config { Some(c) => c.clone(), - None => { respond_err(out_tx, &cmd.id, "not configured"); return; } + None => { + respond_err(out_tx, &cmd.id, "not configured"); + return; + } }; let socket = match &eng.transport { Some(t) => t.socket(), - None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + None => { + respond_err(out_tx, &cmd.id, "not initialized"); + return; + } }; - let ProxyEngine { ref registrar, ref mut call_mgr, ref mut rtp_pool, .. } = *eng; + let ProxyEngine { + ref registrar, + ref mut call_mgr, + ref mut rtp_pool, + .. + } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); - let leg_id = call_mgr.add_device_leg( - &call_id, &device_id, registrar, &config_ref, rtp_pool, &socket, - ).await; + let leg_id = call_mgr + .add_device_leg( + &call_id, + &device_id, + registrar, + &config_ref, + rtp_pool, + &socket, + ) + .await; match leg_id { Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })), - None => respond_err(out_tx, &cmd.id, "failed to add device leg — device not registered or call not found"), + None => respond_err( + out_tx, + &cmd.id, + "failed to add device leg — device not registered or call not found", + ), } } @@ -797,19 +946,32 @@ async fn handle_add_device_leg(engine: Arc>, out_tx: &OutTx, async fn handle_transfer_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let source_call_id = match cmd.params.get("source_call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing source_call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing source_call_id"); + return; + } }; let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing leg_id"); + return; + } }; let target_call_id = match cmd.params.get("target_call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing target_call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing target_call_id"); + return; + } }; let mut eng = engine.lock().await; - if eng.call_mgr.transfer_leg(&source_call_id, &leg_id, &target_call_id).await { + if eng + .call_mgr + .transfer_leg(&source_call_id, &leg_id, &target_call_id) + .await + { respond_ok(out_tx, &cmd.id, serde_json::json!({})); } else { respond_err(out_tx, &cmd.id, "transfer failed — call or leg not found"); @@ -820,57 +982,104 @@ async fn handle_transfer_leg(engine: Arc>, out_tx: &OutTx, cm async fn handle_replace_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; let old_leg_id = match cmd.params.get("old_leg_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing old_leg_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing old_leg_id"); + return; + } }; let number = match cmd.params.get("number").and_then(|v| v.as_str()) { Some(n) => n.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing number"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing number"); + return; + } }; let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str()); let mut eng = engine.lock().await; let config_ref = match &eng.config { Some(c) => c.clone(), - None => { respond_err(out_tx, &cmd.id, "not configured"); return; } + None => { + respond_err(out_tx, &cmd.id, "not configured"); + return; + } }; let socket = match &eng.transport { Some(t) => t.socket(), - None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + None => { + respond_err(out_tx, &cmd.id, "not initialized"); + return; + } }; // Resolve provider. let provider_config = if let Some(pid) = provider_id { config_ref.providers.iter().find(|p| p.id == pid).cloned() } else { - config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider) + config_ref + .resolve_outbound_route(&number, None, &|_| true) + .map(|r| r.provider) }; let provider_config = match provider_config { Some(p) => p, - None => { respond_err(out_tx, &cmd.id, "no provider available"); return; } + None => { + respond_err(out_tx, &cmd.id, "no provider available"); + return; + } }; - let (public_ip, registered_aor) = if let Some(ps_arc) = eng.provider_mgr.find_by_provider_id(&provider_config.id).await { + let (public_ip, registered_aor) = if let Some(ps_arc) = eng + .provider_mgr + .find_by_provider_id(&provider_config.id) + .await + { let ps = ps_arc.lock().await; (ps.public_ip.clone(), ps.registered_aor.clone()) } else { - (None, format!("sip:{}@{}", provider_config.username, provider_config.domain)) + ( + None, + format!( + "sip:{}@{}", + provider_config.username, provider_config.domain + ), + ) }; - let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng; + let ProxyEngine { + ref mut call_mgr, + ref mut rtp_pool, + .. + } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); - let new_leg_id = call_mgr.replace_leg( - &call_id, &old_leg_id, &number, &provider_config, &config_ref, - rtp_pool, &socket, public_ip.as_deref(), ®istered_aor, - ).await; + let new_leg_id = call_mgr + .replace_leg( + &call_id, + &old_leg_id, + &number, + &provider_config, + &config_ref, + rtp_pool, + &socket, + public_ip.as_deref(), + ®istered_aor, + ) + .await; match new_leg_id { Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "new_leg_id": lid })), - None => respond_err(out_tx, &cmd.id, "replace failed — call ended or dial failed"), + None => respond_err( + out_tx, + &cmd.id, + "replace failed — call ended or dial failed", + ), } } @@ -878,17 +1087,26 @@ async fn handle_replace_leg(engine: Arc>, out_tx: &OutTx, cmd async fn handle_remove_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing leg_id"); + return; + } }; let mut eng = engine.lock().await; let socket = match &eng.transport { Some(t) => t.socket(), - None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + None => { + respond_err(out_tx, &cmd.id, "not initialized"); + return; + } }; if eng.call_mgr.remove_leg(&call_id, &leg_id, &socket).await { @@ -903,7 +1121,10 @@ async fn handle_remove_leg(engine: Arc>, out_tx: &OutTx, cmd: async fn handle_webrtc_close(webrtc: Arc>, out_tx: &OutTx, cmd: &Command) { let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing session_id"); + return; + } }; let mut wrtc = webrtc.lock().await; @@ -919,22 +1140,27 @@ async fn handle_webrtc_close(webrtc: Arc>, out_tx: &OutTx, c /// Handle `start_interaction` — isolate a leg, play a prompt, collect DTMF. /// This command blocks until the interaction completes (digit, timeout, or cancel). -async fn handle_start_interaction( - engine: Arc>, - out_tx: &OutTx, - cmd: &Command, -) { +async fn handle_start_interaction(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing leg_id"); + return; + } }; let prompt_wav = match cmd.params.get("prompt_wav").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing prompt_wav"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing prompt_wav"); + return; + } }; let expected_digits: Vec = cmd .params @@ -1007,10 +1233,8 @@ async fn handle_start_interaction( serde_json::json!(result_str), ); if let Some(ref d) = digit_str { - leg.metadata.insert( - "last_interaction_digit".to_string(), - serde_json::json!(d), - ); + leg.metadata + .insert("last_interaction_digit".to_string(), serde_json::json!(d)); } } } @@ -1024,18 +1248,20 @@ async fn handle_start_interaction( } /// Handle `add_tool_leg` — add a recording or transcription tool leg to a call. -async fn handle_add_tool_leg( - engine: Arc>, - out_tx: &OutTx, - cmd: &Command, -) { +async fn handle_add_tool_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; let tool_type_str = match cmd.params.get("tool_type").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing tool_type"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing tool_type"); + return; + } }; let tool_type = match tool_type_str.as_str() { @@ -1066,13 +1292,11 @@ async fn handle_add_tool_leg( out_tx.clone(), ) } - crate::mixer::ToolType::Transcription => { - crate::tool_leg::spawn_transcription_tool( - tool_leg_id.clone(), - call_id.clone(), - out_tx.clone(), - ) - } + crate::mixer::ToolType::Transcription => crate::tool_leg::spawn_transcription_tool( + tool_leg_id.clone(), + call_id.clone(), + out_tx.clone(), + ), }; // Send AddToolLeg to the mixer and register in call. @@ -1097,10 +1321,7 @@ async fn handle_add_tool_leg( // Register tool leg in the call's leg map. let mut metadata = std::collections::HashMap::new(); - metadata.insert( - "tool_type".to_string(), - serde_json::json!(tool_type_str), - ); + metadata.insert("tool_type".to_string(), serde_json::json!(tool_type_str)); call.legs.insert( tool_leg_id.clone(), crate::call::LegInfo { @@ -1144,18 +1365,20 @@ async fn handle_add_tool_leg( } /// Handle `remove_tool_leg` — remove a tool leg from a call. -async fn handle_remove_tool_leg( - engine: Arc>, - out_tx: &OutTx, - cmd: &Command, -) { +async fn handle_remove_tool_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; let tool_leg_id = match cmd.params.get("tool_leg_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing tool_leg_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing tool_leg_id"); + return; + } }; let mut eng = engine.lock().await; @@ -1191,26 +1414,34 @@ async fn handle_remove_tool_leg( } /// Handle `set_leg_metadata` — set a metadata key on a leg. -async fn handle_set_leg_metadata( - engine: Arc>, - out_tx: &OutTx, - cmd: &Command, -) { +async fn handle_set_leg_metadata(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing call_id"); + return; + } }; let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing leg_id"); + return; + } }; let key = match cmd.params.get("key").and_then(|v| v.as_str()) { Some(s) => s.to_string(), - None => { respond_err(out_tx, &cmd.id, "missing key"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing key"); + return; + } }; let value = match cmd.params.get("value") { Some(v) => v.clone(), - None => { respond_err(out_tx, &cmd.id, "missing value"); return; } + None => { + respond_err(out_tx, &cmd.id, "missing value"); + return; + } }; let mut eng = engine.lock().await; @@ -1234,11 +1465,7 @@ async fn handle_set_leg_metadata( } /// Handle `generate_tts` — synthesize text to a WAV file using Kokoro TTS. -async fn handle_generate_tts( - engine: Arc>, - out_tx: &OutTx, - cmd: &Command, -) { +async fn handle_generate_tts(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let tts_engine = engine.lock().await.tts_engine.clone(); let mut tts = tts_engine.lock().await; match tts.generate(&cmd.params).await { diff --git a/rust/crates/proxy-engine/src/mixer.rs b/rust/crates/proxy-engine/src/mixer.rs index 4301647..0b168cc 100644 --- a/rust/crates/proxy-engine/src/mixer.rs +++ b/rust/crates/proxy-engine/src/mixer.rs @@ -7,7 +7,8 @@ //! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz. //! //! The mixer runs a 20ms tick loop: -//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg +//! 1. Drain inbound channels, reorder RTP, decode variable-duration packets to 48kHz, +//! and queue them in per-leg PCM buffers //! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64) //! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send //! 4. For each isolated leg: play prompt frame or silence, check DTMF @@ -16,7 +17,7 @@ use crate::ipc::{emit_event, OutTx}; use crate::jitter_buffer::{JitterBuffer, JitterResult}; -use crate::rtp::{build_rtp_header, rtp_clock_increment}; +use crate::rtp::{build_rtp_header, rtp_clock_increment, rtp_clock_rate}; use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState}; use nnnoiseless::DenoiseState; use std::collections::{HashMap, VecDeque}; @@ -29,6 +30,12 @@ use tokio::time::{self, Duration, MissedTickBehavior}; const MIX_RATE: u32 = 48000; /// Samples per 20ms frame at the mixing rate. const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020 +/// Safety cap for how much timestamp-derived gap fill we synthesize at once. +const MAX_GAP_FILL_SAMPLES: usize = MIX_FRAME_SIZE * 6; // 120ms +/// Bound how many decode / concealment steps a leg can consume in one tick. +const MAX_PACKET_STEPS_PER_TICK: usize = 24; +/// Report the first output drop immediately, then every N drops. +const DROP_REPORT_INTERVAL: u64 = 50; /// A raw RTP payload received from a leg (no RTP header). pub struct RtpPacket { @@ -39,10 +46,6 @@ pub struct RtpPacket { /// RTP sequence number for reordering. pub seq: u16, /// RTP timestamp from the original packet header. - /// - /// Set on inbound RTP but not yet consumed downstream — reserved for - /// future jitter/sync work in the mixer. - #[allow(dead_code)] pub timestamp: u32, } @@ -109,6 +112,7 @@ struct ToolLegSlot { #[allow(dead_code)] tool_type: ToolType, audio_tx: mpsc::Sender, + dropped_batches: u64, } // --------------------------------------------------------------------------- @@ -163,8 +167,15 @@ struct MixerLegSlot { denoiser: Box>, inbound_rx: mpsc::Receiver, outbound_tx: mpsc::Sender>, + /// Decoded PCM waiting for playout. Variable-duration RTP packets are + /// decoded into this FIFO; the mixer consumes exactly one 20ms frame per tick. + pcm_buffer: VecDeque, /// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32). last_pcm_frame: Vec, + /// Next RTP timestamp expected from the inbound stream. + expected_rtp_timestamp: Option, + /// Best-effort estimate of packet duration in RTP clock units. + estimated_packet_ts: u32, /// Number of consecutive ticks with no inbound packet. silent_ticks: u32, /// Per-leg jitter buffer for packet reordering and timing. @@ -173,15 +184,242 @@ struct MixerLegSlot { rtp_seq: u16, rtp_ts: u32, rtp_ssrc: u32, + /// Dropped outbound frames for this leg (queue full / closed). + outbound_drops: u64, /// Current role of this leg in the mixer. role: LegRole, } +fn mix_samples_to_rtp_ts(codec_pt: u8, mix_samples: usize) -> u32 { + let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64; + (((mix_samples as u64 * clock_rate) + (MIX_RATE as u64 / 2)) / MIX_RATE as u64) as u32 +} + +fn rtp_ts_to_mix_samples(codec_pt: u8, rtp_ts: u32) -> usize { + let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64; + (((rtp_ts as u64 * MIX_RATE as u64) + (clock_rate / 2)) / clock_rate) as usize +} + +fn is_forward_rtp_delta(delta: u32) -> bool { + delta > 0 && delta < 0x8000_0000 +} + +fn should_emit_drop_event(total_drops: u64) -> bool { + total_drops == 1 || total_drops % DROP_REPORT_INTERVAL == 0 +} + +fn emit_output_drop_event( + out_tx: &OutTx, + call_id: &str, + leg_id: Option<&str>, + tool_leg_id: Option<&str>, + stream: &str, + reason: &str, + total_drops: u64, +) { + if !should_emit_drop_event(total_drops) { + return; + } + + emit_event( + out_tx, + "mixer_output_drop", + serde_json::json!({ + "call_id": call_id, + "leg_id": leg_id, + "tool_leg_id": tool_leg_id, + "stream": stream, + "reason": reason, + "total_drops": total_drops, + }), + ); +} + +fn fade_concealment_from_last_frame(slot: &mut MixerLegSlot, samples: usize, decay: f32) { + let mut template = if slot.last_pcm_frame.is_empty() { + vec![0.0f32; MIX_FRAME_SIZE] + } else { + slot.last_pcm_frame.clone() + }; + + let mut remaining = samples; + while remaining > 0 { + for sample in &mut template { + *sample *= decay; + } + let take = remaining.min(template.len()); + slot.pcm_buffer.extend(template.iter().take(take).copied()); + remaining -= take; + } +} + +fn append_packet_loss_concealment(slot: &mut MixerLegSlot, samples: usize) { + let mut remaining = samples.max(1); + while remaining > 0 { + let chunk = remaining.min(MIX_FRAME_SIZE); + if slot.codec_pt == codec_lib::PT_OPUS { + match slot.transcoder.opus_plc(chunk) { + Ok(mut pcm) => { + pcm.resize(chunk, 0.0); + slot.pcm_buffer.extend(pcm); + } + Err(_) => fade_concealment_from_last_frame(slot, chunk, 0.8), + } + } else { + fade_concealment_from_last_frame(slot, chunk, 0.85); + } + remaining -= chunk; + } +} + +fn decode_packet_to_mix_pcm(slot: &mut MixerLegSlot, pkt: &RtpPacket) -> Option> { + let (pcm, rate) = slot + .transcoder + .decode_to_f32(&pkt.payload, pkt.payload_type) + .ok()?; + + let pcm_48k = if rate == MIX_RATE { + pcm + } else { + slot.transcoder + .resample_f32(&pcm, rate, MIX_RATE) + .unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE]) + }; + + let processed = if slot.codec_pt != codec_lib::PT_OPUS { + TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k) + } else { + pcm_48k + }; + + Some(processed) +} + +fn queue_inbound_packet(slot: &mut MixerLegSlot, pkt: RtpPacket) { + if let Some(pcm_48k) = decode_packet_to_mix_pcm(slot, &pkt) { + if pcm_48k.is_empty() { + return; + } + + if let Some(expected_ts) = slot.expected_rtp_timestamp { + let gap_ts = pkt.timestamp.wrapping_sub(expected_ts); + if is_forward_rtp_delta(gap_ts) { + let gap_samples = rtp_ts_to_mix_samples(slot.codec_pt, gap_ts); + if gap_samples <= MAX_GAP_FILL_SAMPLES { + append_packet_loss_concealment(slot, gap_samples); + } else { + slot.pcm_buffer.clear(); + } + } + } + + let packet_ts = mix_samples_to_rtp_ts(slot.codec_pt, pcm_48k.len()); + if packet_ts > 0 { + slot.estimated_packet_ts = packet_ts; + slot.expected_rtp_timestamp = Some(pkt.timestamp.wrapping_add(packet_ts)); + } + slot.pcm_buffer.extend(pcm_48k); + } +} + +fn fill_leg_playout_buffer(slot: &mut MixerLegSlot) { + let mut steps = 0usize; + while slot.pcm_buffer.len() < MIX_FRAME_SIZE && steps < MAX_PACKET_STEPS_PER_TICK { + steps += 1; + match slot.jitter.consume() { + JitterResult::Packet(pkt) => queue_inbound_packet(slot, pkt), + JitterResult::Missing => { + let conceal_ts = slot.estimated_packet_ts.max(rtp_clock_increment(slot.codec_pt)); + let conceal_samples = rtp_ts_to_mix_samples(slot.codec_pt, conceal_ts) + .clamp(1, MAX_GAP_FILL_SAMPLES); + append_packet_loss_concealment(slot, conceal_samples); + if let Some(expected_ts) = slot.expected_rtp_timestamp { + slot.expected_rtp_timestamp = Some(expected_ts.wrapping_add(conceal_ts)); + } + } + JitterResult::Filling => break, + } + } +} + +fn take_mix_frame(slot: &mut MixerLegSlot) -> Vec { + let mut frame = Vec::with_capacity(MIX_FRAME_SIZE); + while frame.len() < MIX_FRAME_SIZE { + if let Some(sample) = slot.pcm_buffer.pop_front() { + frame.push(sample); + } else { + frame.push(0.0); + } + } + frame +} + +fn soft_limit_sample(sample: f32) -> f32 { + const KNEE: f32 = 0.85; + + let abs = sample.abs(); + if abs <= KNEE { + sample + } else { + let excess = abs - KNEE; + let compressed = KNEE + (excess / (1.0 + (excess / (1.0 - KNEE)))); + sample.signum() * compressed.min(1.0) + } +} + +fn try_send_leg_output( + out_tx: &OutTx, + call_id: &str, + leg_id: &str, + slot: &mut MixerLegSlot, + rtp: Vec, + stream: &str, +) { + let reason = match slot.outbound_tx.try_send(rtp) { + Ok(()) => return, + Err(mpsc::error::TrySendError::Full(_)) => "full", + Err(mpsc::error::TrySendError::Closed(_)) => "closed", + }; + + slot.outbound_drops += 1; + emit_output_drop_event( + out_tx, + call_id, + Some(leg_id), + None, + stream, + reason, + slot.outbound_drops, + ); +} + +fn try_send_tool_output( + out_tx: &OutTx, + call_id: &str, + tool_leg_id: &str, + tool: &mut ToolLegSlot, + batch: ToolAudioBatch, +) { + let reason = match tool.audio_tx.try_send(batch) { + Ok(()) => return, + Err(mpsc::error::TrySendError::Full(_)) => "full", + Err(mpsc::error::TrySendError::Closed(_)) => "closed", + }; + + tool.dropped_batches += 1; + emit_output_drop_event( + out_tx, + call_id, + None, + Some(tool_leg_id), + "tool-batch", + reason, + tool.dropped_batches, + ); +} + /// Spawn the mixer task for a call. Returns the command sender and task handle. -pub fn spawn_mixer( - call_id: String, - out_tx: OutTx, -) -> (mpsc::Sender, JoinHandle<()>) { +pub fn spawn_mixer(call_id: String, out_tx: OutTx) -> (mpsc::Sender, JoinHandle<()>) { let (cmd_tx, cmd_rx) = mpsc::channel::(32); let handle = tokio::spawn(async move { @@ -192,11 +430,7 @@ pub fn spawn_mixer( } /// The 20ms mixing loop. -async fn mixer_loop( - call_id: String, - mut cmd_rx: mpsc::Receiver, - out_tx: OutTx, -) { +async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver, out_tx: OutTx) { let mut legs: HashMap = HashMap::new(); let mut tool_legs: HashMap = HashMap::new(); let mut interval = time::interval(Duration::from_millis(20)); @@ -237,11 +471,15 @@ async fn mixer_loop( denoiser: new_denoiser(), inbound_rx, outbound_tx, + pcm_buffer: VecDeque::new(), last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE], + expected_rtp_timestamp: None, + estimated_packet_ts: rtp_clock_increment(codec_pt), silent_ticks: 0, rtp_seq: 0, rtp_ts: 0, rtp_ssrc: rand::random(), + outbound_drops: 0, role: LegRole::Participant, jitter: JitterBuffer::new(), }, @@ -302,7 +540,14 @@ async fn mixer_loop( tool_type, audio_tx, }) => { - tool_legs.insert(leg_id, ToolLegSlot { tool_type, audio_tx }); + tool_legs.insert( + leg_id, + ToolLegSlot { + tool_type, + audio_tx, + dropped_batches: 0, + }, + ); } Ok(MixerCommand::RemoveToolLeg { leg_id }) => { tool_legs.remove(&leg_id); @@ -343,54 +588,11 @@ async fn mixer_loop( } } - // Step 2b: Consume exactly one frame from the jitter buffer. - match slot.jitter.consume() { - JitterResult::Packet(pkt) => { - match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) { - Ok((pcm, rate)) => { - let pcm_48k = if rate == MIX_RATE { - pcm - } else { - slot.transcoder - .resample_f32(&pcm, rate, MIX_RATE) - .unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE]) - }; - let processed = if slot.codec_pt != codec_lib::PT_OPUS { - TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k) - } else { - pcm_48k - }; - let mut frame = processed; - frame.resize(MIX_FRAME_SIZE, 0.0); - slot.last_pcm_frame = frame; - } - Err(_) => {} - } - } - JitterResult::Missing => { - // Invoke Opus PLC or fade for non-Opus codecs. - if slot.codec_pt == codec_lib::PT_OPUS { - match slot.transcoder.opus_plc(MIX_FRAME_SIZE) { - Ok(pcm) => { - slot.last_pcm_frame = pcm; - } - Err(_) => { - for s in slot.last_pcm_frame.iter_mut() { - *s *= 0.8; - } - } - } - } else { - // Non-Opus: fade last frame toward silence. - for s in slot.last_pcm_frame.iter_mut() { - *s *= 0.85; - } - } - } - JitterResult::Filling => { - slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE]; - } - } + // Step 2b: Decode enough RTP to cover one 20ms playout frame. + // Variable-duration packets (10ms, 20ms, 60ms, ...) accumulate in + // the per-leg PCM FIFO; we pop exactly one 20ms frame below. + fill_leg_playout_buffer(slot); + slot.last_pcm_frame = take_mix_frame(slot); // Run jitter adaptation + prune stale packets. slot.jitter.adapt(); @@ -404,6 +606,9 @@ async fn mixer_loop( } if slot.silent_ticks > 150 { slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE]; + slot.pcm_buffer.clear(); + slot.expected_rtp_timestamp = None; + slot.estimated_packet_ts = rtp_clock_increment(slot.codec_pt); } } @@ -426,12 +631,12 @@ async fn mixer_loop( for (lid, slot) in legs.iter_mut() { match &mut slot.role { LegRole::Participant => { - // Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0]. + // Mix-minus: total minus this leg's own contribution. + // Apply a light soft limiter instead of hard clipping the sum. let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE); for i in 0..MIX_FRAME_SIZE { - let sample = - (total_mix[i] - slot.last_pcm_frame[i] as f64) as f32; - mix_minus.push(sample.clamp(-1.0, 1.0)); + let sample = (total_mix[i] - slot.last_pcm_frame[i] as f64) as f32; + mix_minus.push(soft_limit_sample(sample)); } // Resample from 48kHz to the leg's codec native rate. @@ -445,11 +650,10 @@ async fn mixer_loop( }; // Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32). - let encoded = - match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) { - Ok(e) if !e.is_empty() => e, - _ => continue, - }; + let encoded = match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) { + Ok(e) if !e.is_empty() => e, + _ => continue, + }; // Build RTP packet with header. let header = @@ -460,8 +664,7 @@ async fn mixer_loop( slot.rtp_seq = slot.rtp_seq.wrapping_add(1); slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); - // Non-blocking send — drop frame if channel is full. - let _ = slot.outbound_tx.try_send(rtp); + try_send_leg_output(&out_tx, &call_id, lid, slot, rtp, "participant-audio"); } LegRole::Isolated(state) => { // Check for DTMF digit from this leg. @@ -487,8 +690,7 @@ async fn mixer_loop( if let Some(digit) = matched_digit { // Interaction complete — digit matched. - completed_interactions - .push((lid.clone(), InteractionResult::Digit(digit))); + completed_interactions.push((lid.clone(), InteractionResult::Digit(digit))); } else { // Play prompt frame or silence. let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() { @@ -508,6 +710,7 @@ async fn mixer_loop( .unwrap_or_default() }; + let mut prompt_rtp: Option> = None; if let Ok(encoded) = slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) { @@ -521,10 +724,9 @@ async fn mixer_loop( let mut rtp = header.to_vec(); rtp.extend_from_slice(&encoded); slot.rtp_seq = slot.rtp_seq.wrapping_add(1); - slot.rtp_ts = slot - .rtp_ts - .wrapping_add(rtp_clock_increment(slot.codec_pt)); - let _ = slot.outbound_tx.try_send(rtp); + slot.rtp_ts = + slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); + prompt_rtp = Some(rtp); } } @@ -537,6 +739,17 @@ async fn mixer_loop( state.timeout_ticks_remaining -= 1; } } + + if let Some(rtp) = prompt_rtp { + try_send_leg_output( + &out_tx, + &call_id, + lid, + slot, + rtp, + "isolated-prompt", + ); + } } } } @@ -566,7 +779,7 @@ async fn mixer_loop( }) .collect(); - for tool in tool_legs.values() { + for (tool_leg_id, tool) in tool_legs.iter_mut() { let batch = ToolAudioBatch { sources: sources .iter() @@ -576,8 +789,7 @@ async fn mixer_loop( }) .collect(), }; - // Non-blocking send — drop batch if tool can't keep up. - let _ = tool.audio_tx.try_send(batch); + try_send_tool_output(&out_tx, &call_id, tool_leg_id, tool, batch); } } @@ -610,7 +822,14 @@ async fn mixer_loop( rtp_out.extend_from_slice(&dtmf_pkt.payload); target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1); // Don't increment rtp_ts for DTMF — it shares timestamp context with audio. - let _ = target_slot.outbound_tx.try_send(rtp_out); + try_send_leg_output( + &out_tx, + &call_id, + target_lid, + target_slot, + rtp_out, + "dtmf", + ); } } } diff --git a/rust/crates/proxy-engine/src/provider.rs b/rust/crates/proxy-engine/src/provider.rs index ecbd406..cf3fec1 100644 --- a/rust/crates/proxy-engine/src/provider.rs +++ b/rust/crates/proxy-engine/src/provider.rs @@ -267,11 +267,7 @@ impl ProviderManager { /// Try to handle a SIP response as a provider registration response. /// Returns true if consumed. - pub async fn handle_response( - &self, - msg: &SipMessage, - socket: &UdpSocket, - ) -> bool { + pub async fn handle_response(&self, msg: &SipMessage, socket: &UdpSocket) -> bool { for ps_arc in &self.providers { let mut ps = ps_arc.lock().await; let was_registered = ps.is_registered; @@ -322,7 +318,10 @@ impl ProviderManager { } /// Find a provider by its config ID (e.g. "easybell"). - pub async fn find_by_provider_id(&self, provider_id: &str) -> Option>> { + pub async fn find_by_provider_id( + &self, + provider_id: &str, + ) -> Option>> { for ps_arc in &self.providers { let ps = ps_arc.lock().await; if ps.config.id == provider_id { diff --git a/rust/crates/proxy-engine/src/recorder.rs b/rust/crates/proxy-engine/src/recorder.rs index e942b71..dc9db6b 100644 --- a/rust/crates/proxy-engine/src/recorder.rs +++ b/rust/crates/proxy-engine/src/recorder.rs @@ -25,8 +25,7 @@ impl Recorder { ) -> Result { // Ensure parent directory exists. if let Some(parent) = Path::new(file_path).parent() { - std::fs::create_dir_all(parent) - .map_err(|e| format!("create dir: {e}"))?; + std::fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?; } let sample_rate = 8000u32; // Record at 8kHz (standard telephony) @@ -57,10 +56,13 @@ impl Recorder { /// Create a recorder that writes raw PCM at a given sample rate. /// Used by tool legs that already have decoded PCM (no RTP processing needed). - pub fn new_pcm(file_path: &str, sample_rate: u32, max_duration_ms: Option) -> Result { + pub fn new_pcm( + file_path: &str, + sample_rate: u32, + max_duration_ms: Option, + ) -> Result { if let Some(parent) = Path::new(file_path).parent() { - std::fs::create_dir_all(parent) - .map_err(|e| format!("create dir: {e}"))?; + std::fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?; } let spec = hound::WavSpec { diff --git a/rust/crates/proxy-engine/src/registrar.rs b/rust/crates/proxy-engine/src/registrar.rs index 0180eea..4aba0fd 100644 --- a/rust/crates/proxy-engine/src/registrar.rs +++ b/rust/crates/proxy-engine/src/registrar.rs @@ -60,18 +60,17 @@ impl Registrar { /// Try to handle a SIP REGISTER from a device. /// Returns Some(response_bytes) if handled, None if not a known device. - pub fn handle_register( - &mut self, - msg: &SipMessage, - from_addr: SocketAddr, - ) -> Option> { + pub fn handle_register(&mut self, msg: &SipMessage, from_addr: SocketAddr) -> Option> { if msg.method() != Some("REGISTER") { return None; } // Find the device by matching the source IP against expectedAddress. let from_ip = from_addr.ip().to_string(); - let device = self.devices.iter().find(|d| d.expected_address == from_ip)?; + let device = self + .devices + .iter() + .find(|d| d.expected_address == from_ip)?; let from_header = msg.get_header("From").unwrap_or(""); let aor = SipMessage::extract_uri(from_header) @@ -79,9 +78,7 @@ impl Registrar { .unwrap_or_else(|| format!("sip:{}@{}", device.extension, from_ip)); let expires_header = msg.get_header("Expires"); - let requested: u32 = expires_header - .and_then(|s| s.parse().ok()) - .unwrap_or(3600); + let requested: u32 = expires_header.and_then(|s| s.parse().ok()).unwrap_or(3600); let expires = requested.min(MAX_EXPIRES); let entry = RegisteredDevice { @@ -122,10 +119,7 @@ impl Registrar { Some(ResponseOptions { to_tag: Some(generate_tag()), contact: Some(contact), - extra_headers: Some(vec![( - "Expires".to_string(), - expires.to_string(), - )]), + extra_headers: Some(vec![("Expires".to_string(), expires.to_string())]), ..Default::default() }), ); @@ -145,8 +139,8 @@ impl Registrar { /// Find a registered device by its source IP address. pub fn find_by_address(&self, addr: &SocketAddr) -> Option<&RegisteredDevice> { let ip = addr.ip().to_string(); - self.registered.values().find(|e| { - e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at - }) + self.registered + .values() + .find(|e| e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at) } } diff --git a/rust/crates/proxy-engine/src/rtp.rs b/rust/crates/proxy-engine/src/rtp.rs index 4d38180..28b8334 100644 --- a/rust/crates/proxy-engine/src/rtp.rs +++ b/rust/crates/proxy-engine/src/rtp.rs @@ -82,10 +82,15 @@ pub fn build_rtp_header(pt: u8, seq: u16, timestamp: u32, ssrc: u32) -> [u8; 12] /// Get the RTP clock increment per 20ms frame for a payload type. pub fn rtp_clock_increment(pt: u8) -> u32 { + rtp_clock_rate(pt) / 50 +} + +/// Get the RTP clock rate for a payload type. +pub fn rtp_clock_rate(pt: u8) -> u32 { match pt { - 9 => 160, // G.722: 8000 Hz clock rate (despite 16kHz audio) × 0.02s - 0 | 8 => 160, // PCMU/PCMA: 8000 × 0.02 - 111 => 960, // Opus: 48000 × 0.02 - _ => 160, + 9 => 8000, // G.722 uses an 8kHz RTP clock despite 16kHz audio. + 0 | 8 => 8000, // PCMU/PCMA + 111 => 48000, // Opus + _ => 8000, } } diff --git a/rust/crates/proxy-engine/src/sip_leg.rs b/rust/crates/proxy-engine/src/sip_leg.rs index 911e36c..2f3f372 100644 --- a/rust/crates/proxy-engine/src/sip_leg.rs +++ b/rust/crates/proxy-engine/src/sip_leg.rs @@ -128,17 +128,24 @@ impl SipLeg { max_forwards: Some(70), body: Some(sdp), content_type: Some("application/sdp".to_string()), - extra_headers: Some(vec![ - ("User-Agent".to_string(), "SipRouter/1.0".to_string()), - ]), + extra_headers: Some(vec![( + "User-Agent".to_string(), + "SipRouter/1.0".to_string(), + )]), }, ); - self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port)); + self.dialog = Some(SipDialog::from_uac_invite( + &invite, + ip, + self.config.lan_port, + )); self.invite = Some(invite.clone()); self.state = LegState::Inviting; - let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await; + let _ = socket + .send_to(&invite.serialize(), self.config.sip_target) + .await; } /// Handle an incoming SIP message routed to this leg. @@ -443,10 +450,7 @@ pub enum SipLegAction { /// Build an ACK for a non-2xx response (same transaction as the INVITE). fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage { let via = original_invite.get_header("Via").unwrap_or("").to_string(); - let from = original_invite - .get_header("From") - .unwrap_or("") - .to_string(); + let from = original_invite.get_header("From").unwrap_or("").to_string(); let to = response.get_header("To").unwrap_or("").to_string(); let call_id = original_invite.call_id().to_string(); let cseq_num: u32 = original_invite diff --git a/rust/crates/proxy-engine/src/sip_transport.rs b/rust/crates/proxy-engine/src/sip_transport.rs index fc0a48a..2833450 100644 --- a/rust/crates/proxy-engine/src/sip_transport.rs +++ b/rust/crates/proxy-engine/src/sip_transport.rs @@ -28,10 +28,8 @@ impl SipTransport { } /// Spawn the UDP receive loop. Calls the handler for every received packet. - pub fn spawn_receiver( - &self, - handler: F, - ) where + pub fn spawn_receiver(&self, handler: F) + where F: Fn(&[u8], SocketAddr) + Send + 'static, { let socket = self.socket.clone(); diff --git a/rust/crates/proxy-engine/src/tool_leg.rs b/rust/crates/proxy-engine/src/tool_leg.rs index d4d15b7..1ba1fe5 100644 --- a/rust/crates/proxy-engine/src/tool_leg.rs +++ b/rust/crates/proxy-engine/src/tool_leg.rs @@ -51,7 +51,8 @@ pub fn spawn_recording_tool( }); // Convert f32 [-1.0, 1.0] to i16 for WAV writing. - let pcm_i16: Vec = source.pcm_48k + let pcm_i16: Vec = source + .pcm_48k .iter() .map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16) .collect(); diff --git a/rust/crates/proxy-engine/src/tts.rs b/rust/crates/proxy-engine/src/tts.rs index 93c0562..da0836a 100644 --- a/rust/crates/proxy-engine/src/tts.rs +++ b/rust/crates/proxy-engine/src/tts.rs @@ -39,18 +39,33 @@ impl TtsEngine { /// - `output`: output WAV file path /// - `cacheable`: if true, skip synthesis when the output WAV already /// matches the same text+voice (checked via a `.meta` sidecar file) - pub async fn generate(&mut self, params: &serde_json::Value) -> Result { - let model_path = params.get("model").and_then(|v| v.as_str()) + pub async fn generate( + &mut self, + params: &serde_json::Value, + ) -> Result { + let model_path = params + .get("model") + .and_then(|v| v.as_str()) .ok_or("missing 'model' param")?; - let voices_path = params.get("voices").and_then(|v| v.as_str()) + let voices_path = params + .get("voices") + .and_then(|v| v.as_str()) .ok_or("missing 'voices' param")?; - let voice_name = params.get("voice").and_then(|v| v.as_str()) + let voice_name = params + .get("voice") + .and_then(|v| v.as_str()) .unwrap_or("af_bella"); - let text = params.get("text").and_then(|v| v.as_str()) + let text = params + .get("text") + .and_then(|v| v.as_str()) .ok_or("missing 'text' param")?; - let output_path = params.get("output").and_then(|v| v.as_str()) + let output_path = params + .get("output") + .and_then(|v| v.as_str()) .ok_or("missing 'output' param")?; - let cacheable = params.get("cacheable").and_then(|v| v.as_bool()) + let cacheable = params + .get("cacheable") + .and_then(|v| v.as_bool()) .unwrap_or(false); if text.is_empty() { @@ -94,10 +109,14 @@ impl TtsEngine { let voice = select_voice(voice_name); eprintln!("[tts] synthesizing voice '{voice_name}': \"{text}\""); - let (samples, duration) = tts.synth(text, voice) + let (samples, duration) = tts + .synth(text, voice) .await .map_err(|e| format!("synthesis failed: {e:?}"))?; - eprintln!("[tts] synthesized {} samples in {duration:?}", samples.len()); + eprintln!( + "[tts] synthesized {} samples in {duration:?}", + samples.len() + ); // Write 24kHz 16-bit mono WAV. let spec = hound::WavSpec { @@ -111,9 +130,13 @@ impl TtsEngine { .map_err(|e| format!("WAV create failed: {e}"))?; for &sample in &samples { let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16; - writer.write_sample(s16).map_err(|e| format!("WAV write: {e}"))?; + writer + .write_sample(s16) + .map_err(|e| format!("WAV write: {e}"))?; } - writer.finalize().map_err(|e| format!("WAV finalize: {e}"))?; + writer + .finalize() + .map_err(|e| format!("WAV finalize: {e}"))?; // Write sidecar for future cache checks. if cacheable { diff --git a/rust/crates/proxy-engine/src/voicemail.rs b/rust/crates/proxy-engine/src/voicemail.rs index 06d40a0..71e8329 100644 --- a/rust/crates/proxy-engine/src/voicemail.rs +++ b/rust/crates/proxy-engine/src/voicemail.rs @@ -128,8 +128,8 @@ async fn record_from_socket( break; // Max duration reached. } } - Ok(Err(_)) => break, // Socket error (closed). - Err(_) => break, // Timeout (max duration + grace). + Ok(Err(_)) => break, // Socket error (closed). + Err(_) => break, // Timeout (max duration + grace). } } diff --git a/rust/crates/proxy-engine/src/webrtc_engine.rs b/rust/crates/proxy-engine/src/webrtc_engine.rs index 3c76937..fbca1fd 100644 --- a/rust/crates/proxy-engine/src/webrtc_engine.rs +++ b/rust/crates/proxy-engine/src/webrtc_engine.rs @@ -58,9 +58,7 @@ impl WebRtcEngine { .register_default_codecs() .map_err(|e| format!("register codecs: {e}"))?; - let api = APIBuilder::new() - .with_media_engine(media_engine) - .build(); + let api = APIBuilder::new().with_media_engine(media_engine).build(); let config = RTCConfiguration { ice_servers: vec![], @@ -91,8 +89,7 @@ impl WebRtcEngine { .map_err(|e| format!("add track: {e}"))?; // Shared mixer channel sender (populated when linked to a call). - let mixer_tx: Arc>>> = - Arc::new(Mutex::new(None)); + let mixer_tx: Arc>>> = Arc::new(Mutex::new(None)); // ICE candidate handler. let out_tx_ice = self.out_tx.clone(); @@ -256,7 +253,11 @@ impl WebRtcEngine { pub async fn close_session(&mut self, session_id: &str) -> Result<(), String> { if let Some(session) = self.sessions.remove(session_id) { - session.pc.close().await.map_err(|e| format!("close: {e}"))?; + session + .pc + .close() + .await + .map_err(|e| format!("close: {e}"))?; } Ok(()) } diff --git a/rust/crates/sip-proto/src/dialog.rs b/rust/crates/sip-proto/src/dialog.rs index 9f9bb44..347c043 100644 --- a/rust/crates/sip-proto/src/dialog.rs +++ b/rust/crates/sip-proto/src/dialog.rs @@ -51,9 +51,7 @@ impl SipDialog { .map(|s| s.to_string()) .unwrap_or_else(generate_tag), remote_tag: None, - local_uri: SipMessage::extract_uri(from) - .unwrap_or("") - .to_string(), + local_uri: SipMessage::extract_uri(from).unwrap_or("").to_string(), remote_uri: SipMessage::extract_uri(to).unwrap_or("").to_string(), local_cseq, remote_cseq: 0, @@ -181,10 +179,7 @@ impl SipDialog { format!("<{}>{remote_tag_str}", self.remote_uri), ), ("Call-ID".to_string(), self.call_id.clone()), - ( - "CSeq".to_string(), - format!("{} {method}", self.local_cseq), - ), + ("CSeq".to_string(), format!("{} {method}", self.local_cseq)), ("Max-Forwards".to_string(), "70".to_string()), ]; @@ -243,10 +238,7 @@ impl SipDialog { format!("<{}>{remote_tag_str}", self.remote_uri), ), ("Call-ID".to_string(), self.call_id.clone()), - ( - "CSeq".to_string(), - format!("{} ACK", self.local_cseq), - ), + ("CSeq".to_string(), format!("{} ACK", self.local_cseq)), ("Max-Forwards".to_string(), "70".to_string()), ]; @@ -271,10 +263,7 @@ impl SipDialog { ("From".to_string(), from), ("To".to_string(), to), ("Call-ID".to_string(), self.call_id.clone()), - ( - "CSeq".to_string(), - format!("{} CANCEL", self.local_cseq), - ), + ("CSeq".to_string(), format!("{} CANCEL", self.local_cseq)), ("Max-Forwards".to_string(), "70".to_string()), ("Content-Length".to_string(), "0".to_string()), ]; @@ -284,11 +273,7 @@ impl SipDialog { .unwrap_or(&self.remote_target) .to_string(); - SipMessage::new( - format!("CANCEL {ruri} SIP/2.0"), - headers, - String::new(), - ) + SipMessage::new(format!("CANCEL {ruri} SIP/2.0"), headers, String::new()) } /// Transition the dialog to terminated state. diff --git a/rust/crates/sip-proto/src/helpers.rs b/rust/crates/sip-proto/src/helpers.rs index 3bcc455..6929aa6 100644 --- a/rust/crates/sip-proto/src/helpers.rs +++ b/rust/crates/sip-proto/src/helpers.rs @@ -27,7 +27,9 @@ pub fn generate_branch() -> String { fn random_hex(bytes: usize) -> String { let mut rng = rand::thread_rng(); - (0..bytes).map(|_| format!("{:02x}", rng.gen::())).collect() + (0..bytes) + .map(|_| format!("{:02x}", rng.gen::())) + .collect() } // ---- Codec registry -------------------------------------------------------- @@ -142,7 +144,9 @@ pub fn parse_digest_challenge(header: &str) -> Option { return Some(after[1..1 + end].to_string()); } // Unquoted value. - let end = after.find(|c: char| c == ',' || c.is_whitespace()).unwrap_or(after.len()); + let end = after + .find(|c: char| c == ',' || c.is_whitespace()) + .unwrap_or(after.len()); return Some(after[..end].to_string()); } None @@ -241,11 +245,7 @@ pub struct MwiResult { pub extra_headers: Vec<(String, String)>, } -pub fn build_mwi_body( - new_messages: u32, - old_messages: u32, - account_uri: &str, -) -> MwiResult { +pub fn build_mwi_body(new_messages: u32, old_messages: u32, account_uri: &str) -> MwiResult { let waiting = if new_messages > 0 { "yes" } else { "no" }; let body = format!( "Messages-Waiting: {waiting}\r\n\ diff --git a/rust/crates/sip-proto/src/lib.rs b/rust/crates/sip-proto/src/lib.rs index 319c7d9..63ff9c0 100644 --- a/rust/crates/sip-proto/src/lib.rs +++ b/rust/crates/sip-proto/src/lib.rs @@ -4,9 +4,9 @@ //! SDP handling, Digest authentication, and URI rewriting. //! Ported from the TypeScript `ts/sip/` library. -pub mod message; pub mod dialog; pub mod helpers; +pub mod message; pub mod rewrite; /// Network endpoint (address + port + optional negotiated codec). diff --git a/rust/crates/sip-proto/src/message.rs b/rust/crates/sip-proto/src/message.rs index 14a00c7..dcdbd53 100644 --- a/rust/crates/sip-proto/src/message.rs +++ b/rust/crates/sip-proto/src/message.rs @@ -14,7 +14,11 @@ pub struct SipMessage { impl SipMessage { pub fn new(start_line: String, headers: Vec<(String, String)>, body: String) -> Self { - Self { start_line, headers, body } + Self { + start_line, + headers, + body, + } } // ---- Parsing ----------------------------------------------------------- @@ -175,7 +179,8 @@ impl SipMessage { /// Inserts a header at the top of the header list. pub fn prepend_header(&mut self, name: &str, value: &str) -> &mut Self { - self.headers.insert(0, (name.to_string(), value.to_string())); + self.headers + .insert(0, (name.to_string(), value.to_string())); self } @@ -233,10 +238,7 @@ impl SipMessage { .to_display_name .map(|d| format!("\"{d}\" ")) .unwrap_or_default(); - let to_tag_str = opts - .to_tag - .map(|t| format!(";tag={t}")) - .unwrap_or_default(); + let to_tag_str = opts.to_tag.map(|t| format!(";tag={t}")).unwrap_or_default(); let mut headers = vec![ ( @@ -364,7 +366,43 @@ impl SipMessage { .find(|c: char| c == ';' || c == '>') .unwrap_or(trimmed.len()); let result = &trimmed[..end]; - if result.is_empty() { None } else { Some(result) } + if result.is_empty() { + None + } else { + Some(result) + } + } + } + + /// Extract the user part from a SIP/TEL URI or header value. + pub fn extract_uri_user(uri_or_header_value: &str) -> Option<&str> { + let raw = Self::extract_uri(uri_or_header_value).unwrap_or(uri_or_header_value); + let raw = raw.trim(); + if raw.is_empty() { + return None; + } + + let user_part = if raw + .get(..5) + .is_some_and(|prefix| prefix.eq_ignore_ascii_case("sips:")) + { + &raw[5..] + } else if raw.get(..4).is_some_and(|prefix| { + prefix.eq_ignore_ascii_case("sip:") || prefix.eq_ignore_ascii_case("tel:") + }) { + &raw[4..] + } else { + raw + }; + + let end = user_part + .find(|c: char| matches!(c, '@' | ';' | '?' | '>')) + .unwrap_or(user_part.len()); + let result = &user_part[..end]; + if result.is_empty() { + None + } else { + Some(result) } } } @@ -506,6 +544,19 @@ mod tests { SipMessage::extract_uri("\"Name\" ;tag=abc"), Some("sip:user@host") ); + assert_eq!( + SipMessage::extract_uri_user("\"Name\" ;tag=abc"), + Some("+49 421 219694") + ); + assert_eq!( + SipMessage::extract_uri_user("sip:0049421219694@voip.easybell.de"), + Some("0049421219694") + ); + assert_eq!( + SipMessage::extract_uri_user("tel:+49421219694;phone-context=example.com"), + Some("+49421219694") + ); + assert_eq!(SipMessage::extract_uri_user("SIP:user@host"), Some("user")); } #[test] @@ -535,7 +586,10 @@ mod tests { ); assert_eq!(invite.method(), Some("INVITE")); assert_eq!(invite.call_id(), "test-123"); - assert!(invite.get_header("Via").unwrap().contains("192.168.1.1:5070")); + assert!(invite + .get_header("Via") + .unwrap() + .contains("192.168.1.1:5070")); let response = SipMessage::create_response( 200, diff --git a/rust/crates/sip-proto/src/rewrite.rs b/rust/crates/sip-proto/src/rewrite.rs index 1f60478..3ebf208 100644 --- a/rust/crates/sip-proto/src/rewrite.rs +++ b/rust/crates/sip-proto/src/rewrite.rs @@ -92,7 +92,11 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option .collect(); let original = match (orig_addr, orig_port) { - (Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }), + (Some(a), Some(p)) => Some(Endpoint { + address: a, + port: p, + codec_pt: None, + }), _ => None, }; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 6c41e43..0f3c109 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.23.0', + version: '1.24.0', description: 'undefined' } diff --git a/ts/config.ts b/ts/config.ts index 6f7a8bc..98921cb 100644 --- a/ts/config.ts +++ b/ts/config.ts @@ -62,7 +62,10 @@ export interface ISipRouteMatch { direction: 'inbound' | 'outbound'; /** - * Match the dialed/called number (To/Request-URI for inbound DID, dialed digits for outbound). + * Match the normalized called number. + * + * Inbound: matches the provider-delivered DID / Request-URI user part. + * Outbound: matches the normalized dialed digits. * Supports: exact string, prefix with trailing '*' (e.g. "+4930*"), or regex ("/^\\+49/"). */ numberPattern?: string; @@ -89,13 +92,13 @@ export interface ISipRouteAction { // --- Inbound actions (IVR / voicemail) --- - /** Route directly to a voicemail box (skip ringing devices). */ + /** Voicemail fallback for matched inbound routes. */ voicemailBox?: string; /** Route to an IVR menu by menu ID (skip ringing devices). */ ivrMenuId?: string; - /** Override no-answer timeout (seconds) before routing to voicemail. */ + /** Reserved for future no-answer handling. */ noAnswerTimeout?: number; // --- Outbound actions (provider selection) --- diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 6c41e43..0f3c109 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.23.0', + version: '1.24.0', description: 'undefined' } diff --git a/ts_web/elements/sipproxy-view-routes.ts b/ts_web/elements/sipproxy-view-routes.ts index 5f99dd1..fc21fc8 100644 --- a/ts_web/elements/sipproxy-view-routes.ts +++ b/ts_web/elements/sipproxy-view-routes.ts @@ -20,6 +20,9 @@ interface ISipRoute { action: { targets?: string[]; ringBrowsers?: boolean; + voicemailBox?: string; + ivrMenuId?: string; + noAnswerTimeout?: number; provider?: string; failoverProviders?: string[]; stripPrefix?: string; @@ -40,10 +43,10 @@ export class SipproxyViewRoutes extends DeesElement { `, ]; - connectedCallback() { - super.connectedCallback(); - appState.subscribe((_k, s) => { this.appData = s; }); - this.loadConfig(); + async connectedCallback(): Promise { + await super.connectedCallback(); + appState.subscribe((s) => { this.appData = s; }); + await this.loadConfig(); } private async loadConfig() { @@ -157,9 +160,15 @@ export class SipproxyViewRoutes extends DeesElement { return html`${parts.join(' ')}`; } else { const parts: string[] = []; - if (a.targets?.length) parts.push(`ring: ${a.targets.join(', ')}`); - else parts.push('ring: all devices'); - if (a.ringBrowsers) parts.push('+ browsers'); + if (a.ivrMenuId) { + parts.push(`ivr: ${a.ivrMenuId}`); + } else { + if (a.targets?.length) parts.push(`ring: ${a.targets.join(', ')}`); + else parts.push('ring: all devices'); + if (a.ringBrowsers) parts.push('+ browsers'); + } + if (a.voicemailBox) parts.push(`vm: ${a.voicemailBox}`); + if (a.noAnswerTimeout) parts.push(`timeout: ${a.noAnswerTimeout}s`); return html`${parts.join(' ')}`; } }, @@ -231,6 +240,8 @@ export class SipproxyViewRoutes extends DeesElement { const cfg = this.config; const providers = cfg?.providers || []; const devices = cfg?.devices || []; + const voiceboxes = cfg?.voiceboxes || []; + const ivrMenus = cfg?.ivr?.menus || []; const formData: ISipRoute = existing ? JSON.parse(JSON.stringify(existing)) @@ -284,7 +295,7 @@ export class SipproxyViewRoutes extends DeesElement { { formData.match.numberPattern = (e.target as any).value || undefined; }} > @@ -328,7 +339,7 @@ export class SipproxyViewRoutes extends DeesElement { { const v = (e.target as any).value.trim(); @@ -342,6 +353,30 @@ export class SipproxyViewRoutes extends DeesElement { @newValue=${(e: CustomEvent) => { formData.action.ringBrowsers = e.detail; }} > + ({ option: vb.id, key: vb.id })), + ]} + @selectedOption=${(e: CustomEvent) => { formData.action.voicemailBox = e.detail.key || undefined; }} + > + + ({ option: menu.name || menu.id, key: menu.id })), + ]} + @selectedOption=${(e: CustomEvent) => { formData.action.ivrMenuId = e.detail.key || undefined; }} + > + r.id === formData.id);