feat(routing): require explicit inbound DID routes and normalize SIP identities for provider-based number matching

This commit is contained in:
2026-04-14 16:35:54 +00:00
parent cff70ab179
commit 06c86d7e81
29 changed files with 1476 additions and 549 deletions

View File

@@ -1,5 +1,14 @@
# Changelog
## 2026-04-14 - 1.24.0 - feat(routing)
require explicit inbound DID routes and normalize SIP identities for provider-based number matching
- Inbound route resolution now returns no match unless a configured inbound route explicitly matches the provider and called number.
- Normalized routing identities were added for SIP/TEL URIs so inbound DIDs and outbound dialed numbers match consistently across provider-specific formats.
- Call handling and incoming call events now use normalized numbers, improving routing accuracy for shared trunk providers.
- Route configuration docs and the web route editor were updated to support explicit inbound DID ownership, voicemail fallback, and IVR selection.
- Mixer RTP handling was enhanced to better support variable packet durations, timestamp-based gap fill, and non-blocking output drop reporting.
## 2026-04-14 - 1.23.0 - feat(runtime)
refactor runtime state and proxy event handling for typed WebRTC linking and shared status models

View File

@@ -148,24 +148,41 @@ Create `.nogit/config.json`:
"routing": {
"routes": [
{
"id": "inbound-default",
"name": "Ring all devices",
"priority": 100,
"id": "inbound-main-did",
"name": "Main DID",
"priority": 200,
"enabled": true,
"match": {
"direction": "inbound",
"match": {},
"sourceProvider": "my-trunk",
"numberPattern": "+49421219694"
},
"action": {
"targets": ["desk-phone"],
"ringBrowsers": true,
"voicemailBox": "main",
"noAnswerTimeout": 25
"voicemailBox": "main"
}
},
{
"id": "inbound-support-did",
"name": "Support DID",
"priority": 190,
"enabled": true,
"match": {
"direction": "inbound",
"sourceProvider": "my-trunk",
"numberPattern": "+49421219695"
},
"action": {
"ivrMenuId": "support-menu"
}
},
{
"id": "outbound-default",
"name": "Route via trunk",
"priority": 100,
"direction": "outbound",
"match": {},
"enabled": true,
"match": { "direction": "outbound" },
"action": { "provider": "my-trunk" }
}
]
@@ -187,6 +204,8 @@ Create `.nogit/config.json`:
}
```
Inbound number ownership is explicit: add one inbound route per DID (or DID prefix) and scope it with `sourceProvider` when a provider delivers multiple external numbers.
### TTS Setup (Optional)
For neural voicemail greetings and IVR prompts, download the Kokoro TTS model:

View File

@@ -115,8 +115,7 @@ pub struct TranscodeState {
impl TranscodeState {
/// Create a new transcoding session with fresh codec state.
pub fn new() -> Result<Self, String> {
let mut opus_enc =
OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip)
let mut opus_enc = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip)
.map_err(|e| format!("opus encoder: {e}"))?;
opus_enc
.set_complexity(5)
@@ -160,13 +159,8 @@ impl TranscodeState {
let key = (from_rate, to_rate, canonical_chunk);
if !self.resamplers.contains_key(&key) {
let r = FftFixedIn::<f64>::new(
from_rate as usize,
to_rate as usize,
canonical_chunk,
1,
1,
)
let r =
FftFixedIn::<f64>::new(from_rate as usize, to_rate as usize, canonical_chunk, 1, 1)
.map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?;
self.resamplers.insert(key, r);
}
@@ -284,8 +278,7 @@ impl TranscodeState {
match pt {
PT_OPUS => {
let mut pcm = vec![0i16; 5760]; // up to 120ms at 48kHz
let packet =
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let packet = OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
let n: usize = self
@@ -343,8 +336,7 @@ impl TranscodeState {
match pt {
PT_OPUS => {
let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz
let packet =
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let packet = OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
let n: usize = self
@@ -368,8 +360,8 @@ impl TranscodeState {
/// Returns f32 PCM at 48kHz. `frame_size` should be 960 for 20ms.
pub fn opus_plc(&mut self, frame_size: usize) -> Result<Vec<f32>, String> {
let mut pcm = vec![0.0f32; frame_size];
let out = MutSignals::try_from(&mut pcm[..])
.map_err(|e| format!("opus plc signals: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus plc signals: {e}"))?;
let n: usize = self
.opus_dec
.decode_float(None::<OpusPacket<'_>>, out, false)
@@ -425,13 +417,8 @@ impl TranscodeState {
let key = (from_rate, to_rate, canonical_chunk);
if !self.resamplers_f32.contains_key(&key) {
let r = FftFixedIn::<f32>::new(
from_rate as usize,
to_rate as usize,
canonical_chunk,
1,
1,
)
let r =
FftFixedIn::<f32>::new(from_rate as usize, to_rate as usize, canonical_chunk, 1, 1)
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
self.resamplers_f32.insert(key, r);
}
@@ -508,8 +495,10 @@ mod tests {
let encoded = mulaw_encode(sample);
let decoded = mulaw_decode(encoded);
// µ-law is lossy; verify the decoded value is close.
assert!((sample as i32 - decoded as i32).abs() < 1000,
"µ-law roundtrip failed for {sample}: got {decoded}");
assert!(
(sample as i32 - decoded as i32).abs() < 1000,
"µ-law roundtrip failed for {sample}: got {decoded}"
);
}
}
@@ -518,8 +507,10 @@ mod tests {
for sample in [-32768i16, -1000, -1, 0, 1, 1000, 32767] {
let encoded = alaw_encode(sample);
let decoded = alaw_decode(encoded);
assert!((sample as i32 - decoded as i32).abs() < 1000,
"A-law roundtrip failed for {sample}: got {decoded}");
assert!(
(sample as i32 - decoded as i32).abs() < 1000,
"A-law roundtrip failed for {sample}: got {decoded}"
);
}
}
@@ -543,7 +534,9 @@ mod tests {
fn pcmu_to_pcma_roundtrip() {
let mut st = TranscodeState::new().unwrap();
// 160 bytes = 20ms of PCMU at 8kHz
let pcmu_data: Vec<u8> = (0..160).map(|i| mulaw_encode((i as i16 * 200) - 16000)).collect();
let pcmu_data: Vec<u8> = (0..160)
.map(|i| mulaw_encode((i as i16 * 200) - 16000))
.collect();
let pcma = st.transcode(&pcmu_data, PT_PCMU, PT_PCMA, None).unwrap();
assert_eq!(pcma.len(), 160); // Same frame size
let back = st.transcode(&pcma, PT_PCMA, PT_PCMU, None).unwrap();

View File

@@ -36,10 +36,7 @@ pub async fn play_wav_file(
// Read all samples as i16.
let samples: Vec<i16> = if spec.bits_per_sample == 16 {
reader
.samples::<i16>()
.filter_map(|s| s.ok())
.collect()
reader.samples::<i16>().filter_map(|s| s.ok()).collect()
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
reader
.samples::<f32>()
@@ -199,10 +196,7 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
.map(|s| s as f32 / 32768.0)
.collect()
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
reader
.samples::<f32>()
.filter_map(|s| s.ok())
.collect()
reader.samples::<f32>().filter_map(|s| s.ok()).collect()
} else {
return Err(format!(
"unsupported WAV format: {}bit {:?}",

View File

@@ -5,7 +5,7 @@
//! The mixer provides mix-minus audio to all participants.
use crate::call::{Call, CallDirection, CallState, LegId, LegInfo, LegKind, LegState};
use crate::config::{AppConfig, ProviderConfig};
use crate::config::{normalize_routing_identity, AppConfig, ProviderConfig};
use crate::ipc::{emit_event, OutTx};
use crate::leg_io::{create_leg_channels, spawn_sip_inbound, spawn_sip_outbound};
use crate::mixer::spawn_mixer;
@@ -13,7 +13,9 @@ use crate::registrar::Registrar;
use crate::rtp::RtpPortPool;
use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig};
use crate::tts::TtsEngine;
use sip_proto::helpers::{build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions};
use sip_proto::helpers::{
build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions,
};
use sip_proto::message::{ResponseOptions, SipMessage};
use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri};
use std::collections::HashMap;
@@ -25,7 +27,7 @@ use tokio::sync::Mutex;
/// Result of creating an inbound call — carries both the call id and
/// whether browsers should be notified (flows from the matched inbound
/// route's `ring_browsers` flag, or the fallback default).
/// route's `ring_browsers` flag).
pub struct InboundCallCreated {
pub call_id: String,
pub ring_browsers: bool,
@@ -214,15 +216,27 @@ impl CallManager {
let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice);
if let Some(dev) = device_leg {
if let Some(dev_addr) = dev.signaling_addr {
let ringing = SipMessage::create_response(180, "Ringing", device_invite, None);
let ringing = SipMessage::create_response(
180,
"Ringing",
device_invite,
None,
);
let _ = socket.send_to(&ringing.serialize(), dev_addr).await;
}
}
}
}
emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }));
emit_event(
&self.out_tx,
"call_ringing",
serde_json::json!({ "call_id": call_id }),
);
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }),
);
}
SipLegAction::ConnectedWithAck(ack_buf) => {
let _ = socket.send_to(&ack_buf, target).await;
@@ -245,7 +259,12 @@ impl CallManager {
spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx);
spawn_sip_outbound(rtp_socket, remote_addr, channels.outbound_rx);
if let Some(call) = self.calls.get(call_id) {
call.add_leg_to_mixer(leg_id, sip_pt, channels.inbound_rx, channels.outbound_tx)
call.add_leg_to_mixer(
leg_id,
sip_pt,
channels.inbound_rx,
channels.outbound_tx,
)
.await;
}
}
@@ -253,8 +272,17 @@ impl CallManager {
// For device-originated calls: send 200 OK to device and wire device leg.
if let Some(call) = self.calls.get(call_id) {
if let Some(device_invite) = call.device_invite.clone() {
let device_leg_info: Option<(SocketAddr, u16, Arc<UdpSocket>, Option<SocketAddr>, String)> =
call.legs.values().find(|l| l.kind == LegKind::SipDevice).and_then(|dev| {
let device_leg_info: Option<(
SocketAddr,
u16,
Arc<UdpSocket>,
Option<SocketAddr>,
String,
)> = call
.legs
.values()
.find(|l| l.kind == LegKind::SipDevice)
.and_then(|dev| {
Some((
dev.signaling_addr?,
dev.rtp_port,
@@ -264,11 +292,21 @@ impl CallManager {
))
});
if let Some((dev_addr, dev_rtp_port, dev_rtp_socket, dev_remote, dev_leg_id)) = device_leg_info {
if let Some((
dev_addr,
dev_rtp_port,
dev_rtp_socket,
dev_remote,
dev_leg_id,
)) = device_leg_info
{
// Build SDP pointing device to our device_rtp port.
// Use LAN IP for the device (it's on the local network).
let call_ref = self.calls.get(call_id).unwrap();
let prov_leg = call_ref.legs.values().find(|l| l.kind == LegKind::SipProvider);
let prov_leg = call_ref
.legs
.values()
.find(|l| l.kind == LegKind::SipProvider);
let lan_ip_str = prov_leg
.and_then(|l| l.sip_leg.as_ref())
.map(|sl| sl.config.lan_ip.clone())
@@ -280,13 +318,18 @@ impl CallManager {
..Default::default()
});
let ok = SipMessage::create_response(200, "OK", &device_invite, Some(ResponseOptions {
let ok = SipMessage::create_response(
200,
"OK",
&device_invite,
Some(ResponseOptions {
to_tag: Some(generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip_str, 5060)),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: None,
}));
}),
);
let _ = socket.send_to(&ok.serialize(), dev_addr).await;
// Update device leg state.
@@ -313,9 +356,18 @@ impl CallManager {
if let Some(dev_remote_addr) = dev_remote {
let dev_channels = create_leg_channels();
spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx);
spawn_sip_outbound(dev_rtp_socket, dev_remote_addr, dev_channels.outbound_rx);
spawn_sip_outbound(
dev_rtp_socket,
dev_remote_addr,
dev_channels.outbound_rx,
);
if let Some(call) = self.calls.get(call_id) {
call.add_leg_to_mixer(&dev_leg_id, dev_pt, dev_channels.inbound_rx, dev_channels.outbound_tx)
call.add_leg_to_mixer(
&dev_leg_id,
dev_pt,
dev_channels.inbound_rx,
dev_channels.outbound_tx,
)
.await;
}
}
@@ -323,17 +375,28 @@ impl CallManager {
}
}
emit_event(&self.out_tx, "call_answered", serde_json::json!({
emit_event(
&self.out_tx,
"call_answered",
serde_json::json!({
"call_id": call_id,
"provider_media_addr": remote.map(|a| a.ip().to_string()),
"provider_media_port": remote.map(|a| a.port()),
"sip_pt": sip_pt,
}));
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }));
}),
);
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }),
);
}
SipLegAction::Terminated(reason) => {
let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);
let duration = self
.calls
.get(call_id)
.map(|c| c.duration_secs())
.unwrap_or(0);
// Notify device if this is a device-originated outbound call.
if let Some(call) = self.calls.get(call_id) {
@@ -343,7 +406,8 @@ impl CallManager {
if let Some(dev_addr) = dev.signaling_addr {
// Map reason to SIP response code.
let code: u16 = if reason.starts_with("rejected_") {
reason.strip_prefix("rejected_")
reason
.strip_prefix("rejected_")
.and_then(|s| s.parse().ok())
.unwrap_or(503)
} else if reason == "bye" {
@@ -354,7 +418,12 @@ impl CallManager {
503
};
if code > 0 && dev.state != LegState::Connected {
let resp = SipMessage::create_response(code, "Service Unavailable", device_invite, None);
let resp = SipMessage::create_response(
code,
"Service Unavailable",
device_invite,
None,
);
let _ = socket.send_to(&resp.serialize(), dev_addr).await;
}
}
@@ -367,22 +436,38 @@ impl CallManager {
leg.state = LegState::Terminated;
}
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }));
emit_event(&self.out_tx, "call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }),
);
emit_event(
&self.out_tx,
"call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }),
);
self.terminate_call(call_id).await;
return true;
}
SipLegAction::SendAndTerminate(buf, reason) => {
let _ = socket.send_to(&buf, from_addr).await;
let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);
emit_event(&self.out_tx, "call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
let duration = self
.calls
.get(call_id)
.map(|c| c.duration_secs())
.unwrap_or(0);
emit_event(
&self.out_tx,
"call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }),
);
self.terminate_call(call_id).await;
return true;
}
SipLegAction::AuthRetry { ack_407, invite_with_auth } => {
SipLegAction::AuthRetry {
ack_407,
invite_with_auth,
} => {
if let Some(ack) = ack_407 {
let _ = socket.send_to(&ack, target).await;
}
@@ -416,9 +501,19 @@ impl CallManager {
let this_kind = this_leg.map(|l| l.kind).unwrap_or(LegKind::SipProvider);
// Find the counterpart leg.
let other_leg = call.legs.values().find(|l| l.id != this_leg_id && l.state != LegState::Terminated);
let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) = match other_leg {
Some(l) => (l.signaling_addr, l.rtp_port, l.id.clone(), l.kind, l.public_ip.clone()),
let other_leg = call
.legs
.values()
.find(|l| l.id != this_leg_id && l.state != LegState::Terminated);
let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) =
match other_leg {
Some(l) => (
l.signaling_addr,
l.rtp_port,
l.id.clone(),
l.kind,
l.public_ip.clone(),
),
None => return false,
};
let forward_to = match other_addr {
@@ -439,7 +534,9 @@ impl CallManager {
};
// Check if the other leg is a B2BUA leg (has SipLeg for proper dialog mgmt).
let other_has_sip_leg = call.legs.get(&other_leg_id)
let other_has_sip_leg = call
.legs
.get(&other_leg_id)
.map(|l| l.sip_leg.is_some())
.unwrap_or(false);
@@ -473,7 +570,8 @@ impl CallManager {
if let Some(other) = call.legs.get_mut(&other_leg_id) {
if let Some(sip_leg) = &mut other.sip_leg {
if let Some(hangup_buf) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
let _ =
socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
}
}
}
@@ -504,7 +602,8 @@ impl CallManager {
if let Some(other) = call.legs.get_mut(&other_leg_id) {
if let Some(sip_leg) = &mut other.sip_leg {
if let Some(hangup_buf) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
let _ =
socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
}
}
}
@@ -541,13 +640,17 @@ impl CallManager {
if this_kind == LegKind::SipProvider {
// From provider → forward to device: rewrite request URI.
if let Some(ruri) = fwd.request_uri().map(|s| s.to_string()) {
let new_ruri = rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port());
let new_ruri =
rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port());
fwd.set_request_uri(&new_ruri);
}
}
if fwd.is_dialog_establishing() {
// Record-Route must also be routable from the destination leg.
fwd.prepend_header("Record-Route", &format!("<sip:{advertise_ip}:{lan_port};lr>"));
fwd.prepend_header(
"Record-Route",
&format!("<sip:{advertise_ip}:{lan_port};lr>"),
);
}
let _ = socket.send_to(&fwd.serialize(), forward_to).await;
return true;
@@ -572,13 +675,20 @@ impl CallManager {
if code == 180 || code == 183 {
if call.state == CallState::SettingUp {
call.state = CallState::Ringing;
emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
emit_event(
&self.out_tx,
"call_ringing",
serde_json::json!({ "call_id": call_id }),
);
}
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Ringing;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }));
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }),
);
} else if code >= 200 && code < 300 {
let mut needs_wiring = false;
if let Some(leg) = call.legs.get_mut(this_leg_id) {
@@ -598,12 +708,19 @@ impl CallManager {
needs_wiring = true;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }));
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }),
);
if call.state != CallState::Connected {
call.state = CallState::Connected;
emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id }));
emit_event(
&self.out_tx,
"call_answered",
serde_json::json!({ "call_id": call_id }),
);
}
// Forward the response before wiring (drop call borrow).
@@ -693,28 +810,27 @@ impl CallManager {
// Extract caller/callee info.
let from_header = invite.get_header("From").unwrap_or("");
let caller_number = SipMessage::extract_uri(from_header)
.unwrap_or("Unknown")
.to_string();
let called_number = invite
.request_uri()
.and_then(|uri| SipMessage::extract_uri(uri))
.unwrap_or("")
.to_string();
let caller_number = normalize_routing_identity(from_header);
let called_number = normalize_routing_identity(invite.request_uri().unwrap_or(""));
// Resolve via the configured inbound routing table. This honors
// user-defined routes from the UI (numberPattern, callerPattern,
// sourceProvider, targets, ringBrowsers). If no route matches, the
// fallback returns an empty `device_ids` and `ring_browsers: true`,
// which preserves pre-routing behavior via the `resolve_first_device`
// fallback below.
// Resolve via the configured inbound routing table. The matched route
// is the source of truth for which external numbers this provider is
// allowed to deliver to us.
//
// TODO: Multi-target inbound fork is not yet implemented.
// - `route.device_ids` beyond the first registered target are ignored.
// - `ring_browsers` is informational only — browsers see a toast but
// do not race the SIP device. First-to-answer-wins requires a
// multi-leg fork + per-leg CANCEL, which is not built yet.
let route = config.resolve_inbound_route(provider_id, &called_number, &caller_number);
let route = match config.resolve_inbound_route(provider_id, &called_number, &caller_number)
{
Some(route) => route,
None => {
let resp = SipMessage::create_response(404, "Not Found", invite, None);
let _ = socket.send_to(&resp.serialize(), from_addr).await;
return None;
}
};
let ring_browsers = route.ring_browsers;
// IVR routing: if the route targets an IVR menu, go there directly.
@@ -724,12 +840,24 @@ impl CallManager {
if let Some(menu) = ivr.menus.iter().find(|m| m.id == *ivr_menu_id) {
let call_id = self
.route_to_ivr(
&call_id, invite, from_addr, &caller_number,
provider_id, provider_config, config, rtp_pool, socket,
public_ip, menu, &tts_engine,
&call_id,
invite,
from_addr,
&caller_number,
provider_id,
provider_config,
config,
rtp_pool,
socket,
public_ip,
menu,
&tts_engine,
)
.await?;
return Some(InboundCallCreated { call_id, ring_browsers });
return Some(InboundCallCreated {
call_id,
ring_browsers,
});
}
}
}
@@ -748,19 +876,27 @@ impl CallManager {
None => {
// No device registered → voicemail.
// Resolve greeting WAV on-demand (may trigger TTS generation).
let greeting_wav = resolve_greeting_wav(
config,
route.voicemail_box.as_deref(),
&tts_engine,
).await;
let greeting_wav =
resolve_greeting_wav(config, route.voicemail_box.as_deref(), &tts_engine).await;
let call_id = self
.route_to_voicemail(
&call_id, invite, from_addr, &caller_number,
provider_id, provider_config, config, rtp_pool, socket, public_ip,
&call_id,
invite,
from_addr,
&caller_number,
provider_id,
provider_config,
config,
rtp_pool,
socket,
public_ip,
greeting_wav,
)
.await?;
return Some(InboundCallCreated { call_id, ring_browsers });
return Some(InboundCallCreated {
call_id,
ring_browsers,
});
}
};
@@ -855,8 +991,10 @@ impl CallManager {
// Register SIP Call-ID → both legs (provider leg handles provider messages).
// For passthrough, both legs share the same SIP Call-ID.
// We route based on source address in route_passthrough_message.
self.sip_index
.insert(sip_call_id.clone(), (call_id.clone(), provider_leg_id.clone()));
self.sip_index.insert(
sip_call_id.clone(),
(call_id.clone(), provider_leg_id.clone()),
);
// Rewrite and forward INVITE to device.
let mut fwd_invite = invite.clone();
@@ -889,7 +1027,10 @@ impl CallManager {
}
}
Some(InboundCallCreated { call_id, ring_browsers })
Some(InboundCallCreated {
call_id,
ring_browsers,
})
}
/// Initiate an outbound B2BUA call from the dashboard.
@@ -938,7 +1079,9 @@ impl CallManager {
// Send INVITE.
let to_uri = format!("sip:{number}@{}", provider_config.domain);
sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await;
sip_leg
.send_invite(registered_aor, &to_uri, &sip_call_id, socket)
.await;
// Create call with mixer.
let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone());
@@ -1109,7 +1252,9 @@ impl CallManager {
// Build proper To URI and send INVITE.
let to_uri = format!("sip:{}@{}", dialed_number, provider_config.domain);
sip_leg.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket).await;
sip_leg
.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket)
.await;
call.legs.insert(
provider_leg_id.clone(),
@@ -1185,7 +1330,9 @@ impl CallManager {
let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);
let to_uri = format!("sip:{number}@{}", provider_config.domain);
sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await;
sip_leg
.send_invite(registered_aor, &to_uri, &sip_call_id, socket)
.await;
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
@@ -1256,9 +1403,16 @@ impl CallManager {
};
let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);
let to_uri = format!("sip:{}@{}:{}", device_id, device_addr.ip(), device_addr.port());
let to_uri = format!(
"sip:{}@{}:{}",
device_id,
device_addr.ip(),
device_addr.port()
);
let from_uri = format!("sip:sipproxy@{lan_ip}:{lan_port}");
sip_leg.send_invite(&from_uri, &to_uri, &sip_call_id, socket).await;
sip_leg
.send_invite(&from_uri, &to_uri, &sip_call_id, socket)
.await;
let leg_info = LegInfo {
id: leg_id.clone(),
@@ -1292,12 +1446,7 @@ impl CallManager {
}
/// Remove a leg from a call.
pub async fn remove_leg(
&mut self,
call_id: &str,
leg_id: &str,
socket: &UdpSocket,
) -> bool {
pub async fn remove_leg(&mut self, call_id: &str, leg_id: &str, socket: &UdpSocket) -> bool {
let call = match self.calls.get_mut(call_id) {
Some(c) => c,
None => return false,
@@ -1310,7 +1459,9 @@ impl CallManager {
if let Some(leg) = call.legs.get_mut(leg_id) {
if let Some(sip_leg) = &mut leg.sip_leg {
if let Some(hangup_bytes) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await;
let _ = socket
.send_to(&hangup_bytes, sip_leg.config.sip_target)
.await;
}
}
leg.state = LegState::Terminated;
@@ -1356,9 +1507,7 @@ impl CallManager {
target_call_id: &str,
) -> bool {
// Validate both calls exist and the leg is in the source call.
if !self.calls.contains_key(source_call_id)
|| !self.calls.contains_key(target_call_id)
{
if !self.calls.contains_key(source_call_id) || !self.calls.contains_key(target_call_id) {
return false;
}
@@ -1503,7 +1652,9 @@ impl CallManager {
}
if let Some(sip_leg) = &mut leg.sip_leg {
if let Some(hangup_bytes) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await;
let _ = socket
.send_to(&hangup_bytes, sip_leg.config.sip_target)
.await;
}
} else if let Some(addr) = leg.signaling_addr {
// Passthrough leg — send a simple BYE.
@@ -1588,7 +1739,9 @@ impl CallManager {
});
let response = SipMessage::create_response(
200, "OK", invite,
200,
"OK",
invite,
Some(sip_proto::message::ResponseOptions {
to_tag: Some(sip_proto::helpers::generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip, config.proxy.lan_port)),
@@ -1665,9 +1818,15 @@ impl CallManager {
let rtp_socket = rtp_alloc.socket;
tokio::spawn(async move {
crate::voicemail::run_voicemail_session(
rtp_socket, provider_media, codec_pt,
greeting_wav, recording_path, 120_000,
call_id_owned, caller_owned, out_tx,
rtp_socket,
provider_media,
codec_pt,
greeting_wav,
recording_path,
120_000,
call_id_owned,
caller_owned,
out_tx,
)
.await;
});
@@ -1717,7 +1876,9 @@ impl CallManager {
});
let response = SipMessage::create_response(
200, "OK", invite,
200,
"OK",
invite,
Some(sip_proto::message::ResponseOptions {
to_tag: Some(sip_proto::helpers::generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip, config.proxy.lan_port)),
@@ -1816,9 +1977,9 @@ impl CallManager {
tokio::spawn(async move {
// Load prompt PCM frames if available.
let prompt_frames = prompt_wav.as_ref().and_then(|wav| {
crate::audio_player::load_prompt_pcm_frames(wav).ok()
});
let prompt_frames = prompt_wav
.as_ref()
.and_then(|wav| crate::audio_player::load_prompt_pcm_frames(wav).ok());
if let Some(frames) = prompt_frames {
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
@@ -1884,7 +2045,11 @@ impl CallManager {
// Internal helpers
// -----------------------------------------------------------------------
fn resolve_first_device(&self, config: &AppConfig, registrar: &Registrar) -> Option<SocketAddr> {
fn resolve_first_device(
&self,
config: &AppConfig,
registrar: &Registrar,
) -> Option<SocketAddr> {
for device in &config.devices {
if let Some(addr) = registrar.get_device_contact(&device.id) {
return Some(addr);
@@ -1907,8 +2072,7 @@ async fn resolve_greeting_wav(
tts_engine: &Arc<Mutex<TtsEngine>>,
) -> Option<String> {
// 1. Look up voicebox config.
let vb = voicebox_id
.and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled));
let vb = voicebox_id.and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled));
if let Some(vb) = vb {
// 2. Pre-recorded WAV takes priority.

View File

@@ -4,6 +4,7 @@
//! proxy engine via the `configure` command. These types mirror the TS interfaces.
use serde::Deserialize;
use sip_proto::message::SipMessage;
use std::net::SocketAddr;
/// Network endpoint.
@@ -227,6 +228,51 @@ pub struct IvrMenuEntry {
// Pattern matching (ported from ts/config.ts)
// ---------------------------------------------------------------------------
/// Extract the URI user part and normalize phone-like identities for routing.
///
/// This keeps inbound route matching stable across provider-specific URI shapes,
/// e.g. `sip:+49 421 219694@trunk.example` and `sip:0049421219694@trunk.example`
/// both normalize to `+49421219694`.
pub fn normalize_routing_identity(value: &str) -> String {
let extracted = SipMessage::extract_uri_user(value).unwrap_or(value).trim();
if extracted.is_empty() {
return String::new();
}
let mut digits = String::new();
let mut saw_plus = false;
for (idx, ch) in extracted.chars().enumerate() {
if ch.is_ascii_digit() {
digits.push(ch);
continue;
}
if ch == '+' && idx == 0 {
saw_plus = true;
continue;
}
if matches!(ch, ' ' | '\t' | '-' | '.' | '/' | '(' | ')') {
continue;
}
return extracted.to_string();
}
if digits.is_empty() {
return extracted.to_string();
}
if saw_plus {
return format!("+{digits}");
}
if digits.starts_with("00") && digits.len() > 2 {
return format!("+{}", &digits[2..]);
}
digits
}
/// Test a value against a pattern string.
/// - None/empty: matches everything (wildcard)
/// - Trailing '*': prefix match
@@ -363,7 +409,7 @@ impl AppConfig {
provider_id: &str,
called_number: &str,
caller_number: &str,
) -> InboundRouteResult {
) -> Option<InboundRouteResult> {
let mut routes: Vec<&Route> = self
.routing
.routes
@@ -387,22 +433,150 @@ impl AppConfig {
continue;
}
return InboundRouteResult {
return Some(InboundRouteResult {
device_ids: route.action.targets.clone().unwrap_or_default(),
ring_browsers: route.action.ring_browsers.unwrap_or(false),
voicemail_box: route.action.voicemail_box.clone(),
ivr_menu_id: route.action.ivr_menu_id.clone(),
no_answer_timeout: route.action.no_answer_timeout,
};
});
}
// Fallback: ring all devices + browsers.
InboundRouteResult {
device_ids: vec![],
ring_browsers: true,
None
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_app_config(routes: Vec<Route>) -> AppConfig {
AppConfig {
proxy: ProxyConfig {
lan_ip: "127.0.0.1".to_string(),
lan_port: 5070,
public_ip_seed: None,
rtp_port_range: RtpPortRange {
min: 20_000,
max: 20_100,
},
},
providers: vec![ProviderConfig {
id: "provider-a".to_string(),
display_name: "Provider A".to_string(),
domain: "example.com".to_string(),
outbound_proxy: Endpoint {
address: "example.com".to_string(),
port: 5060,
},
username: "user".to_string(),
password: "pass".to_string(),
register_interval_sec: 300,
codecs: vec![9],
quirks: Quirks {
early_media_silence: false,
silence_payload_type: None,
silence_max_packets: None,
},
}],
devices: vec![DeviceConfig {
id: "desk".to_string(),
display_name: "Desk".to_string(),
expected_address: "127.0.0.1".to_string(),
extension: "100".to_string(),
}],
routing: RoutingConfig { routes },
voiceboxes: vec![],
ivr: None,
}
}
#[test]
fn normalize_routing_identity_extracts_uri_user_and_phone_number() {
assert_eq!(
normalize_routing_identity("sip:0049 421 219694@voip.easybell.de"),
"+49421219694"
);
assert_eq!(
normalize_routing_identity("<tel:+49 (421) 219694>"),
"+49421219694"
);
assert_eq!(normalize_routing_identity("sip:100@pbx.local"), "100");
assert_eq!(normalize_routing_identity("sip:alice@pbx.local"), "alice");
}
#[test]
fn resolve_inbound_route_requires_explicit_match() {
let cfg = test_app_config(vec![]);
assert!(cfg
.resolve_inbound_route("provider-a", "+49421219694", "+491701234567")
.is_none());
}
#[test]
fn resolve_inbound_route_matches_per_number_on_shared_provider() {
let cfg = test_app_config(vec![
Route {
id: "main".to_string(),
name: "Main DID".to_string(),
priority: 200,
enabled: true,
match_criteria: RouteMatch {
direction: "inbound".to_string(),
number_pattern: Some("+49421219694".to_string()),
caller_pattern: None,
source_provider: Some("provider-a".to_string()),
source_device: None,
},
action: RouteAction {
targets: Some(vec!["desk".to_string()]),
ring_browsers: Some(true),
voicemail_box: None,
ivr_menu_id: None,
no_answer_timeout: None,
}
provider: None,
failover_providers: None,
strip_prefix: None,
prepend_prefix: None,
},
},
Route {
id: "support".to_string(),
name: "Support DID".to_string(),
priority: 100,
enabled: true,
match_criteria: RouteMatch {
direction: "inbound".to_string(),
number_pattern: Some("+49421219695".to_string()),
caller_pattern: None,
source_provider: Some("provider-a".to_string()),
source_device: None,
},
action: RouteAction {
targets: None,
ring_browsers: Some(false),
voicemail_box: Some("support-box".to_string()),
ivr_menu_id: None,
no_answer_timeout: Some(20),
provider: None,
failover_providers: None,
strip_prefix: None,
prepend_prefix: None,
},
},
]);
let main = cfg
.resolve_inbound_route("provider-a", "+49421219694", "+491701234567")
.expect("main DID should match");
assert_eq!(main.device_ids, vec!["desk".to_string()]);
assert!(main.ring_browsers);
let support = cfg
.resolve_inbound_route("provider-a", "+49421219695", "+491701234567")
.expect("support DID should match");
assert_eq!(support.voicemail_box.as_deref(), Some("support-box"));
assert_eq!(support.no_answer_timeout, Some(20));
assert!(!support.ring_browsers);
}
}

View File

@@ -19,7 +19,13 @@ pub struct Command {
}
/// Send a response to a command.
pub fn respond(tx: &OutTx, id: &str, success: bool, result: Option<serde_json::Value>, error: Option<&str>) {
pub fn respond(
tx: &OutTx,
id: &str,
success: bool,
result: Option<serde_json::Value>,
error: Option<&str>,
) {
let mut resp = serde_json::json!({ "id": id, "success": success });
if let Some(r) = result {
resp["result"] = r;

View File

@@ -63,7 +63,8 @@ pub fn spawn_sip_inbound(
if offset + 4 > n {
continue; // Malformed: extension header truncated.
}
let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
let ext_len =
u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
offset += 4 + ext_len * 4;
}
if offset >= n {
@@ -74,7 +75,17 @@ pub fn spawn_sip_inbound(
if payload.is_empty() {
continue;
}
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() {
if inbound_tx
.send(RtpPacket {
payload,
payload_type: pt,
marker,
seq,
timestamp,
})
.await
.is_err()
{
break; // Channel closed — leg removed.
}
}

View File

@@ -5,7 +5,6 @@
/// (incoming calls, registration state).
///
/// No raw SIP ever touches TypeScript.
mod audio_player;
mod call;
mod call_manager;
@@ -26,7 +25,7 @@ mod voicemail;
mod webrtc_engine;
use crate::call_manager::CallManager;
use crate::config::AppConfig;
use crate::config::{normalize_routing_identity, AppConfig};
use crate::ipc::{emit_event, respond_err, respond_ok, Command, OutTx};
use crate::provider::ProviderManager;
use crate::registrar::Registrar;
@@ -266,11 +265,7 @@ async fn handle_sip_packet(
}
// 2. Device REGISTER — handled by registrar.
let is_from_provider = eng
.provider_mgr
.find_by_address(&from_addr)
.await
.is_some();
let is_from_provider = eng.provider_mgr.find_by_address(&from_addr).await.is_some();
if !is_from_provider && msg.method() == Some("REGISTER") {
if let Some(response_buf) = eng.registrar.handle_register(&msg, from_addr) {
@@ -349,11 +344,8 @@ async fn handle_sip_packet(
if let Some(inbound) = inbound {
// Emit event so TypeScript knows about the call (for dashboard, IVR routing, etc).
let from_header = msg.get_header("From").unwrap_or("");
let from_uri = SipMessage::extract_uri(from_header).unwrap_or("Unknown");
let called_number = msg
.request_uri()
.and_then(|uri| SipMessage::extract_uri(uri))
.unwrap_or("");
let from_uri = normalize_routing_identity(from_header);
let called_number = normalize_routing_identity(msg.request_uri().unwrap_or(""));
emit_event(
&eng.out_tx,
@@ -373,11 +365,7 @@ async fn handle_sip_packet(
// 5. New outbound INVITE from device.
if !is_from_provider && msg.is_request() && msg.method() == Some("INVITE") {
// Resolve outbound route.
let dialed_number = msg
.request_uri()
.and_then(|uri| SipMessage::extract_uri(uri))
.unwrap_or(msg.request_uri().unwrap_or(""))
.to_string();
let dialed_number = normalize_routing_identity(msg.request_uri().unwrap_or(""));
let device = eng.registrar.find_by_address(&from_addr);
let device_id = device.map(|d| d.device_id.clone());
@@ -395,13 +383,18 @@ async fn handle_sip_packet(
if let Some(route) = route_result {
// Look up provider state by config ID (not by device address).
let (public_ip, registered_aor) = if let Some(ps_arc) =
eng.provider_mgr.find_by_provider_id(&route.provider.id).await
let (public_ip, registered_aor) = if let Some(ps_arc) = eng
.provider_mgr
.find_by_provider_id(&route.provider.id)
.await
{
let ps = ps_arc.lock().await;
(ps.public_ip.clone(), ps.registered_aor.clone())
} else {
(None, format!("sip:{}@{}", route.provider.username, route.provider.domain))
(
None,
format!("sip:{}@{}", route.provider.username, route.provider.domain),
)
};
let ProxyEngine {
@@ -461,14 +454,20 @@ async fn handle_sip_packet(
async fn handle_make_call(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing number");
return;
}
};
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
let mut eng = engine.lock().await;
let config_ref = match &eng.config {
Some(c) => c.clone(),
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
None => {
respond_err(out_tx, &cmd.id, "not configured");
return;
}
};
// Resolve provider.
@@ -482,29 +481,53 @@ async fn handle_make_call(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd:
let provider_config = match provider_config {
Some(p) => p,
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
None => {
respond_err(out_tx, &cmd.id, "no provider available");
return;
}
};
// Get public IP and registered AOR from provider state.
let (public_ip, registered_aor) = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
).await {
let (public_ip, registered_aor) = if let Some(ps_arc) = eng
.provider_mgr
.find_by_address(
&provider_config
.outbound_proxy
.to_socket_addr()
.unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()),
)
.await
{
let ps = ps_arc.lock().await;
(ps.public_ip.clone(), ps.registered_aor.clone())
} else {
// Fallback — construct AOR from config.
(None, format!("sip:{}@{}", provider_config.username, provider_config.domain))
(
None,
format!(
"sip:{}@{}",
provider_config.username, provider_config.domain
),
)
};
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
None => {
respond_err(out_tx, &cmd.id, "not initialized");
return;
}
};
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
let ProxyEngine {
ref mut call_mgr,
ref mut rtp_pool,
..
} = *eng;
let rtp_pool = rtp_pool.as_mut().unwrap();
let call_id = call_mgr.make_outbound_call(
let call_id = call_mgr
.make_outbound_call(
&number,
&provider_config,
&config_ref,
@@ -512,19 +535,28 @@ async fn handle_make_call(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd:
&socket,
public_ip.as_deref(),
&registered_aor,
).await;
)
.await;
match call_id {
Some(id) => {
emit_event(out_tx, "outbound_call_started", serde_json::json!({
emit_event(
out_tx,
"outbound_call_started",
serde_json::json!({
"call_id": id,
"number": number,
"provider_id": provider_config.id,
}));
}),
);
respond_ok(out_tx, &cmd.id, serde_json::json!({ "call_id": id }));
}
None => {
respond_err(out_tx, &cmd.id, "call origination failed — provider not registered or no ports available");
respond_err(
out_tx,
&cmd.id,
"call origination failed — provider not registered or no ports available",
);
}
}
}
@@ -560,20 +592,30 @@ async fn handle_hangup(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Co
async fn handle_webrtc_offer(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing session_id");
return;
}
};
let offer_sdp = match cmd.params.get("sdp").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing sdp"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing sdp");
return;
}
};
let mut wrtc = webrtc.lock().await;
match wrtc.handle_offer(&session_id, &offer_sdp).await {
Ok(answer_sdp) => {
respond_ok(out_tx, &cmd.id, serde_json::json!({
respond_ok(
out_tx,
&cmd.id,
serde_json::json!({
"session_id": session_id,
"sdp": answer_sdp,
}));
}),
);
}
Err(e) => respond_err(out_tx, &cmd.id, &e),
}
@@ -584,14 +626,28 @@ async fn handle_webrtc_offer(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, c
async fn handle_webrtc_ice(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing session_id");
return;
}
};
let candidate = cmd.params.get("candidate").and_then(|v| v.as_str()).unwrap_or("");
let candidate = cmd
.params
.get("candidate")
.and_then(|v| v.as_str())
.unwrap_or("");
let sdp_mid = cmd.params.get("sdp_mid").and_then(|v| v.as_str());
let sdp_mline_index = cmd.params.get("sdp_mline_index").and_then(|v| v.as_u64()).map(|v| v as u16);
let sdp_mline_index = cmd
.params
.get("sdp_mline_index")
.and_then(|v| v.as_u64())
.map(|v| v as u16);
let wrtc = webrtc.lock().await;
match wrtc.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index).await {
match wrtc
.add_ice_candidate(&session_id, candidate, sdp_mid, sdp_mline_index)
.await
{
Ok(()) => respond_ok(out_tx, &cmd.id, serde_json::json!({})),
Err(e) => respond_err(out_tx, &cmd.id, &e),
}
@@ -608,11 +664,17 @@ async fn handle_webrtc_link(
) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing session_id");
return;
}
};
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
// Create channels for the WebRTC leg.
@@ -641,7 +703,12 @@ async fn handle_webrtc_link(
// Lock webrtc to wire the channels.
let mut wrtc = webrtc.lock().await;
if wrtc
.link_to_mixer(&session_id, &call_id, channels.inbound_tx, channels.outbound_rx)
.link_to_mixer(
&session_id,
&call_id,
channels.inbound_tx,
channels.outbound_rx,
)
.await
{
// Also store the WebRTC leg info in the call.
@@ -670,7 +737,10 @@ async fn handle_webrtc_link(
}
}
emit_event(out_tx, "leg_added", serde_json::json!({
emit_event(
out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": session_id,
"kind": "webrtc",
@@ -679,13 +749,18 @@ async fn handle_webrtc_link(
"rtpPort": 0,
"remoteMedia": null,
"metadata": {},
}));
}),
);
respond_ok(out_tx, &cmd.id, serde_json::json!({
respond_ok(
out_tx,
&cmd.id,
serde_json::json!({
"session_id": session_id,
"call_id": call_id,
"bridged": true,
}));
}),
);
} else {
respond_err(out_tx, &cmd.id, &format!("session {session_id} not found"));
}
@@ -695,45 +770,76 @@ async fn handle_webrtc_link(
async fn handle_add_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing number");
return;
}
};
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
let mut eng = engine.lock().await;
let config_ref = match &eng.config {
Some(c) => c.clone(),
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
None => {
respond_err(out_tx, &cmd.id, "not configured");
return;
}
};
// Resolve provider.
let provider_config = if let Some(pid) = provider_id {
config_ref.providers.iter().find(|p| p.id == pid).cloned()
} else {
config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider)
config_ref
.resolve_outbound_route(&number, None, &|_| true)
.map(|r| r.provider)
};
let provider_config = match provider_config {
Some(p) => p,
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
None => {
respond_err(out_tx, &cmd.id, "no provider available");
return;
}
};
// Get registered AOR.
let registered_aor = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
).await {
let registered_aor = if let Some(ps_arc) = eng
.provider_mgr
.find_by_address(
&provider_config
.outbound_proxy
.to_socket_addr()
.unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()),
)
.await
{
let ps = ps_arc.lock().await;
ps.registered_aor.clone()
} else {
format!("sip:{}@{}", provider_config.username, provider_config.domain)
format!(
"sip:{}@{}",
provider_config.username, provider_config.domain
)
};
let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(
&provider_config.outbound_proxy.to_socket_addr().unwrap_or_else(|| "0.0.0.0:0".parse().unwrap())
).await {
let public_ip = if let Some(ps_arc) = eng
.provider_mgr
.find_by_address(
&provider_config
.outbound_proxy
.to_socket_addr()
.unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()),
)
.await
{
let ps = ps_arc.lock().await;
ps.public_ip.clone()
} else {
@@ -742,16 +848,31 @@ async fn handle_add_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &C
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
None => {
respond_err(out_tx, &cmd.id, "not initialized");
return;
}
};
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
let ProxyEngine {
ref mut call_mgr,
ref mut rtp_pool,
..
} = *eng;
let rtp_pool = rtp_pool.as_mut().unwrap();
let leg_id = call_mgr.add_external_leg(
&call_id, &number, &provider_config, &config_ref,
rtp_pool, &socket, public_ip.as_deref(), &registered_aor,
).await;
let leg_id = call_mgr
.add_external_leg(
&call_id,
&number,
&provider_config,
&config_ref,
rtp_pool,
&socket,
public_ip.as_deref(),
&registered_aor,
)
.await;
match leg_id {
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })),
@@ -763,33 +884,61 @@ async fn handle_add_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &C
async fn handle_add_device_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let device_id = match cmd.params.get("device_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing device_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing device_id");
return;
}
};
let mut eng = engine.lock().await;
let config_ref = match &eng.config {
Some(c) => c.clone(),
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
None => {
respond_err(out_tx, &cmd.id, "not configured");
return;
}
};
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
None => {
respond_err(out_tx, &cmd.id, "not initialized");
return;
}
};
let ProxyEngine { ref registrar, ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
let ProxyEngine {
ref registrar,
ref mut call_mgr,
ref mut rtp_pool,
..
} = *eng;
let rtp_pool = rtp_pool.as_mut().unwrap();
let leg_id = call_mgr.add_device_leg(
&call_id, &device_id, registrar, &config_ref, rtp_pool, &socket,
).await;
let leg_id = call_mgr
.add_device_leg(
&call_id,
&device_id,
registrar,
&config_ref,
rtp_pool,
&socket,
)
.await;
match leg_id {
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })),
None => respond_err(out_tx, &cmd.id, "failed to add device leg — device not registered or call not found"),
None => respond_err(
out_tx,
&cmd.id,
"failed to add device leg — device not registered or call not found",
),
}
}
@@ -797,19 +946,32 @@ async fn handle_add_device_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx,
async fn handle_transfer_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let source_call_id = match cmd.params.get("source_call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing source_call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing source_call_id");
return;
}
};
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing leg_id");
return;
}
};
let target_call_id = match cmd.params.get("target_call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing target_call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing target_call_id");
return;
}
};
let mut eng = engine.lock().await;
if eng.call_mgr.transfer_leg(&source_call_id, &leg_id, &target_call_id).await {
if eng
.call_mgr
.transfer_leg(&source_call_id, &leg_id, &target_call_id)
.await
{
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
} else {
respond_err(out_tx, &cmd.id, "transfer failed — call or leg not found");
@@ -820,57 +982,104 @@ async fn handle_transfer_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
async fn handle_replace_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let old_leg_id = match cmd.params.get("old_leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing old_leg_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing old_leg_id");
return;
}
};
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
Some(n) => n.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing number"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing number");
return;
}
};
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
let mut eng = engine.lock().await;
let config_ref = match &eng.config {
Some(c) => c.clone(),
None => { respond_err(out_tx, &cmd.id, "not configured"); return; }
None => {
respond_err(out_tx, &cmd.id, "not configured");
return;
}
};
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
None => {
respond_err(out_tx, &cmd.id, "not initialized");
return;
}
};
// Resolve provider.
let provider_config = if let Some(pid) = provider_id {
config_ref.providers.iter().find(|p| p.id == pid).cloned()
} else {
config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider)
config_ref
.resolve_outbound_route(&number, None, &|_| true)
.map(|r| r.provider)
};
let provider_config = match provider_config {
Some(p) => p,
None => { respond_err(out_tx, &cmd.id, "no provider available"); return; }
None => {
respond_err(out_tx, &cmd.id, "no provider available");
return;
}
};
let (public_ip, registered_aor) = if let Some(ps_arc) = eng.provider_mgr.find_by_provider_id(&provider_config.id).await {
let (public_ip, registered_aor) = if let Some(ps_arc) = eng
.provider_mgr
.find_by_provider_id(&provider_config.id)
.await
{
let ps = ps_arc.lock().await;
(ps.public_ip.clone(), ps.registered_aor.clone())
} else {
(None, format!("sip:{}@{}", provider_config.username, provider_config.domain))
(
None,
format!(
"sip:{}@{}",
provider_config.username, provider_config.domain
),
)
};
let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng;
let ProxyEngine {
ref mut call_mgr,
ref mut rtp_pool,
..
} = *eng;
let rtp_pool = rtp_pool.as_mut().unwrap();
let new_leg_id = call_mgr.replace_leg(
&call_id, &old_leg_id, &number, &provider_config, &config_ref,
rtp_pool, &socket, public_ip.as_deref(), &registered_aor,
).await;
let new_leg_id = call_mgr
.replace_leg(
&call_id,
&old_leg_id,
&number,
&provider_config,
&config_ref,
rtp_pool,
&socket,
public_ip.as_deref(),
&registered_aor,
)
.await;
match new_leg_id {
Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "new_leg_id": lid })),
None => respond_err(out_tx, &cmd.id, "replace failed — call ended or dial failed"),
None => respond_err(
out_tx,
&cmd.id,
"replace failed — call ended or dial failed",
),
}
}
@@ -878,17 +1087,26 @@ async fn handle_replace_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd
async fn handle_remove_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing leg_id");
return;
}
};
let mut eng = engine.lock().await;
let socket = match &eng.transport {
Some(t) => t.socket(),
None => { respond_err(out_tx, &cmd.id, "not initialized"); return; }
None => {
respond_err(out_tx, &cmd.id, "not initialized");
return;
}
};
if eng.call_mgr.remove_leg(&call_id, &leg_id, &socket).await {
@@ -903,7 +1121,10 @@ async fn handle_remove_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd:
async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, cmd: &Command) {
let session_id = match cmd.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing session_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing session_id");
return;
}
};
let mut wrtc = webrtc.lock().await;
@@ -919,22 +1140,27 @@ async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, c
/// Handle `start_interaction` — isolate a leg, play a prompt, collect DTMF.
/// This command blocks until the interaction completes (digit, timeout, or cancel).
async fn handle_start_interaction(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
async fn handle_start_interaction(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing leg_id");
return;
}
};
let prompt_wav = match cmd.params.get("prompt_wav").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing prompt_wav"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing prompt_wav");
return;
}
};
let expected_digits: Vec<char> = cmd
.params
@@ -1007,10 +1233,8 @@ async fn handle_start_interaction(
serde_json::json!(result_str),
);
if let Some(ref d) = digit_str {
leg.metadata.insert(
"last_interaction_digit".to_string(),
serde_json::json!(d),
);
leg.metadata
.insert("last_interaction_digit".to_string(), serde_json::json!(d));
}
}
}
@@ -1024,18 +1248,20 @@ async fn handle_start_interaction(
}
/// Handle `add_tool_leg` — add a recording or transcription tool leg to a call.
async fn handle_add_tool_leg(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
async fn handle_add_tool_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let tool_type_str = match cmd.params.get("tool_type").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing tool_type"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing tool_type");
return;
}
};
let tool_type = match tool_type_str.as_str() {
@@ -1066,13 +1292,11 @@ async fn handle_add_tool_leg(
out_tx.clone(),
)
}
crate::mixer::ToolType::Transcription => {
crate::tool_leg::spawn_transcription_tool(
crate::mixer::ToolType::Transcription => crate::tool_leg::spawn_transcription_tool(
tool_leg_id.clone(),
call_id.clone(),
out_tx.clone(),
)
}
),
};
// Send AddToolLeg to the mixer and register in call.
@@ -1097,10 +1321,7 @@ async fn handle_add_tool_leg(
// Register tool leg in the call's leg map.
let mut metadata = std::collections::HashMap::new();
metadata.insert(
"tool_type".to_string(),
serde_json::json!(tool_type_str),
);
metadata.insert("tool_type".to_string(), serde_json::json!(tool_type_str));
call.legs.insert(
tool_leg_id.clone(),
crate::call::LegInfo {
@@ -1144,18 +1365,20 @@ async fn handle_add_tool_leg(
}
/// Handle `remove_tool_leg` — remove a tool leg from a call.
async fn handle_remove_tool_leg(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
async fn handle_remove_tool_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let tool_leg_id = match cmd.params.get("tool_leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing tool_leg_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing tool_leg_id");
return;
}
};
let mut eng = engine.lock().await;
@@ -1191,26 +1414,34 @@ async fn handle_remove_tool_leg(
}
/// Handle `set_leg_metadata` — set a metadata key on a leg.
async fn handle_set_leg_metadata(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
async fn handle_set_leg_metadata(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing leg_id");
return;
}
};
let key = match cmd.params.get("key").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => { respond_err(out_tx, &cmd.id, "missing key"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing key");
return;
}
};
let value = match cmd.params.get("value") {
Some(v) => v.clone(),
None => { respond_err(out_tx, &cmd.id, "missing value"); return; }
None => {
respond_err(out_tx, &cmd.id, "missing value");
return;
}
};
let mut eng = engine.lock().await;
@@ -1234,11 +1465,7 @@ async fn handle_set_leg_metadata(
}
/// Handle `generate_tts` — synthesize text to a WAV file using Kokoro TTS.
async fn handle_generate_tts(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
async fn handle_generate_tts(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
let tts_engine = engine.lock().await.tts_engine.clone();
let mut tts = tts_engine.lock().await;
match tts.generate(&cmd.params).await {

View File

@@ -7,7 +7,8 @@
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
//! 1. Drain inbound channels, reorder RTP, decode variable-duration packets to 48kHz,
//! and queue them in per-leg PCM buffers
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
@@ -16,7 +17,7 @@
use crate::ipc::{emit_event, OutTx};
use crate::jitter_buffer::{JitterBuffer, JitterResult};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use crate::rtp::{build_rtp_header, rtp_clock_increment, rtp_clock_rate};
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
use nnnoiseless::DenoiseState;
use std::collections::{HashMap, VecDeque};
@@ -29,6 +30,12 @@ use tokio::time::{self, Duration, MissedTickBehavior};
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
/// Safety cap for how much timestamp-derived gap fill we synthesize at once.
const MAX_GAP_FILL_SAMPLES: usize = MIX_FRAME_SIZE * 6; // 120ms
/// Bound how many decode / concealment steps a leg can consume in one tick.
const MAX_PACKET_STEPS_PER_TICK: usize = 24;
/// Report the first output drop immediately, then every N drops.
const DROP_REPORT_INTERVAL: u64 = 50;
/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
@@ -39,10 +46,6 @@ pub struct RtpPacket {
/// RTP sequence number for reordering.
pub seq: u16,
/// RTP timestamp from the original packet header.
///
/// Set on inbound RTP but not yet consumed downstream — reserved for
/// future jitter/sync work in the mixer.
#[allow(dead_code)]
pub timestamp: u32,
}
@@ -109,6 +112,7 @@ struct ToolLegSlot {
#[allow(dead_code)]
tool_type: ToolType,
audio_tx: mpsc::Sender<ToolAudioBatch>,
dropped_batches: u64,
}
// ---------------------------------------------------------------------------
@@ -163,8 +167,15 @@ struct MixerLegSlot {
denoiser: Box<DenoiseState<'static>>,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
/// Decoded PCM waiting for playout. Variable-duration RTP packets are
/// decoded into this FIFO; the mixer consumes exactly one 20ms frame per tick.
pcm_buffer: VecDeque<f32>,
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
last_pcm_frame: Vec<f32>,
/// Next RTP timestamp expected from the inbound stream.
expected_rtp_timestamp: Option<u32>,
/// Best-effort estimate of packet duration in RTP clock units.
estimated_packet_ts: u32,
/// Number of consecutive ticks with no inbound packet.
silent_ticks: u32,
/// Per-leg jitter buffer for packet reordering and timing.
@@ -173,15 +184,242 @@ struct MixerLegSlot {
rtp_seq: u16,
rtp_ts: u32,
rtp_ssrc: u32,
/// Dropped outbound frames for this leg (queue full / closed).
outbound_drops: u64,
/// Current role of this leg in the mixer.
role: LegRole,
}
fn mix_samples_to_rtp_ts(codec_pt: u8, mix_samples: usize) -> u32 {
let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64;
(((mix_samples as u64 * clock_rate) + (MIX_RATE as u64 / 2)) / MIX_RATE as u64) as u32
}
fn rtp_ts_to_mix_samples(codec_pt: u8, rtp_ts: u32) -> usize {
let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64;
(((rtp_ts as u64 * MIX_RATE as u64) + (clock_rate / 2)) / clock_rate) as usize
}
fn is_forward_rtp_delta(delta: u32) -> bool {
delta > 0 && delta < 0x8000_0000
}
fn should_emit_drop_event(total_drops: u64) -> bool {
total_drops == 1 || total_drops % DROP_REPORT_INTERVAL == 0
}
fn emit_output_drop_event(
out_tx: &OutTx,
call_id: &str,
leg_id: Option<&str>,
tool_leg_id: Option<&str>,
stream: &str,
reason: &str,
total_drops: u64,
) {
if !should_emit_drop_event(total_drops) {
return;
}
emit_event(
out_tx,
"mixer_output_drop",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"tool_leg_id": tool_leg_id,
"stream": stream,
"reason": reason,
"total_drops": total_drops,
}),
);
}
fn fade_concealment_from_last_frame(slot: &mut MixerLegSlot, samples: usize, decay: f32) {
let mut template = if slot.last_pcm_frame.is_empty() {
vec![0.0f32; MIX_FRAME_SIZE]
} else {
slot.last_pcm_frame.clone()
};
let mut remaining = samples;
while remaining > 0 {
for sample in &mut template {
*sample *= decay;
}
let take = remaining.min(template.len());
slot.pcm_buffer.extend(template.iter().take(take).copied());
remaining -= take;
}
}
fn append_packet_loss_concealment(slot: &mut MixerLegSlot, samples: usize) {
let mut remaining = samples.max(1);
while remaining > 0 {
let chunk = remaining.min(MIX_FRAME_SIZE);
if slot.codec_pt == codec_lib::PT_OPUS {
match slot.transcoder.opus_plc(chunk) {
Ok(mut pcm) => {
pcm.resize(chunk, 0.0);
slot.pcm_buffer.extend(pcm);
}
Err(_) => fade_concealment_from_last_frame(slot, chunk, 0.8),
}
} else {
fade_concealment_from_last_frame(slot, chunk, 0.85);
}
remaining -= chunk;
}
}
fn decode_packet_to_mix_pcm(slot: &mut MixerLegSlot, pkt: &RtpPacket) -> Option<Vec<f32>> {
let (pcm, rate) = slot
.transcoder
.decode_to_f32(&pkt.payload, pkt.payload_type)
.ok()?;
let pcm_48k = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample_f32(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
};
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
} else {
pcm_48k
};
Some(processed)
}
fn queue_inbound_packet(slot: &mut MixerLegSlot, pkt: RtpPacket) {
if let Some(pcm_48k) = decode_packet_to_mix_pcm(slot, &pkt) {
if pcm_48k.is_empty() {
return;
}
if let Some(expected_ts) = slot.expected_rtp_timestamp {
let gap_ts = pkt.timestamp.wrapping_sub(expected_ts);
if is_forward_rtp_delta(gap_ts) {
let gap_samples = rtp_ts_to_mix_samples(slot.codec_pt, gap_ts);
if gap_samples <= MAX_GAP_FILL_SAMPLES {
append_packet_loss_concealment(slot, gap_samples);
} else {
slot.pcm_buffer.clear();
}
}
}
let packet_ts = mix_samples_to_rtp_ts(slot.codec_pt, pcm_48k.len());
if packet_ts > 0 {
slot.estimated_packet_ts = packet_ts;
slot.expected_rtp_timestamp = Some(pkt.timestamp.wrapping_add(packet_ts));
}
slot.pcm_buffer.extend(pcm_48k);
}
}
fn fill_leg_playout_buffer(slot: &mut MixerLegSlot) {
let mut steps = 0usize;
while slot.pcm_buffer.len() < MIX_FRAME_SIZE && steps < MAX_PACKET_STEPS_PER_TICK {
steps += 1;
match slot.jitter.consume() {
JitterResult::Packet(pkt) => queue_inbound_packet(slot, pkt),
JitterResult::Missing => {
let conceal_ts = slot.estimated_packet_ts.max(rtp_clock_increment(slot.codec_pt));
let conceal_samples = rtp_ts_to_mix_samples(slot.codec_pt, conceal_ts)
.clamp(1, MAX_GAP_FILL_SAMPLES);
append_packet_loss_concealment(slot, conceal_samples);
if let Some(expected_ts) = slot.expected_rtp_timestamp {
slot.expected_rtp_timestamp = Some(expected_ts.wrapping_add(conceal_ts));
}
}
JitterResult::Filling => break,
}
}
}
fn take_mix_frame(slot: &mut MixerLegSlot) -> Vec<f32> {
let mut frame = Vec::with_capacity(MIX_FRAME_SIZE);
while frame.len() < MIX_FRAME_SIZE {
if let Some(sample) = slot.pcm_buffer.pop_front() {
frame.push(sample);
} else {
frame.push(0.0);
}
}
frame
}
fn soft_limit_sample(sample: f32) -> f32 {
const KNEE: f32 = 0.85;
let abs = sample.abs();
if abs <= KNEE {
sample
} else {
let excess = abs - KNEE;
let compressed = KNEE + (excess / (1.0 + (excess / (1.0 - KNEE))));
sample.signum() * compressed.min(1.0)
}
}
fn try_send_leg_output(
out_tx: &OutTx,
call_id: &str,
leg_id: &str,
slot: &mut MixerLegSlot,
rtp: Vec<u8>,
stream: &str,
) {
let reason = match slot.outbound_tx.try_send(rtp) {
Ok(()) => return,
Err(mpsc::error::TrySendError::Full(_)) => "full",
Err(mpsc::error::TrySendError::Closed(_)) => "closed",
};
slot.outbound_drops += 1;
emit_output_drop_event(
out_tx,
call_id,
Some(leg_id),
None,
stream,
reason,
slot.outbound_drops,
);
}
fn try_send_tool_output(
out_tx: &OutTx,
call_id: &str,
tool_leg_id: &str,
tool: &mut ToolLegSlot,
batch: ToolAudioBatch,
) {
let reason = match tool.audio_tx.try_send(batch) {
Ok(()) => return,
Err(mpsc::error::TrySendError::Full(_)) => "full",
Err(mpsc::error::TrySendError::Closed(_)) => "closed",
};
tool.dropped_batches += 1;
emit_output_drop_event(
out_tx,
call_id,
None,
Some(tool_leg_id),
"tool-batch",
reason,
tool.dropped_batches,
);
}
/// Spawn the mixer task for a call. Returns the command sender and task handle.
pub fn spawn_mixer(
call_id: String,
out_tx: OutTx,
) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
pub fn spawn_mixer(call_id: String, out_tx: OutTx) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
let handle = tokio::spawn(async move {
@@ -192,11 +430,7 @@ pub fn spawn_mixer(
}
/// The 20ms mixing loop.
async fn mixer_loop(
call_id: String,
mut cmd_rx: mpsc::Receiver<MixerCommand>,
out_tx: OutTx,
) {
async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, out_tx: OutTx) {
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
let mut tool_legs: HashMap<String, ToolLegSlot> = HashMap::new();
let mut interval = time::interval(Duration::from_millis(20));
@@ -237,11 +471,15 @@ async fn mixer_loop(
denoiser: new_denoiser(),
inbound_rx,
outbound_tx,
pcm_buffer: VecDeque::new(),
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
expected_rtp_timestamp: None,
estimated_packet_ts: rtp_clock_increment(codec_pt),
silent_ticks: 0,
rtp_seq: 0,
rtp_ts: 0,
rtp_ssrc: rand::random(),
outbound_drops: 0,
role: LegRole::Participant,
jitter: JitterBuffer::new(),
},
@@ -302,7 +540,14 @@ async fn mixer_loop(
tool_type,
audio_tx,
}) => {
tool_legs.insert(leg_id, ToolLegSlot { tool_type, audio_tx });
tool_legs.insert(
leg_id,
ToolLegSlot {
tool_type,
audio_tx,
dropped_batches: 0,
},
);
}
Ok(MixerCommand::RemoveToolLeg { leg_id }) => {
tool_legs.remove(&leg_id);
@@ -343,54 +588,11 @@ async fn mixer_loop(
}
}
// Step 2b: Consume exactly one frame from the jitter buffer.
match slot.jitter.consume() {
JitterResult::Packet(pkt) => {
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
let pcm_48k = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample_f32(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
};
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
} else {
pcm_48k
};
let mut frame = processed;
frame.resize(MIX_FRAME_SIZE, 0.0);
slot.last_pcm_frame = frame;
}
Err(_) => {}
}
}
JitterResult::Missing => {
// Invoke Opus PLC or fade for non-Opus codecs.
if slot.codec_pt == codec_lib::PT_OPUS {
match slot.transcoder.opus_plc(MIX_FRAME_SIZE) {
Ok(pcm) => {
slot.last_pcm_frame = pcm;
}
Err(_) => {
for s in slot.last_pcm_frame.iter_mut() {
*s *= 0.8;
}
}
}
} else {
// Non-Opus: fade last frame toward silence.
for s in slot.last_pcm_frame.iter_mut() {
*s *= 0.85;
}
}
}
JitterResult::Filling => {
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
}
}
// Step 2b: Decode enough RTP to cover one 20ms playout frame.
// Variable-duration packets (10ms, 20ms, 60ms, ...) accumulate in
// the per-leg PCM FIFO; we pop exactly one 20ms frame below.
fill_leg_playout_buffer(slot);
slot.last_pcm_frame = take_mix_frame(slot);
// Run jitter adaptation + prune stale packets.
slot.jitter.adapt();
@@ -404,6 +606,9 @@ async fn mixer_loop(
}
if slot.silent_ticks > 150 {
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
slot.pcm_buffer.clear();
slot.expected_rtp_timestamp = None;
slot.estimated_packet_ts = rtp_clock_increment(slot.codec_pt);
}
}
@@ -426,12 +631,12 @@ async fn mixer_loop(
for (lid, slot) in legs.iter_mut() {
match &mut slot.role {
LegRole::Participant => {
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
// Mix-minus: total minus this leg's own contribution.
// Apply a light soft limiter instead of hard clipping the sum.
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
mix_minus.push(sample.clamp(-1.0, 1.0));
let sample = (total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
mix_minus.push(soft_limit_sample(sample));
}
// Resample from 48kHz to the leg's codec native rate.
@@ -445,8 +650,7 @@ async fn mixer_loop(
};
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
let encoded =
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
let encoded = match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
@@ -460,8 +664,7 @@ async fn mixer_loop(
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
// Non-blocking send — drop frame if channel is full.
let _ = slot.outbound_tx.try_send(rtp);
try_send_leg_output(&out_tx, &call_id, lid, slot, rtp, "participant-audio");
}
LegRole::Isolated(state) => {
// Check for DTMF digit from this leg.
@@ -487,8 +690,7 @@ async fn mixer_loop(
if let Some(digit) = matched_digit {
// Interaction complete — digit matched.
completed_interactions
.push((lid.clone(), InteractionResult::Digit(digit)));
completed_interactions.push((lid.clone(), InteractionResult::Digit(digit)));
} else {
// Play prompt frame or silence.
let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() {
@@ -508,6 +710,7 @@ async fn mixer_loop(
.unwrap_or_default()
};
let mut prompt_rtp: Option<Vec<u8>> = None;
if let Ok(encoded) =
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
{
@@ -521,10 +724,9 @@ async fn mixer_loop(
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot
.rtp_ts
.wrapping_add(rtp_clock_increment(slot.codec_pt));
let _ = slot.outbound_tx.try_send(rtp);
slot.rtp_ts =
slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
prompt_rtp = Some(rtp);
}
}
@@ -537,6 +739,17 @@ async fn mixer_loop(
state.timeout_ticks_remaining -= 1;
}
}
if let Some(rtp) = prompt_rtp {
try_send_leg_output(
&out_tx,
&call_id,
lid,
slot,
rtp,
"isolated-prompt",
);
}
}
}
}
@@ -566,7 +779,7 @@ async fn mixer_loop(
})
.collect();
for tool in tool_legs.values() {
for (tool_leg_id, tool) in tool_legs.iter_mut() {
let batch = ToolAudioBatch {
sources: sources
.iter()
@@ -576,8 +789,7 @@ async fn mixer_loop(
})
.collect(),
};
// Non-blocking send — drop batch if tool can't keep up.
let _ = tool.audio_tx.try_send(batch);
try_send_tool_output(&out_tx, &call_id, tool_leg_id, tool, batch);
}
}
@@ -610,7 +822,14 @@ async fn mixer_loop(
rtp_out.extend_from_slice(&dtmf_pkt.payload);
target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1);
// Don't increment rtp_ts for DTMF — it shares timestamp context with audio.
let _ = target_slot.outbound_tx.try_send(rtp_out);
try_send_leg_output(
&out_tx,
&call_id,
target_lid,
target_slot,
rtp_out,
"dtmf",
);
}
}
}

View File

@@ -267,11 +267,7 @@ impl ProviderManager {
/// Try to handle a SIP response as a provider registration response.
/// Returns true if consumed.
pub async fn handle_response(
&self,
msg: &SipMessage,
socket: &UdpSocket,
) -> bool {
pub async fn handle_response(&self, msg: &SipMessage, socket: &UdpSocket) -> bool {
for ps_arc in &self.providers {
let mut ps = ps_arc.lock().await;
let was_registered = ps.is_registered;
@@ -322,7 +318,10 @@ impl ProviderManager {
}
/// Find a provider by its config ID (e.g. "easybell").
pub async fn find_by_provider_id(&self, provider_id: &str) -> Option<Arc<Mutex<ProviderState>>> {
pub async fn find_by_provider_id(
&self,
provider_id: &str,
) -> Option<Arc<Mutex<ProviderState>>> {
for ps_arc in &self.providers {
let ps = ps_arc.lock().await;
if ps.config.id == provider_id {

View File

@@ -25,8 +25,7 @@ impl Recorder {
) -> Result<Self, String> {
// Ensure parent directory exists.
if let Some(parent) = Path::new(file_path).parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("create dir: {e}"))?;
std::fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?;
}
let sample_rate = 8000u32; // Record at 8kHz (standard telephony)
@@ -57,10 +56,13 @@ impl Recorder {
/// Create a recorder that writes raw PCM at a given sample rate.
/// Used by tool legs that already have decoded PCM (no RTP processing needed).
pub fn new_pcm(file_path: &str, sample_rate: u32, max_duration_ms: Option<u64>) -> Result<Self, String> {
pub fn new_pcm(
file_path: &str,
sample_rate: u32,
max_duration_ms: Option<u64>,
) -> Result<Self, String> {
if let Some(parent) = Path::new(file_path).parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("create dir: {e}"))?;
std::fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?;
}
let spec = hound::WavSpec {

View File

@@ -60,18 +60,17 @@ impl Registrar {
/// Try to handle a SIP REGISTER from a device.
/// Returns Some(response_bytes) if handled, None if not a known device.
pub fn handle_register(
&mut self,
msg: &SipMessage,
from_addr: SocketAddr,
) -> Option<Vec<u8>> {
pub fn handle_register(&mut self, msg: &SipMessage, from_addr: SocketAddr) -> Option<Vec<u8>> {
if msg.method() != Some("REGISTER") {
return None;
}
// Find the device by matching the source IP against expectedAddress.
let from_ip = from_addr.ip().to_string();
let device = self.devices.iter().find(|d| d.expected_address == from_ip)?;
let device = self
.devices
.iter()
.find(|d| d.expected_address == from_ip)?;
let from_header = msg.get_header("From").unwrap_or("");
let aor = SipMessage::extract_uri(from_header)
@@ -79,9 +78,7 @@ impl Registrar {
.unwrap_or_else(|| format!("sip:{}@{}", device.extension, from_ip));
let expires_header = msg.get_header("Expires");
let requested: u32 = expires_header
.and_then(|s| s.parse().ok())
.unwrap_or(3600);
let requested: u32 = expires_header.and_then(|s| s.parse().ok()).unwrap_or(3600);
let expires = requested.min(MAX_EXPIRES);
let entry = RegisteredDevice {
@@ -122,10 +119,7 @@ impl Registrar {
Some(ResponseOptions {
to_tag: Some(generate_tag()),
contact: Some(contact),
extra_headers: Some(vec![(
"Expires".to_string(),
expires.to_string(),
)]),
extra_headers: Some(vec![("Expires".to_string(), expires.to_string())]),
..Default::default()
}),
);
@@ -145,8 +139,8 @@ impl Registrar {
/// Find a registered device by its source IP address.
pub fn find_by_address(&self, addr: &SocketAddr) -> Option<&RegisteredDevice> {
let ip = addr.ip().to_string();
self.registered.values().find(|e| {
e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at
})
self.registered
.values()
.find(|e| e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at)
}
}

View File

@@ -82,10 +82,15 @@ pub fn build_rtp_header(pt: u8, seq: u16, timestamp: u32, ssrc: u32) -> [u8; 12]
/// Get the RTP clock increment per 20ms frame for a payload type.
pub fn rtp_clock_increment(pt: u8) -> u32 {
rtp_clock_rate(pt) / 50
}
/// Get the RTP clock rate for a payload type.
pub fn rtp_clock_rate(pt: u8) -> u32 {
match pt {
9 => 160, // G.722: 8000 Hz clock rate (despite 16kHz audio) × 0.02s
0 | 8 => 160, // PCMU/PCMA: 8000 × 0.02
111 => 960, // Opus: 48000 × 0.02
_ => 160,
9 => 8000, // G.722 uses an 8kHz RTP clock despite 16kHz audio.
0 | 8 => 8000, // PCMU/PCMA
111 => 48000, // Opus
_ => 8000,
}
}

View File

@@ -128,17 +128,24 @@ impl SipLeg {
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
]),
extra_headers: Some(vec![(
"User-Agent".to_string(),
"SipRouter/1.0".to_string(),
)]),
},
);
self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port));
self.dialog = Some(SipDialog::from_uac_invite(
&invite,
ip,
self.config.lan_port,
));
self.invite = Some(invite.clone());
self.state = LegState::Inviting;
let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await;
let _ = socket
.send_to(&invite.serialize(), self.config.sip_target)
.await;
}
/// Handle an incoming SIP message routed to this leg.
@@ -443,10 +450,7 @@ pub enum SipLegAction {
/// Build an ACK for a non-2xx response (same transaction as the INVITE).
fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage {
let via = original_invite.get_header("Via").unwrap_or("").to_string();
let from = original_invite
.get_header("From")
.unwrap_or("")
.to_string();
let from = original_invite.get_header("From").unwrap_or("").to_string();
let to = response.get_header("To").unwrap_or("").to_string();
let call_id = original_invite.call_id().to_string();
let cseq_num: u32 = original_invite

View File

@@ -28,10 +28,8 @@ impl SipTransport {
}
/// Spawn the UDP receive loop. Calls the handler for every received packet.
pub fn spawn_receiver<F>(
&self,
handler: F,
) where
pub fn spawn_receiver<F>(&self, handler: F)
where
F: Fn(&[u8], SocketAddr) + Send + 'static,
{
let socket = self.socket.clone();

View File

@@ -51,7 +51,8 @@ pub fn spawn_recording_tool(
});
// Convert f32 [-1.0, 1.0] to i16 for WAV writing.
let pcm_i16: Vec<i16> = source.pcm_48k
let pcm_i16: Vec<i16> = source
.pcm_48k
.iter()
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect();

View File

@@ -39,18 +39,33 @@ impl TtsEngine {
/// - `output`: output WAV file path
/// - `cacheable`: if true, skip synthesis when the output WAV already
/// matches the same text+voice (checked via a `.meta` sidecar file)
pub async fn generate(&mut self, params: &serde_json::Value) -> Result<serde_json::Value, String> {
let model_path = params.get("model").and_then(|v| v.as_str())
pub async fn generate(
&mut self,
params: &serde_json::Value,
) -> Result<serde_json::Value, String> {
let model_path = params
.get("model")
.and_then(|v| v.as_str())
.ok_or("missing 'model' param")?;
let voices_path = params.get("voices").and_then(|v| v.as_str())
let voices_path = params
.get("voices")
.and_then(|v| v.as_str())
.ok_or("missing 'voices' param")?;
let voice_name = params.get("voice").and_then(|v| v.as_str())
let voice_name = params
.get("voice")
.and_then(|v| v.as_str())
.unwrap_or("af_bella");
let text = params.get("text").and_then(|v| v.as_str())
let text = params
.get("text")
.and_then(|v| v.as_str())
.ok_or("missing 'text' param")?;
let output_path = params.get("output").and_then(|v| v.as_str())
let output_path = params
.get("output")
.and_then(|v| v.as_str())
.ok_or("missing 'output' param")?;
let cacheable = params.get("cacheable").and_then(|v| v.as_bool())
let cacheable = params
.get("cacheable")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if text.is_empty() {
@@ -94,10 +109,14 @@ impl TtsEngine {
let voice = select_voice(voice_name);
eprintln!("[tts] synthesizing voice '{voice_name}': \"{text}\"");
let (samples, duration) = tts.synth(text, voice)
let (samples, duration) = tts
.synth(text, voice)
.await
.map_err(|e| format!("synthesis failed: {e:?}"))?;
eprintln!("[tts] synthesized {} samples in {duration:?}", samples.len());
eprintln!(
"[tts] synthesized {} samples in {duration:?}",
samples.len()
);
// Write 24kHz 16-bit mono WAV.
let spec = hound::WavSpec {
@@ -111,9 +130,13 @@ impl TtsEngine {
.map_err(|e| format!("WAV create failed: {e}"))?;
for &sample in &samples {
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
writer.write_sample(s16).map_err(|e| format!("WAV write: {e}"))?;
writer
.write_sample(s16)
.map_err(|e| format!("WAV write: {e}"))?;
}
writer.finalize().map_err(|e| format!("WAV finalize: {e}"))?;
writer
.finalize()
.map_err(|e| format!("WAV finalize: {e}"))?;
// Write sidecar for future cache checks.
if cacheable {

View File

@@ -58,9 +58,7 @@ impl WebRtcEngine {
.register_default_codecs()
.map_err(|e| format!("register codecs: {e}"))?;
let api = APIBuilder::new()
.with_media_engine(media_engine)
.build();
let api = APIBuilder::new().with_media_engine(media_engine).build();
let config = RTCConfiguration {
ice_servers: vec![],
@@ -91,8 +89,7 @@ impl WebRtcEngine {
.map_err(|e| format!("add track: {e}"))?;
// Shared mixer channel sender (populated when linked to a call).
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> =
Arc::new(Mutex::new(None));
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> = Arc::new(Mutex::new(None));
// ICE candidate handler.
let out_tx_ice = self.out_tx.clone();
@@ -256,7 +253,11 @@ impl WebRtcEngine {
pub async fn close_session(&mut self, session_id: &str) -> Result<(), String> {
if let Some(session) = self.sessions.remove(session_id) {
session.pc.close().await.map_err(|e| format!("close: {e}"))?;
session
.pc
.close()
.await
.map_err(|e| format!("close: {e}"))?;
}
Ok(())
}

View File

@@ -51,9 +51,7 @@ impl SipDialog {
.map(|s| s.to_string())
.unwrap_or_else(generate_tag),
remote_tag: None,
local_uri: SipMessage::extract_uri(from)
.unwrap_or("")
.to_string(),
local_uri: SipMessage::extract_uri(from).unwrap_or("").to_string(),
remote_uri: SipMessage::extract_uri(to).unwrap_or("").to_string(),
local_cseq,
remote_cseq: 0,
@@ -181,10 +179,7 @@ impl SipDialog {
format!("<{}>{remote_tag_str}", self.remote_uri),
),
("Call-ID".to_string(), self.call_id.clone()),
(
"CSeq".to_string(),
format!("{} {method}", self.local_cseq),
),
("CSeq".to_string(), format!("{} {method}", self.local_cseq)),
("Max-Forwards".to_string(), "70".to_string()),
];
@@ -243,10 +238,7 @@ impl SipDialog {
format!("<{}>{remote_tag_str}", self.remote_uri),
),
("Call-ID".to_string(), self.call_id.clone()),
(
"CSeq".to_string(),
format!("{} ACK", self.local_cseq),
),
("CSeq".to_string(), format!("{} ACK", self.local_cseq)),
("Max-Forwards".to_string(), "70".to_string()),
];
@@ -271,10 +263,7 @@ impl SipDialog {
("From".to_string(), from),
("To".to_string(), to),
("Call-ID".to_string(), self.call_id.clone()),
(
"CSeq".to_string(),
format!("{} CANCEL", self.local_cseq),
),
("CSeq".to_string(), format!("{} CANCEL", self.local_cseq)),
("Max-Forwards".to_string(), "70".to_string()),
("Content-Length".to_string(), "0".to_string()),
];
@@ -284,11 +273,7 @@ impl SipDialog {
.unwrap_or(&self.remote_target)
.to_string();
SipMessage::new(
format!("CANCEL {ruri} SIP/2.0"),
headers,
String::new(),
)
SipMessage::new(format!("CANCEL {ruri} SIP/2.0"), headers, String::new())
}
/// Transition the dialog to terminated state.

View File

@@ -27,7 +27,9 @@ pub fn generate_branch() -> String {
fn random_hex(bytes: usize) -> String {
let mut rng = rand::thread_rng();
(0..bytes).map(|_| format!("{:02x}", rng.gen::<u8>())).collect()
(0..bytes)
.map(|_| format!("{:02x}", rng.gen::<u8>()))
.collect()
}
// ---- Codec registry --------------------------------------------------------
@@ -142,7 +144,9 @@ pub fn parse_digest_challenge(header: &str) -> Option<DigestChallenge> {
return Some(after[1..1 + end].to_string());
}
// Unquoted value.
let end = after.find(|c: char| c == ',' || c.is_whitespace()).unwrap_or(after.len());
let end = after
.find(|c: char| c == ',' || c.is_whitespace())
.unwrap_or(after.len());
return Some(after[..end].to_string());
}
None
@@ -241,11 +245,7 @@ pub struct MwiResult {
pub extra_headers: Vec<(String, String)>,
}
pub fn build_mwi_body(
new_messages: u32,
old_messages: u32,
account_uri: &str,
) -> MwiResult {
pub fn build_mwi_body(new_messages: u32, old_messages: u32, account_uri: &str) -> MwiResult {
let waiting = if new_messages > 0 { "yes" } else { "no" };
let body = format!(
"Messages-Waiting: {waiting}\r\n\

View File

@@ -4,9 +4,9 @@
//! SDP handling, Digest authentication, and URI rewriting.
//! Ported from the TypeScript `ts/sip/` library.
pub mod message;
pub mod dialog;
pub mod helpers;
pub mod message;
pub mod rewrite;
/// Network endpoint (address + port + optional negotiated codec).

View File

@@ -14,7 +14,11 @@ pub struct SipMessage {
impl SipMessage {
pub fn new(start_line: String, headers: Vec<(String, String)>, body: String) -> Self {
Self { start_line, headers, body }
Self {
start_line,
headers,
body,
}
}
// ---- Parsing -----------------------------------------------------------
@@ -175,7 +179,8 @@ impl SipMessage {
/// Inserts a header at the top of the header list.
pub fn prepend_header(&mut self, name: &str, value: &str) -> &mut Self {
self.headers.insert(0, (name.to_string(), value.to_string()));
self.headers
.insert(0, (name.to_string(), value.to_string()));
self
}
@@ -233,10 +238,7 @@ impl SipMessage {
.to_display_name
.map(|d| format!("\"{d}\" "))
.unwrap_or_default();
let to_tag_str = opts
.to_tag
.map(|t| format!(";tag={t}"))
.unwrap_or_default();
let to_tag_str = opts.to_tag.map(|t| format!(";tag={t}")).unwrap_or_default();
let mut headers = vec![
(
@@ -364,7 +366,43 @@ impl SipMessage {
.find(|c: char| c == ';' || c == '>')
.unwrap_or(trimmed.len());
let result = &trimmed[..end];
if result.is_empty() { None } else { Some(result) }
if result.is_empty() {
None
} else {
Some(result)
}
}
}
/// Extract the user part from a SIP/TEL URI or header value.
pub fn extract_uri_user(uri_or_header_value: &str) -> Option<&str> {
let raw = Self::extract_uri(uri_or_header_value).unwrap_or(uri_or_header_value);
let raw = raw.trim();
if raw.is_empty() {
return None;
}
let user_part = if raw
.get(..5)
.is_some_and(|prefix| prefix.eq_ignore_ascii_case("sips:"))
{
&raw[5..]
} else if raw.get(..4).is_some_and(|prefix| {
prefix.eq_ignore_ascii_case("sip:") || prefix.eq_ignore_ascii_case("tel:")
}) {
&raw[4..]
} else {
raw
};
let end = user_part
.find(|c: char| matches!(c, '@' | ';' | '?' | '>'))
.unwrap_or(user_part.len());
let result = &user_part[..end];
if result.is_empty() {
None
} else {
Some(result)
}
}
}
@@ -506,6 +544,19 @@ mod tests {
SipMessage::extract_uri("\"Name\" <sip:user@host>;tag=abc"),
Some("sip:user@host")
);
assert_eq!(
SipMessage::extract_uri_user("\"Name\" <sip:+49 421 219694@host>;tag=abc"),
Some("+49 421 219694")
);
assert_eq!(
SipMessage::extract_uri_user("sip:0049421219694@voip.easybell.de"),
Some("0049421219694")
);
assert_eq!(
SipMessage::extract_uri_user("tel:+49421219694;phone-context=example.com"),
Some("+49421219694")
);
assert_eq!(SipMessage::extract_uri_user("SIP:user@host"), Some("user"));
}
#[test]
@@ -535,7 +586,10 @@ mod tests {
);
assert_eq!(invite.method(), Some("INVITE"));
assert_eq!(invite.call_id(), "test-123");
assert!(invite.get_header("Via").unwrap().contains("192.168.1.1:5070"));
assert!(invite
.get_header("Via")
.unwrap()
.contains("192.168.1.1:5070"));
let response = SipMessage::create_response(
200,

View File

@@ -92,7 +92,11 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option<Endpoint>
.collect();
let original = match (orig_addr, orig_port) {
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }),
(Some(a), Some(p)) => Some(Endpoint {
address: a,
port: p,
codec_pt: None,
}),
_ => None,
};

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.23.0',
version: '1.24.0',
description: 'undefined'
}

View File

@@ -62,7 +62,10 @@ export interface ISipRouteMatch {
direction: 'inbound' | 'outbound';
/**
* Match the dialed/called number (To/Request-URI for inbound DID, dialed digits for outbound).
* Match the normalized called number.
*
* Inbound: matches the provider-delivered DID / Request-URI user part.
* Outbound: matches the normalized dialed digits.
* Supports: exact string, prefix with trailing '*' (e.g. "+4930*"), or regex ("/^\\+49/").
*/
numberPattern?: string;
@@ -89,13 +92,13 @@ export interface ISipRouteAction {
// --- Inbound actions (IVR / voicemail) ---
/** Route directly to a voicemail box (skip ringing devices). */
/** Voicemail fallback for matched inbound routes. */
voicemailBox?: string;
/** Route to an IVR menu by menu ID (skip ringing devices). */
ivrMenuId?: string;
/** Override no-answer timeout (seconds) before routing to voicemail. */
/** Reserved for future no-answer handling. */
noAnswerTimeout?: number;
// --- Outbound actions (provider selection) ---

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.23.0',
version: '1.24.0',
description: 'undefined'
}

View File

@@ -20,6 +20,9 @@ interface ISipRoute {
action: {
targets?: string[];
ringBrowsers?: boolean;
voicemailBox?: string;
ivrMenuId?: string;
noAnswerTimeout?: number;
provider?: string;
failoverProviders?: string[];
stripPrefix?: string;
@@ -40,10 +43,10 @@ export class SipproxyViewRoutes extends DeesElement {
`,
];
connectedCallback() {
super.connectedCallback();
appState.subscribe((_k, s) => { this.appData = s; });
this.loadConfig();
async connectedCallback(): Promise<void> {
await super.connectedCallback();
appState.subscribe((s) => { this.appData = s; });
await this.loadConfig();
}
private async loadConfig() {
@@ -157,9 +160,15 @@ export class SipproxyViewRoutes extends DeesElement {
return html`<span style="font-family:'JetBrains Mono',monospace;font-size:.82rem">${parts.join(' ')}</span>`;
} else {
const parts: string[] = [];
if (a.ivrMenuId) {
parts.push(`ivr: ${a.ivrMenuId}`);
} else {
if (a.targets?.length) parts.push(`ring: ${a.targets.join(', ')}`);
else parts.push('ring: all devices');
if (a.ringBrowsers) parts.push('+ browsers');
}
if (a.voicemailBox) parts.push(`vm: ${a.voicemailBox}`);
if (a.noAnswerTimeout) parts.push(`timeout: ${a.noAnswerTimeout}s`);
return html`<span style="font-family:'JetBrains Mono',monospace;font-size:.82rem">${parts.join(' ')}</span>`;
}
},
@@ -231,6 +240,8 @@ export class SipproxyViewRoutes extends DeesElement {
const cfg = this.config;
const providers = cfg?.providers || [];
const devices = cfg?.devices || [];
const voiceboxes = cfg?.voiceboxes || [];
const ivrMenus = cfg?.ivr?.menus || [];
const formData: ISipRoute = existing
? JSON.parse(JSON.stringify(existing))
@@ -284,7 +295,7 @@ export class SipproxyViewRoutes extends DeesElement {
<dees-input-text
.key=${'numberPattern'}
.label=${'Number Pattern'}
.description=${'Exact, prefix with * (e.g. +49*), or /regex/'}
.description=${'Inbound: DID/called number. Outbound: dialed number. Exact, prefix with * (e.g. +49*), or /regex/'}
.value=${formData.match.numberPattern || ''}
@input=${(e: Event) => { formData.match.numberPattern = (e.target as any).value || undefined; }}
></dees-input-text>
@@ -328,7 +339,7 @@ export class SipproxyViewRoutes extends DeesElement {
<dees-input-text
.key=${'targets'}
.label=${'Ring Devices (comma-separated IDs)'}
.description=${'Leave empty to ring all devices'}
.description=${'Leave empty to ring all devices for matched inbound numbers'}
.value=${(formData.action.targets || []).join(', ')}
@input=${(e: Event) => {
const v = (e.target as any).value.trim();
@@ -342,6 +353,30 @@ export class SipproxyViewRoutes extends DeesElement {
@newValue=${(e: CustomEvent) => { formData.action.ringBrowsers = e.detail; }}
></dees-input-checkbox>
<dees-input-dropdown
.key=${'voicemailBox'} .label=${'Voicemail Box (inbound fallback)'}
.selectedOption=${formData.action.voicemailBox
? { option: formData.action.voicemailBox, key: formData.action.voicemailBox }
: { option: '(none)', key: '' }}
.options=${[
{ option: '(none)', key: '' },
...voiceboxes.map((vb: any) => ({ option: vb.id, key: vb.id })),
]}
@selectedOption=${(e: CustomEvent) => { formData.action.voicemailBox = e.detail.key || undefined; }}
></dees-input-dropdown>
<dees-input-dropdown
.key=${'ivrMenuId'} .label=${'IVR Menu (inbound)'}
.selectedOption=${formData.action.ivrMenuId
? { option: formData.action.ivrMenuId, key: formData.action.ivrMenuId }
: { option: '(none)', key: '' }}
.options=${[
{ option: '(none)', key: '' },
...ivrMenus.map((menu: any) => ({ option: menu.name || menu.id, key: menu.id })),
]}
@selectedOption=${(e: CustomEvent) => { formData.action.ivrMenuId = e.detail.key || undefined; }}
></dees-input-dropdown>
<dees-input-text
.key=${'stripPrefix'} .label=${'Strip Prefix (outbound)'}
.value=${formData.action.stripPrefix || ''}
@@ -380,6 +415,9 @@ export class SipproxyViewRoutes extends DeesElement {
if (!formData.action.prependPrefix) delete formData.action.prependPrefix;
if (!formData.action.targets?.length) delete formData.action.targets;
if (!formData.action.ringBrowsers) delete formData.action.ringBrowsers;
if (!formData.action.voicemailBox) delete formData.action.voicemailBox;
if (!formData.action.ivrMenuId) delete formData.action.ivrMenuId;
if (!formData.action.noAnswerTimeout) delete formData.action.noAnswerTimeout;
const currentRoutes = [...(cfg?.routing?.routes || [])];
const idx = currentRoutes.findIndex((r: any) => r.id === formData.id);