From 81441e78534bcf13f06e8ef83cd6f4a84afea7ff Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Sat, 11 Apr 2026 18:40:56 +0000 Subject: [PATCH] fix(proxy-engine): fix inbound route browser ringing and provider-facing SDP advertisement while preventing RTP port exhaustion --- changelog.md | 9 + rust/crates/proxy-engine/src/call.rs | 71 +++---- rust/crates/proxy-engine/src/call_manager.rs | 111 ++++++---- rust/crates/proxy-engine/src/config.rs | 26 +++ rust/crates/proxy-engine/src/dtmf.rs | 200 ------------------ rust/crates/proxy-engine/src/main.rs | 20 +- rust/crates/proxy-engine/src/mixer.rs | 16 +- rust/crates/proxy-engine/src/provider.rs | 11 - rust/crates/proxy-engine/src/recorder.rs | 3 + rust/crates/proxy-engine/src/registrar.rs | 35 +-- rust/crates/proxy-engine/src/rtp.rs | 111 ++-------- rust/crates/proxy-engine/src/sip_leg.rs | 13 +- rust/crates/proxy-engine/src/sip_transport.rs | 16 -- ts/00_commitinfo_data.ts | 2 +- ts/proxybridge.ts | 5 + ts/sipproxy.ts | 26 ++- ts_web/00_commitinfo_data.ts | 2 +- 17 files changed, 208 insertions(+), 469 deletions(-) delete mode 100644 rust/crates/proxy-engine/src/dtmf.rs diff --git a/changelog.md b/changelog.md index 227a192..3574c8f 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,14 @@ # Changelog +## 2026-04-11 - 1.20.2 - fix(proxy-engine) +fix inbound route browser ringing and provider-facing SDP advertisement while preventing RTP port exhaustion + +- Honor inbound routing `ringBrowsers` when emitting incoming call events so browser toast notifications can be suppressed per route. +- Rewrite SDP and Record-Route using the destination leg's routable address, using `public_ip` for provider legs and LAN IP for device and internal legs. +- Store provider leg public IP metadata on legs to support correct per-destination SIP message rewriting. +- Change the RTP port pool to track sockets with `Weak` so ports are reclaimed automatically after calls end, avoiding leaked allocations and eventual 503 failures on new calls. +- Remove unused dashboard/status, DTMF, relay, and transport helper code paths as part of engine cleanup. + ## 2026-04-11 - 1.20.1 - fix(docker) install required native build tools for Rust dependencies in the build image diff --git a/rust/crates/proxy-engine/src/call.rs b/rust/crates/proxy-engine/src/call.rs index 415ad1a..b035105 100644 --- a/rust/crates/proxy-engine/src/call.rs +++ b/rust/crates/proxy-engine/src/call.rs @@ -27,6 +27,10 @@ pub enum CallState { } impl CallState { + /// Wire-format string for events/dashboards. Not currently emitted — + /// call state changes flow as typed events (`call_answered`, etc.) — + /// but kept for future status-snapshot work. + #[allow(dead_code)] pub fn as_str(&self) -> &'static str { match self { Self::SettingUp => "setting-up", @@ -45,6 +49,8 @@ pub enum CallDirection { } impl CallDirection { + /// Wire-format string. See CallState::as_str. + #[allow(dead_code)] pub fn as_str(&self) -> &'static str { match self { Self::Inbound => "inbound", @@ -59,8 +65,13 @@ pub enum LegKind { SipProvider, SipDevice, WebRtc, - Media, // voicemail playback, IVR, recording - Tool, // observer leg for recording, transcription, etc. + /// Voicemail playback, IVR prompt playback, recording — not yet wired up + /// as a distinct leg kind (those paths currently use the mixer's role + /// system instead). Kept behind allow so adding a real media leg later + /// doesn't require re-introducing the variant. + #[allow(dead_code)] + Media, + Tool, // observer leg for recording, transcription, etc. } impl LegKind { @@ -107,11 +118,22 @@ pub struct LegInfo { /// For SIP legs: the SIP Call-ID for message routing. pub sip_call_id: Option, /// For WebRTC legs: the session ID in WebRtcEngine. + /// + /// Populated at leg creation but not yet consumed by the hub — + /// WebRTC session lookup currently goes through the session registry + /// directly. Kept for introspection/debugging. + #[allow(dead_code)] pub webrtc_session_id: Option, /// The RTP socket allocated for this leg. pub rtp_socket: Option>, /// The RTP port number. pub rtp_port: u16, + /// Public IP to advertise in SDP/Record-Route when THIS leg is the + /// destination of a rewrite. Populated only for provider legs; `None` + /// for LAN SIP devices, WebRTC browsers, media, and tool legs (which + /// are reachable via `lan_ip`). See `route_passthrough_message` for + /// the per-destination advertise-IP logic. + pub public_ip: Option, /// The remote media endpoint (learned from SDP or address learning). pub remote_media: Option, /// SIP signaling address (provider or device). @@ -124,14 +146,21 @@ pub struct LegInfo { /// A multiparty call with N legs and a central mixer. pub struct Call { + // Duplicated from the HashMap key in CallManager. Kept for future + // status-snapshot work. + #[allow(dead_code)] pub id: String, pub state: CallState, + // Populated at call creation but not currently consumed — dashboard + // pull snapshots are gone (push events only). + #[allow(dead_code)] pub direction: CallDirection, pub created_at: Instant, // Metadata. pub caller_number: Option, pub callee_number: Option, + #[allow(dead_code)] pub provider_id: String, /// Original INVITE from the device (for device-originated outbound calls). @@ -211,42 +240,4 @@ impl Call { handle.abort(); } } - - /// Produce a JSON status snapshot for the dashboard. - pub fn to_status_json(&self) -> serde_json::Value { - let legs: Vec = self - .legs - .values() - .filter(|l| l.state != LegState::Terminated) - .map(|l| { - let metadata: serde_json::Value = if l.metadata.is_empty() { - serde_json::json!({}) - } else { - serde_json::Value::Object( - l.metadata.iter().map(|(k, v)| (k.clone(), v.clone())).collect(), - ) - }; - serde_json::json!({ - "id": l.id, - "type": l.kind.as_str(), - "state": l.state.as_str(), - "codec": sip_proto::helpers::codec_name(l.codec_pt), - "rtpPort": l.rtp_port, - "remoteMedia": l.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())), - "metadata": metadata, - }) - }) - .collect(); - - serde_json::json!({ - "id": self.id, - "state": self.state.as_str(), - "direction": self.direction.as_str(), - "callerNumber": self.caller_number, - "calleeNumber": self.callee_number, - "providerUsed": self.provider_id, - "duration": self.duration_secs(), - "legs": legs, - }) - } } diff --git a/rust/crates/proxy-engine/src/call_manager.rs b/rust/crates/proxy-engine/src/call_manager.rs index 402c94d..5504fa1 100644 --- a/rust/crates/proxy-engine/src/call_manager.rs +++ b/rust/crates/proxy-engine/src/call_manager.rs @@ -20,6 +20,14 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::net::UdpSocket; +/// Result of creating an inbound call — carries both the call id and +/// whether browsers should be notified (flows from the matched inbound +/// route's `ring_browsers` flag, or the fallback default). +pub struct InboundCallCreated { + pub call_id: String, + pub ring_browsers: bool, +} + /// Emit a `leg_added` event with full leg information. /// Free function (not a method) to avoid `&self` borrow conflicts when `self.calls` is borrowed. fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) { @@ -94,26 +102,6 @@ impl CallManager { self.sip_index.contains_key(sip_call_id) } - /// Get an RTP socket for a call's provider leg (used by webrtc_link). - pub fn get_call_provider_rtp_socket(&self, call_id: &str) -> Option> { - let call = self.calls.get(call_id)?; - for leg in call.legs.values() { - if leg.kind == LegKind::SipProvider { - return leg.rtp_socket.clone(); - } - } - None - } - - /// Get all active call statuses for the dashboard. - pub fn get_all_statuses(&self) -> Vec { - self.calls - .values() - .filter(|c| c.state != CallState::Terminated) - .map(|c| c.to_status_json()) - .collect() - } - // ----------------------------------------------------------------------- // SIP message routing // ----------------------------------------------------------------------- @@ -426,8 +414,8 @@ impl CallManager { // Find the counterpart leg. let other_leg = call.legs.values().find(|l| l.id != this_leg_id && l.state != LegState::Terminated); - let (other_addr, other_rtp_port, other_leg_id) = match other_leg { - Some(l) => (l.signaling_addr, l.rtp_port, l.id.clone()), + let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) = match other_leg { + Some(l) => (l.signaling_addr, l.rtp_port, l.id.clone(), l.kind, l.public_ip.clone()), None => return false, }; let forward_to = match other_addr { @@ -438,8 +426,14 @@ impl CallManager { let lan_ip = config.proxy.lan_ip.clone(); let lan_port = config.proxy.lan_port; - // Get this leg's RTP port (for SDP rewriting — tell the other side to send RTP here). - let this_rtp_port = call.legs.get(this_leg_id).map(|l| l.rtp_port).unwrap_or(0); + // Pick the IP to advertise to the destination leg. Provider legs face + // the public internet and need `public_ip`; every other leg kind is + // on-LAN (or proxy-internal) and takes `lan_ip`. This rule is applied + // both to the SDP `c=` line and the Record-Route header below. + let advertise_ip: String = match other_kind { + LegKind::SipProvider => other_public_ip.unwrap_or_else(|| lan_ip.clone()), + _ => lan_ip.clone(), + }; // Check if the other leg is a B2BUA leg (has SipLeg for proper dialog mgmt). let other_has_sip_leg = call.legs.get(&other_leg_id) @@ -533,10 +527,11 @@ impl CallManager { // Forward other requests with SDP rewriting. let mut fwd = msg.clone(); - // Rewrite SDP to point the other side to this leg's RTP port - // (so we receive their audio on our socket). + // Rewrite SDP so the destination leg sends RTP to our proxy port + // at an address that is routable from its vantage point + // (public IP for provider legs, LAN IP for everything else). if fwd.has_sdp_body() { - let (new_body, _) = rewrite_sdp(&fwd.body, &lan_ip, other_rtp_port); + let (new_body, _) = rewrite_sdp(&fwd.body, &advertise_ip, other_rtp_port); fwd.body = new_body; fwd.update_content_length(); } @@ -548,7 +543,8 @@ impl CallManager { } } if fwd.is_dialog_establishing() { - fwd.prepend_header("Record-Route", &format!("")); + // Record-Route must also be routable from the destination leg. + fwd.prepend_header("Record-Route", &format!("")); } let _ = socket.send_to(&fwd.serialize(), forward_to).await; return true; @@ -560,15 +556,10 @@ impl CallManager { let cseq_method = msg.cseq_method().unwrap_or("").to_uppercase(); let mut fwd = msg.clone(); - // Rewrite SDP so the forward-to side sends RTP to the correct leg port. + // Rewrite SDP so the forward-to side sends RTP to the correct + // leg port at a routable address (see `advertise_ip` above). if fwd.has_sdp_body() { - let rewrite_ip = if this_kind == LegKind::SipDevice { - // Response from device → send to provider: use LAN/public IP. - &lan_ip - } else { - &lan_ip - }; - let (new_body, _) = rewrite_sdp(&fwd.body, rewrite_ip, other_rtp_port); + let (new_body, _) = rewrite_sdp(&fwd.body, &advertise_ip, other_rtp_port); fwd.body = new_body; fwd.update_content_length(); } @@ -690,7 +681,7 @@ impl CallManager { rtp_pool: &mut RtpPortPool, socket: &UdpSocket, public_ip: Option<&str>, - ) -> Option { + ) -> Option { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; @@ -707,17 +698,41 @@ impl CallManager { .unwrap_or("") .to_string(); - // Resolve target device. - let device_addr = match self.resolve_first_device(config, registrar) { + // Resolve via the configured inbound routing table. This honors + // user-defined routes from the UI (numberPattern, callerPattern, + // sourceProvider, targets, ringBrowsers). If no route matches, the + // fallback returns an empty `device_ids` and `ring_browsers: true`, + // which preserves pre-routing behavior via the `resolve_first_device` + // fallback below. + // + // TODO: Multi-target inbound fork is not yet implemented. + // - `route.device_ids` beyond the first registered target are ignored. + // - `ring_browsers` is informational only — browsers see a toast but + // do not race the SIP device. First-to-answer-wins requires a + // multi-leg fork + per-leg CANCEL, which is not built yet. + // - `voicemail_box`, `ivr_menu_id`, `no_answer_timeout` are not honored. + let route = config.resolve_inbound_route(provider_id, &called_number, &caller_number); + let ring_browsers = route.ring_browsers; + + // Pick the first registered device from the matched targets, or fall + // back to any-registered-device if the route has no resolved targets. + let device_addr = route + .device_ids + .iter() + .find_map(|id| registrar.get_device_contact(id)) + .or_else(|| self.resolve_first_device(config, registrar)); + + let device_addr = match device_addr { Some(addr) => addr, None => { // No device registered → voicemail. - return self + let call_id = self .route_to_voicemail( &call_id, invite, from_addr, &caller_number, provider_id, provider_config, config, rtp_pool, socket, public_ip, ) - .await; + .await?; + return Some(InboundCallCreated { call_id, ring_browsers }); } }; @@ -781,6 +796,7 @@ impl CallManager { webrtc_session_id: None, rtp_socket: Some(provider_rtp.socket.clone()), rtp_port: provider_rtp.port, + public_ip: public_ip.map(|s| s.to_string()), remote_media: provider_media, signaling_addr: Some(from_addr), metadata: HashMap::new(), @@ -801,6 +817,7 @@ impl CallManager { webrtc_session_id: None, rtp_socket: Some(device_rtp.socket.clone()), rtp_port: device_rtp.port, + public_ip: None, remote_media: None, // Learned from device's 200 OK. signaling_addr: Some(device_addr), metadata: HashMap::new(), @@ -844,7 +861,7 @@ impl CallManager { } } - Some(call_id) + Some(InboundCallCreated { call_id, ring_browsers }) } /// Initiate an outbound B2BUA call from the dashboard. @@ -920,6 +937,7 @@ impl CallManager { webrtc_session_id: None, rtp_socket: Some(rtp_alloc.socket.clone()), rtp_port: rtp_alloc.port, + public_ip: public_ip.map(|s| s.to_string()), remote_media: None, signaling_addr: Some(provider_dest), metadata: HashMap::new(), @@ -1030,6 +1048,7 @@ impl CallManager { sip_leg: None, sip_call_id: Some(device_sip_call_id.clone()), webrtc_session_id: None, + public_ip: None, rtp_socket: Some(device_rtp.socket.clone()), rtp_port: device_rtp.port, remote_media: device_media, @@ -1076,6 +1095,7 @@ impl CallManager { webrtc_session_id: None, rtp_socket: Some(provider_rtp.socket.clone()), rtp_port: provider_rtp.port, + public_ip: public_ip.map(|s| s.to_string()), remote_media: None, signaling_addr: Some(provider_dest), metadata: HashMap::new(), @@ -1114,7 +1134,7 @@ impl CallManager { public_ip: Option<&str>, registered_aor: &str, ) -> Option { - let call = self.calls.get(call_id)?; + self.calls.get(call_id)?; // existence check; the call is re-fetched via get_mut below let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; @@ -1151,6 +1171,7 @@ impl CallManager { webrtc_session_id: None, rtp_socket: Some(rtp_alloc.socket.clone()), rtp_port: rtp_alloc.port, + public_ip: public_ip.map(|s| s.to_string()), remote_media: None, signaling_addr: Some(provider_dest), metadata: HashMap::new(), @@ -1182,7 +1203,7 @@ impl CallManager { socket: &UdpSocket, ) -> Option { let device_addr = registrar.get_device_contact(device_id)?; - let call = self.calls.get(call_id)?; + self.calls.get(call_id)?; // existence check; the call is re-fetched via get_mut below let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; @@ -1221,6 +1242,7 @@ impl CallManager { webrtc_session_id: None, rtp_socket: Some(rtp_alloc.socket.clone()), rtp_port: rtp_alloc.port, + public_ip: None, remote_media: None, signaling_addr: Some(device_addr), metadata: HashMap::new(), @@ -1581,6 +1603,7 @@ impl CallManager { webrtc_session_id: None, rtp_socket: Some(rtp_alloc.socket.clone()), rtp_port: rtp_alloc.port, + public_ip: public_ip.map(|s| s.to_string()), remote_media: Some(provider_media), signaling_addr: Some(from_addr), metadata: HashMap::new(), diff --git a/rust/crates/proxy-engine/src/config.rs b/rust/crates/proxy-engine/src/config.rs index 745190b..c341bc5 100644 --- a/rust/crates/proxy-engine/src/config.rs +++ b/rust/crates/proxy-engine/src/config.rs @@ -30,6 +30,11 @@ impl Endpoint { } /// Provider quirks for codec/protocol workarounds. +// +// Deserialized from provider config for TS parity. Early-media silence +// injection and related workarounds are not yet ported to the Rust engine, +// so every field is populated by serde but not yet consumed. +#[allow(dead_code)] #[derive(Debug, Clone, Deserialize)] pub struct Quirks { #[serde(rename = "earlyMediaSilence")] @@ -44,6 +49,9 @@ pub struct Quirks { #[derive(Debug, Clone, Deserialize)] pub struct ProviderConfig { pub id: String, + // UI label — populated by serde for parity with the TS config, not + // consumed at runtime. + #[allow(dead_code)] #[serde(rename = "displayName")] pub display_name: String, pub domain: String, @@ -54,6 +62,8 @@ pub struct ProviderConfig { #[serde(rename = "registerIntervalSec")] pub register_interval_sec: u32, pub codecs: Vec, + // Workaround knobs populated by serde but not yet acted upon — see Quirks. + #[allow(dead_code)] pub quirks: Quirks, } @@ -84,6 +94,10 @@ pub struct RouteMatch { /// Route action. #[derive(Debug, Clone, Deserialize)] +// Several fields (voicemail_box, ivr_menu_id, no_answer_timeout) are read +// by resolve_inbound_route but not yet honored downstream — see the +// multi-target TODO in CallManager::create_inbound_call. +#[allow(dead_code)] pub struct RouteAction { pub targets: Option>, #[serde(rename = "ringBrowsers")] @@ -106,7 +120,11 @@ pub struct RouteAction { /// A routing rule. #[derive(Debug, Clone, Deserialize)] pub struct Route { + // `id` and `name` are UI identifiers, populated by serde but not + // consumed by the resolvers. + #[allow(dead_code)] pub id: String, + #[allow(dead_code)] pub name: String, pub priority: i32, pub enabled: bool, @@ -192,10 +210,18 @@ pub fn matches_pattern(pattern: Option<&str>, value: &str) -> bool { /// Result of resolving an outbound route. pub struct OutboundRouteResult { pub provider: ProviderConfig, + // TODO: prefix rewriting is unfinished — this is computed but the + // caller ignores it and uses the raw dialed number. + #[allow(dead_code)] pub transformed_number: String, } /// Result of resolving an inbound route. +// +// `device_ids` and `ring_browsers` are consumed by create_inbound_call. +// The remaining fields (voicemail_box, ivr_menu_id, no_answer_timeout) +// are resolved but not yet acted upon — see the multi-target TODO. +#[allow(dead_code)] pub struct InboundRouteResult { pub device_ids: Vec, pub ring_browsers: bool, diff --git a/rust/crates/proxy-engine/src/dtmf.rs b/rust/crates/proxy-engine/src/dtmf.rs deleted file mode 100644 index b874149..0000000 --- a/rust/crates/proxy-engine/src/dtmf.rs +++ /dev/null @@ -1,200 +0,0 @@ -//! DTMF detection — parses RFC 2833 telephone-event RTP packets. -//! -//! Deduplicates repeated packets (same digit sent multiple times with -//! increasing duration) and fires once per detected digit. -//! -//! Ported from ts/call/dtmf-detector.ts. - -use crate::ipc::{emit_event, OutTx}; - -/// RFC 2833 event ID → character mapping. -const EVENT_CHARS: &[char] = &[ - '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '*', '#', 'A', 'B', 'C', 'D', -]; - -/// Safety timeout: report digit if no End packet arrives within this many ms. -const SAFETY_TIMEOUT_MS: u64 = 200; - -/// DTMF detector for a single RTP stream. -pub struct DtmfDetector { - /// Negotiated telephone-event payload type (default 101). - telephone_event_pt: u8, - /// Clock rate for duration calculation (default 8000 Hz). - clock_rate: u32, - /// Call ID for event emission. - call_id: String, - - // Deduplication state. - current_event_id: Option, - current_event_ts: Option, - current_event_reported: bool, - current_event_duration: u16, - - out_tx: OutTx, -} - -impl DtmfDetector { - pub fn new(call_id: String, out_tx: OutTx) -> Self { - Self { - telephone_event_pt: 101, - clock_rate: 8000, - call_id, - current_event_id: None, - current_event_ts: None, - current_event_reported: false, - current_event_duration: 0, - out_tx, - } - } - - /// Feed an RTP packet. Checks PT; ignores non-DTMF packets. - /// Returns Some(digit_char) if a digit was detected. - pub fn process_rtp(&mut self, data: &[u8]) -> Option { - if data.len() < 16 { - return None; // 12-byte header + 4-byte telephone-event minimum - } - - let pt = data[1] & 0x7F; - if pt != self.telephone_event_pt { - return None; - } - - let marker = (data[1] & 0x80) != 0; - let rtp_timestamp = u32::from_be_bytes([data[4], data[5], data[6], data[7]]); - - // Parse telephone-event payload. - let event_id = data[12]; - let end_bit = (data[13] & 0x80) != 0; - let duration = u16::from_be_bytes([data[14], data[15]]); - - if event_id as usize >= EVENT_CHARS.len() { - return None; - } - - // Detect new event. - let is_new = marker - || self.current_event_id != Some(event_id) - || self.current_event_ts != Some(rtp_timestamp); - - if is_new { - // Report pending unreported event. - let pending = self.report_pending(); - - self.current_event_id = Some(event_id); - self.current_event_ts = Some(rtp_timestamp); - self.current_event_reported = false; - self.current_event_duration = duration; - - if pending.is_some() { - return pending; - } - } - - if duration > self.current_event_duration { - self.current_event_duration = duration; - } - - // Report on End bit (first time only). - if end_bit && !self.current_event_reported { - self.current_event_reported = true; - let digit = EVENT_CHARS[event_id as usize]; - let duration_ms = (self.current_event_duration as f64 / self.clock_rate as f64) * 1000.0; - - emit_event( - &self.out_tx, - "dtmf_digit", - serde_json::json!({ - "call_id": self.call_id, - "digit": digit.to_string(), - "duration_ms": duration_ms.round() as u32, - "source": "rfc2833", - }), - ); - - return Some(digit); - } - - None - } - - /// Report a pending unreported event. - fn report_pending(&mut self) -> Option { - if let Some(event_id) = self.current_event_id { - if !self.current_event_reported && (event_id as usize) < EVENT_CHARS.len() { - self.current_event_reported = true; - let digit = EVENT_CHARS[event_id as usize]; - let duration_ms = - (self.current_event_duration as f64 / self.clock_rate as f64) * 1000.0; - - emit_event( - &self.out_tx, - "dtmf_digit", - serde_json::json!({ - "call_id": self.call_id, - "digit": digit.to_string(), - "duration_ms": duration_ms.round() as u32, - "source": "rfc2833", - }), - ); - - return Some(digit); - } - } - None - } - - /// Process a SIP INFO message body for DTMF. - pub fn process_sip_info(&mut self, content_type: &str, body: &str) -> Option { - let ct = content_type.to_ascii_lowercase(); - - if ct.contains("application/dtmf-relay") { - // Format: "Signal= 5\r\nDuration= 160\r\n" - let signal = body - .lines() - .find(|l| l.to_ascii_lowercase().starts_with("signal")) - .and_then(|l| l.split('=').nth(1)) - .map(|s| s.trim().to_string())?; - - if signal.len() != 1 { - return None; - } - let digit = signal.chars().next()?.to_ascii_uppercase(); - if !"0123456789*#ABCD".contains(digit) { - return None; - } - - emit_event( - &self.out_tx, - "dtmf_digit", - serde_json::json!({ - "call_id": self.call_id, - "digit": digit.to_string(), - "source": "sip-info", - }), - ); - - return Some(digit); - } - - if ct.contains("application/dtmf") { - let digit = body.trim().chars().next()?.to_ascii_uppercase(); - if !"0123456789*#ABCD".contains(digit) { - return None; - } - - emit_event( - &self.out_tx, - "dtmf_digit", - serde_json::json!({ - "call_id": self.call_id, - "digit": digit.to_string(), - "source": "sip-info", - }), - ); - - return Some(digit); - } - - None - } -} diff --git a/rust/crates/proxy-engine/src/main.rs b/rust/crates/proxy-engine/src/main.rs index b5fc4dc..6f34109 100644 --- a/rust/crates/proxy-engine/src/main.rs +++ b/rust/crates/proxy-engine/src/main.rs @@ -10,7 +10,6 @@ mod audio_player; mod call; mod call_manager; mod config; -mod dtmf; mod ipc; mod jitter_buffer; mod leg_io; @@ -140,7 +139,6 @@ async fn handle_command( "configure" => handle_configure(engine, out_tx, &cmd).await, "hangup" => handle_hangup(engine, out_tx, &cmd).await, "make_call" => handle_make_call(engine, out_tx, &cmd).await, - "get_status" => handle_get_status(engine, out_tx, &cmd).await, "add_leg" => handle_add_leg(engine, out_tx, &cmd).await, "remove_leg" => handle_remove_leg(engine, out_tx, &cmd).await, // WebRTC commands — lock webrtc only (no engine contention). @@ -330,7 +328,7 @@ async fn handle_sip_packet( .. } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); - let call_id = call_mgr + let inbound = call_mgr .create_inbound_call( &msg, from_addr, @@ -344,7 +342,7 @@ async fn handle_sip_packet( ) .await; - if let Some(call_id) = call_id { + if let Some(inbound) = inbound { // Emit event so TypeScript knows about the call (for dashboard, IVR routing, etc). let from_header = msg.get_header("From").unwrap_or(""); let from_uri = SipMessage::extract_uri(from_header).unwrap_or("Unknown"); @@ -357,10 +355,11 @@ async fn handle_sip_packet( &eng.out_tx, "incoming_call", serde_json::json!({ - "call_id": call_id, + "call_id": inbound.call_id, "from_uri": from_uri, "to_number": called_number, "provider_id": provider_id, + "ring_browsers": inbound.ring_browsers, }), ); } @@ -383,7 +382,7 @@ async fn handle_sip_packet( let route_result = config_ref.resolve_outbound_route( &dialed_number, device_id.as_deref(), - &|pid: &str| { + &|_pid: &str| { // Can't call async here — use a sync check. // For now, assume all configured providers are available. true @@ -454,13 +453,6 @@ async fn handle_sip_packet( ); } -/// Handle `get_status` — return active call statuses from Rust. -async fn handle_get_status(engine: Arc>, out_tx: &OutTx, cmd: &Command) { - let eng = engine.lock().await; - let calls = eng.call_mgr.get_all_statuses(); - respond_ok(out_tx, &cmd.id, serde_json::json!({ "calls": calls })); -} - /// Handle `make_call` — initiate an outbound call to a number via a provider. async fn handle_make_call(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let number = match cmd.params.get("number").and_then(|v| v.as_str()) { @@ -665,6 +657,7 @@ async fn handle_webrtc_link( webrtc_session_id: Some(session_id.clone()), rtp_socket: None, rtp_port: 0, + public_ip: None, remote_media: None, signaling_addr: None, metadata: std::collections::HashMap::new(), @@ -1116,6 +1109,7 @@ async fn handle_add_tool_leg( webrtc_session_id: None, rtp_socket: None, rtp_port: 0, + public_ip: None, remote_media: None, signaling_addr: None, metadata, diff --git a/rust/crates/proxy-engine/src/mixer.rs b/rust/crates/proxy-engine/src/mixer.rs index 725e212..4301647 100644 --- a/rust/crates/proxy-engine/src/mixer.rs +++ b/rust/crates/proxy-engine/src/mixer.rs @@ -39,6 +39,10 @@ pub struct RtpPacket { /// RTP sequence number for reordering. pub seq: u16, /// RTP timestamp from the original packet header. + /// + /// Set on inbound RTP but not yet consumed downstream — reserved for + /// future jitter/sync work in the mixer. + #[allow(dead_code)] pub timestamp: u32, } @@ -136,8 +140,6 @@ pub enum MixerCommand { timeout_ms: u32, result_tx: oneshot::Sender, }, - /// Cancel an in-progress interaction (e.g., leg being removed). - CancelInteraction { leg_id: String }, /// Add a tool leg that receives per-source unmerged audio. AddToolLeg { @@ -295,16 +297,6 @@ async fn mixer_loop( let _ = result_tx.send(InteractionResult::Cancelled); } } - Ok(MixerCommand::CancelInteraction { leg_id }) => { - if let Some(slot) = legs.get_mut(&leg_id) { - if let LegRole::Isolated(ref mut state) = slot.role { - if let Some(tx) = state.result_tx.take() { - let _ = tx.send(InteractionResult::Cancelled); - } - } - slot.role = LegRole::Participant; - } - } Ok(MixerCommand::AddToolLeg { leg_id, tool_type, diff --git a/rust/crates/proxy-engine/src/provider.rs b/rust/crates/proxy-engine/src/provider.rs index 67cb71c..ecbd406 100644 --- a/rust/crates/proxy-engine/src/provider.rs +++ b/rust/crates/proxy-engine/src/provider.rs @@ -331,17 +331,6 @@ impl ProviderManager { } None } - - /// Check if a provider is currently registered. - pub async fn is_registered(&self, provider_id: &str) -> bool { - for ps_arc in &self.providers { - let ps = ps_arc.lock().await; - if ps.config.id == provider_id { - return ps.is_registered; - } - } - false - } } /// Registration loop for a single provider. diff --git a/rust/crates/proxy-engine/src/recorder.rs b/rust/crates/proxy-engine/src/recorder.rs index 35b46f8..e942b71 100644 --- a/rust/crates/proxy-engine/src/recorder.rs +++ b/rust/crates/proxy-engine/src/recorder.rs @@ -178,5 +178,8 @@ impl Recorder { pub struct RecordingResult { pub file_path: String, pub duration_ms: u64, + // Running-sample total kept for parity with the TS recorder; not yet + // surfaced through any event or dashboard field. + #[allow(dead_code)] pub total_samples: u64, } diff --git a/rust/crates/proxy-engine/src/registrar.rs b/rust/crates/proxy-engine/src/registrar.rs index 2f6e5ac..0180eea 100644 --- a/rust/crates/proxy-engine/src/registrar.rs +++ b/rust/crates/proxy-engine/src/registrar.rs @@ -19,11 +19,19 @@ const MAX_EXPIRES: u32 = 300; #[derive(Debug, Clone)] pub struct RegisteredDevice { pub device_id: String, + // These fields are populated at REGISTER time for logging/debugging but are + // not read back — device identity flows via the `device_registered` push + // event, not via struct queries. Kept behind allow(dead_code) because + // removing them would churn handle_register for no runtime benefit. + #[allow(dead_code)] pub display_name: String, + #[allow(dead_code)] pub extension: String, pub contact_addr: SocketAddr, + #[allow(dead_code)] pub registered_at: Instant, pub expires_at: Instant, + #[allow(dead_code)] pub aor: String, } @@ -134,11 +142,6 @@ impl Registrar { Some(entry.contact_addr) } - /// Check if a source address belongs to a known device. - pub fn is_known_device_address(&self, addr: &str) -> bool { - self.devices.iter().any(|d| d.expected_address == addr) - } - /// Find a registered device by its source IP address. pub fn find_by_address(&self, addr: &SocketAddr) -> Option<&RegisteredDevice> { let ip = addr.ip().to_string(); @@ -146,26 +149,4 @@ impl Registrar { e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at }) } - - /// Get all device statuses for the dashboard. - pub fn get_all_statuses(&self) -> Vec { - let now = Instant::now(); - let mut result = Vec::new(); - - for dc in &self.devices { - let reg = self.registered.get(&dc.id); - let connected = reg.map(|r| now <= r.expires_at).unwrap_or(false); - result.push(serde_json::json!({ - "id": dc.id, - "displayName": dc.display_name, - "address": reg.filter(|_| connected).map(|r| r.contact_addr.ip().to_string()), - "port": reg.filter(|_| connected).map(|r| r.contact_addr.port()), - "aor": reg.map(|r| r.aor.as_str()).unwrap_or(""), - "connected": connected, - "isBrowser": false, - })); - } - - result - } } diff --git a/rust/crates/proxy-engine/src/rtp.rs b/rust/crates/proxy-engine/src/rtp.rs index 30cf69b..4d38180 100644 --- a/rust/crates/proxy-engine/src/rtp.rs +++ b/rust/crates/proxy-engine/src/rtp.rs @@ -1,17 +1,19 @@ -//! RTP port pool and media forwarding. +//! RTP port pool for media sockets. //! -//! Manages a pool of even-numbered UDP ports for RTP media. -//! Each port gets a bound tokio UdpSocket. Supports: -//! - Direct forwarding (SIP-to-SIP, no transcoding) -//! - Transcoding forwarding (via codec-lib, e.g. G.722 ↔ Opus) -//! - Silence generation -//! - NAT priming +//! Manages a pool of even-numbered UDP ports for RTP media. `allocate()` +//! hands back an `Arc` to the caller (stored on the owning +//! `LegInfo`), while the pool itself keeps only a `Weak`. When +//! the call terminates and `LegInfo` is dropped, the strong refcount +//! reaches zero, the socket is closed, and `allocate()` prunes the dead +//! weak ref the next time it scans that slot — so the port automatically +//! becomes available for reuse without any explicit `release()` plumbing. //! -//! Ported from ts/call/rtp-port-pool.ts + sip-leg.ts RTP handling. +//! This fixes the previous leak where the pool held `Arc` and +//! `release()` was never called, eventually exhausting the port range and +//! causing "503 Service Unavailable" on new calls. use std::collections::HashMap; -use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use tokio::net::UdpSocket; /// A single RTP port allocation. @@ -24,7 +26,7 @@ pub struct RtpAllocation { pub struct RtpPortPool { min: u16, max: u16, - allocated: HashMap>, + allocated: HashMap>, } impl RtpPortPool { @@ -41,11 +43,19 @@ impl RtpPortPool { pub async fn allocate(&mut self) -> Option { let mut port = self.min; while port < self.max { + // Prune a dead weak ref at this slot: if the last strong Arc + // (held by the owning LegInfo) was dropped when the call ended, + // the socket is already closed and the slot is free again. + if let Some(weak) = self.allocated.get(&port) { + if weak.strong_count() == 0 { + self.allocated.remove(&port); + } + } if !self.allocated.contains_key(&port) { match UdpSocket::bind(format!("0.0.0.0:{port}")).await { Ok(sock) => { let sock = Arc::new(sock); - self.allocated.insert(port, sock.clone()); + self.allocated.insert(port, Arc::downgrade(&sock)); return Some(RtpAllocation { port, socket: sock }); } Err(_) => { @@ -57,83 +67,6 @@ impl RtpPortPool { } None // Pool exhausted. } - - /// Release a port back to the pool. - pub fn release(&mut self, port: u16) { - self.allocated.remove(&port); - // Socket is dropped when the last Arc reference goes away. - } - - pub fn size(&self) -> usize { - self.allocated.len() - } - - pub fn capacity(&self) -> usize { - ((self.max - self.min) / 2) as usize - } -} - -/// An active RTP relay between two endpoints. -/// Receives on `local_socket` and forwards to `remote_addr`. -pub struct RtpRelay { - pub local_port: u16, - pub local_socket: Arc, - pub remote_addr: Option, - /// If set, transcode packets using this codec session before forwarding. - pub transcode: Option, - /// Packets received counter. - pub pkt_received: u64, - /// Packets sent counter. - pub pkt_sent: u64, -} - -pub struct TranscodeConfig { - pub from_pt: u8, - pub to_pt: u8, - pub session_id: String, -} - -impl RtpRelay { - pub fn new(port: u16, socket: Arc) -> Self { - Self { - local_port: port, - local_socket: socket, - remote_addr: None, - transcode: None, - pkt_received: 0, - pkt_sent: 0, - } - } - - pub fn set_remote(&mut self, addr: SocketAddr) { - self.remote_addr = Some(addr); - } -} - -/// Send a 1-byte NAT priming packet to open a pinhole. -pub async fn prime_nat(socket: &UdpSocket, remote: SocketAddr) { - let _ = socket.send_to(&[0u8], remote).await; -} - -/// Build an RTP silence frame for PCMU (payload type 0). -pub fn silence_frame_pcmu() -> Vec { - // 12-byte RTP header + 160 bytes of µ-law silence (0xFF) - let mut frame = vec![0u8; 172]; - frame[0] = 0x80; // V=2 - frame[1] = 0; // PT=0 (PCMU) - // seq, timestamp, ssrc left as 0 — caller should set these - frame[12..].fill(0xFF); // µ-law silence - frame -} - -/// Build an RTP silence frame for G.722 (payload type 9). -pub fn silence_frame_g722() -> Vec { - // 12-byte RTP header + 160 bytes of G.722 silence - let mut frame = vec![0u8; 172]; - frame[0] = 0x80; // V=2 - frame[1] = 9; // PT=9 (G.722) - // G.722 silence: all zeros is valid silence - frame } /// Build an RTP header with the given parameters. diff --git a/rust/crates/proxy-engine/src/sip_leg.rs b/rust/crates/proxy-engine/src/sip_leg.rs index 40f1c4a..911e36c 100644 --- a/rust/crates/proxy-engine/src/sip_leg.rs +++ b/rust/crates/proxy-engine/src/sip_leg.rs @@ -16,7 +16,6 @@ use sip_proto::helpers::{ }; use sip_proto::message::{RequestOptions, SipMessage}; use std::net::SocketAddr; -use std::sync::Arc; use tokio::net::UdpSocket; /// State of a SIP leg. @@ -40,6 +39,9 @@ pub struct SipLegConfig { /// SIP target endpoint (provider outbound proxy or device address). pub sip_target: SocketAddr, /// Provider credentials (for 407 auth). + // username is carried for parity with the provider config but digest auth + // rebuilds the username from the registered AOR, so this slot is never read. + #[allow(dead_code)] pub username: Option, pub password: Option, pub registered_aor: Option, @@ -51,6 +53,10 @@ pub struct SipLegConfig { /// A SIP leg with full dialog management. pub struct SipLeg { + // Leg identity is tracked via the enclosing LegInfo's key in the call's + // leg map; SipLeg itself never reads this field back. Kept to preserve + // the (id, config) constructor shape used by the call manager. + #[allow(dead_code)] pub id: String, pub state: LegState, pub config: SipLegConfig, @@ -411,11 +417,6 @@ impl SipLeg { dialog.terminate(); Some(msg.serialize()) } - - /// Get the SIP Call-ID for routing. - pub fn sip_call_id(&self) -> Option<&str> { - self.dialog.as_ref().map(|d| d.call_id.as_str()) - } } /// Actions produced by the SipLeg message handler. diff --git a/rust/crates/proxy-engine/src/sip_transport.rs b/rust/crates/proxy-engine/src/sip_transport.rs index cf5036a..fc0a48a 100644 --- a/rust/crates/proxy-engine/src/sip_transport.rs +++ b/rust/crates/proxy-engine/src/sip_transport.rs @@ -27,22 +27,6 @@ impl SipTransport { self.socket.clone() } - /// Send a raw SIP message to a destination. - pub async fn send_to(&self, data: &[u8], dest: SocketAddr) -> Result { - self.socket - .send_to(data, dest) - .await - .map_err(|e| format!("send to {dest}: {e}")) - } - - /// Send a raw SIP message to an address:port pair. - pub async fn send_to_addr(&self, data: &[u8], addr: &str, port: u16) -> Result { - let dest: SocketAddr = format!("{addr}:{port}") - .parse() - .map_err(|e| format!("bad address {addr}:{port}: {e}"))?; - self.send_to(data, dest).await - } - /// Spawn the UDP receive loop. Calls the handler for every received packet. pub fn spawn_receiver( &self, diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index dd80748..81222c6 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.20.1', + version: '1.20.2', description: 'undefined' } diff --git a/ts/proxybridge.ts b/ts/proxybridge.ts index eb807c3..96e87e1 100644 --- a/ts/proxybridge.ts +++ b/ts/proxybridge.ts @@ -94,6 +94,11 @@ export interface IIncomingCallEvent { from_uri: string; to_number: string; provider_id: string; + /** Whether registered browsers should see a `webrtc-incoming` toast for + * this call. Set by the Rust engine from the matched inbound route's + * `ringBrowsers` flag (defaults to `true` when no route matches, so + * deployments without explicit routes preserve pre-routing behavior). */ + ring_browsers?: boolean; } export interface IOutboundCallEvent { diff --git a/ts/sipproxy.ts b/ts/sipproxy.ts index 01f07df..2f75de4 100644 --- a/ts/sipproxy.ts +++ b/ts/sipproxy.ts @@ -273,15 +273,23 @@ async function startProxyEngine(): Promise { legs: new Map(), }); - // Notify browsers of incoming call. - const browserIds = getAllBrowserDeviceIds(); - for (const bid of browserIds) { - sendToBrowserDevice(bid, { - type: 'webrtc-incoming', - callId: data.call_id, - from: data.from_uri, - deviceId: bid, - }); + // Notify browsers of the incoming call, but only if the matched inbound + // route asked for it. `ring_browsers !== false` preserves today's + // ring-by-default behavior for any Rust release that predates this + // field or for the fallback "no route matched" case (where Rust still + // sends `true`). Note: this is an informational toast — browsers do + // NOT race the SIP device to answer. First-to-answer-wins requires + // a multi-leg fork which is not yet implemented. + if (data.ring_browsers !== false) { + const browserIds = getAllBrowserDeviceIds(); + for (const bid of browserIds) { + sendToBrowserDevice(bid, { + type: 'webrtc-incoming', + callId: data.call_id, + from: data.from_uri, + deviceId: bid, + }); + } } }); diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index dd80748..81222c6 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.20.1', + version: '1.20.2', description: 'undefined' }