Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| | 89ae12318e | | |
| | feb3514de4 | | |
| | adfc4726fd | | |
| | 06c86d7e81 | | |
18
changelog.md
18
changelog.md
@@ -1,5 +1,23 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-04-14 - 1.25.0 - feat(proxy-engine)
|
||||
add live TTS streaming interactions and incoming number range support
|
||||
|
||||
- add a new `start_tts_interaction` command and bridge API to begin IVR or leg interactions before full TTS rendering completes
|
||||
- stream synthesized TTS chunks into the mixer with cancellation handling so prompts can stop cleanly on digit match, leg removal, or shutdown
|
||||
- extract PCM-to-mixer frame conversion for reusable live prompt processing
|
||||
- extend routing pattern matching to support number ranges written as `start..end`, including values with a leading `+`
|
||||
- add `incomingNumbers` config typing and frontend config update support for single, range, and regex number modes
|
||||
|
||||
## 2026-04-14 - 1.24.0 - feat(routing)
|
||||
require explicit inbound DID routes and normalize SIP identities for provider-based number matching
|
||||
|
||||
- Inbound route resolution now returns no match unless a configured inbound route explicitly matches the provider and called number.
|
||||
- Normalized routing identities were added for SIP/TEL URIs so inbound DIDs and outbound dialed numbers match consistently across provider-specific formats.
|
||||
- Call handling and incoming call events now use normalized numbers, improving routing accuracy for shared trunk providers.
|
||||
- Route configuration docs and the web route editor were updated to support explicit inbound DID ownership, voicemail fallback, and IVR selection.
|
||||
- Mixer RTP handling was enhanced to better support variable packet durations, timestamp-based gap fill, and non-blocking output drop reporting.
|
||||
|
||||
## 2026-04-14 - 1.23.0 - feat(runtime)
|
||||
refactor runtime state and proxy event handling for typed WebRTC linking and shared status models
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "siprouter",
|
||||
"version": "1.23.0",
|
||||
"version": "1.25.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
|
||||
37
readme.md
37
readme.md
@@ -148,24 +148,41 @@ Create `.nogit/config.json`:
|
||||
"routing": {
|
||||
"routes": [
|
||||
{
|
||||
"id": "inbound-default",
|
||||
"name": "Ring all devices",
|
||||
"priority": 100,
|
||||
"direction": "inbound",
|
||||
"match": {},
|
||||
"id": "inbound-main-did",
|
||||
"name": "Main DID",
|
||||
"priority": 200,
|
||||
"enabled": true,
|
||||
"match": {
|
||||
"direction": "inbound",
|
||||
"sourceProvider": "my-trunk",
|
||||
"numberPattern": "+49421219694"
|
||||
},
|
||||
"action": {
|
||||
"targets": ["desk-phone"],
|
||||
"ringBrowsers": true,
|
||||
"voicemailBox": "main",
|
||||
"noAnswerTimeout": 25
|
||||
"voicemailBox": "main"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "inbound-support-did",
|
||||
"name": "Support DID",
|
||||
"priority": 190,
|
||||
"enabled": true,
|
||||
"match": {
|
||||
"direction": "inbound",
|
||||
"sourceProvider": "my-trunk",
|
||||
"numberPattern": "+49421219695"
|
||||
},
|
||||
"action": {
|
||||
"ivrMenuId": "support-menu"
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": "outbound-default",
|
||||
"name": "Route via trunk",
|
||||
"priority": 100,
|
||||
"direction": "outbound",
|
||||
"match": {},
|
||||
"enabled": true,
|
||||
"match": { "direction": "outbound" },
|
||||
"action": { "provider": "my-trunk" }
|
||||
}
|
||||
]
|
||||
@@ -187,6 +204,8 @@ Create `.nogit/config.json`:
|
||||
}
|
||||
```
|
||||
|
||||
Inbound number ownership is explicit: add one inbound route per DID (or DID prefix) and scope it with `sourceProvider` when a provider delivers multiple external numbers.
|
||||
|
||||
### TTS Setup (Optional)
|
||||
|
||||
For neural voicemail greetings and IVR prompts, download the Kokoro TTS model:
|
||||
|
||||
@@ -115,9 +115,8 @@ pub struct TranscodeState {
|
||||
impl TranscodeState {
|
||||
/// Create a new transcoding session with fresh codec state.
|
||||
pub fn new() -> Result<Self, String> {
|
||||
let mut opus_enc =
|
||||
OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip)
|
||||
.map_err(|e| format!("opus encoder: {e}"))?;
|
||||
let mut opus_enc = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip)
|
||||
.map_err(|e| format!("opus encoder: {e}"))?;
|
||||
opus_enc
|
||||
.set_complexity(5)
|
||||
.map_err(|e| format!("opus set_complexity: {e}"))?;
|
||||
@@ -160,14 +159,9 @@ impl TranscodeState {
|
||||
let key = (from_rate, to_rate, canonical_chunk);
|
||||
|
||||
if !self.resamplers.contains_key(&key) {
|
||||
let r = FftFixedIn::<f64>::new(
|
||||
from_rate as usize,
|
||||
to_rate as usize,
|
||||
canonical_chunk,
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?;
|
||||
let r =
|
||||
FftFixedIn::<f64>::new(from_rate as usize, to_rate as usize, canonical_chunk, 1, 1)
|
||||
.map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?;
|
||||
self.resamplers.insert(key, r);
|
||||
}
|
||||
let resampler = self.resamplers.get_mut(&key).unwrap();
|
||||
@@ -284,8 +278,7 @@ impl TranscodeState {
|
||||
match pt {
|
||||
PT_OPUS => {
|
||||
let mut pcm = vec![0i16; 5760]; // up to 120ms at 48kHz
|
||||
let packet =
|
||||
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
|
||||
let packet = OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
|
||||
let out =
|
||||
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
|
||||
let n: usize = self
|
||||
@@ -343,8 +336,7 @@ impl TranscodeState {
|
||||
match pt {
|
||||
PT_OPUS => {
|
||||
let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz
|
||||
let packet =
|
||||
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
|
||||
let packet = OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
|
||||
let out =
|
||||
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
|
||||
let n: usize = self
|
||||
@@ -368,8 +360,8 @@ impl TranscodeState {
|
||||
/// Returns f32 PCM at 48kHz. `frame_size` should be 960 for 20ms.
|
||||
pub fn opus_plc(&mut self, frame_size: usize) -> Result<Vec<f32>, String> {
|
||||
let mut pcm = vec![0.0f32; frame_size];
|
||||
let out = MutSignals::try_from(&mut pcm[..])
|
||||
.map_err(|e| format!("opus plc signals: {e}"))?;
|
||||
let out =
|
||||
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus plc signals: {e}"))?;
|
||||
let n: usize = self
|
||||
.opus_dec
|
||||
.decode_float(None::<OpusPacket<'_>>, out, false)
|
||||
@@ -425,14 +417,9 @@ impl TranscodeState {
|
||||
let key = (from_rate, to_rate, canonical_chunk);
|
||||
|
||||
if !self.resamplers_f32.contains_key(&key) {
|
||||
let r = FftFixedIn::<f32>::new(
|
||||
from_rate as usize,
|
||||
to_rate as usize,
|
||||
canonical_chunk,
|
||||
1,
|
||||
1,
|
||||
)
|
||||
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
|
||||
let r =
|
||||
FftFixedIn::<f32>::new(from_rate as usize, to_rate as usize, canonical_chunk, 1, 1)
|
||||
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
|
||||
self.resamplers_f32.insert(key, r);
|
||||
}
|
||||
let resampler = self.resamplers_f32.get_mut(&key).unwrap();
|
||||
@@ -508,8 +495,10 @@ mod tests {
|
||||
let encoded = mulaw_encode(sample);
|
||||
let decoded = mulaw_decode(encoded);
|
||||
// µ-law is lossy; verify the decoded value is close.
|
||||
assert!((sample as i32 - decoded as i32).abs() < 1000,
|
||||
"µ-law roundtrip failed for {sample}: got {decoded}");
|
||||
assert!(
|
||||
(sample as i32 - decoded as i32).abs() < 1000,
|
||||
"µ-law roundtrip failed for {sample}: got {decoded}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -518,8 +507,10 @@ mod tests {
|
||||
for sample in [-32768i16, -1000, -1, 0, 1, 1000, 32767] {
|
||||
let encoded = alaw_encode(sample);
|
||||
let decoded = alaw_decode(encoded);
|
||||
assert!((sample as i32 - decoded as i32).abs() < 1000,
|
||||
"A-law roundtrip failed for {sample}: got {decoded}");
|
||||
assert!(
|
||||
(sample as i32 - decoded as i32).abs() < 1000,
|
||||
"A-law roundtrip failed for {sample}: got {decoded}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -543,7 +534,9 @@ mod tests {
|
||||
fn pcmu_to_pcma_roundtrip() {
|
||||
let mut st = TranscodeState::new().unwrap();
|
||||
// 160 bytes = 20ms of PCMU at 8kHz
|
||||
let pcmu_data: Vec<u8> = (0..160).map(|i| mulaw_encode((i as i16 * 200) - 16000)).collect();
|
||||
let pcmu_data: Vec<u8> = (0..160)
|
||||
.map(|i| mulaw_encode((i as i16 * 200) - 16000))
|
||||
.collect();
|
||||
let pcma = st.transcode(&pcmu_data, PT_PCMU, PT_PCMA, None).unwrap();
|
||||
assert_eq!(pcma.len(), 160); // Same frame size
|
||||
let back = st.transcode(&pcma, PT_PCMA, PT_PCMU, None).unwrap();
|
||||
|
||||
@@ -36,10 +36,7 @@ pub async fn play_wav_file(
|
||||
|
||||
// Read all samples as i16.
|
||||
let samples: Vec<i16> = if spec.bits_per_sample == 16 {
|
||||
reader
|
||||
.samples::<i16>()
|
||||
.filter_map(|s| s.ok())
|
||||
.collect()
|
||||
reader.samples::<i16>().filter_map(|s| s.ok()).collect()
|
||||
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
|
||||
reader
|
||||
.samples::<f32>()
|
||||
@@ -199,10 +196,7 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
|
||||
.map(|s| s as f32 / 32768.0)
|
||||
.collect()
|
||||
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
|
||||
reader
|
||||
.samples::<f32>()
|
||||
.filter_map(|s| s.ok())
|
||||
.collect()
|
||||
reader.samples::<f32>().filter_map(|s| s.ok()).collect()
|
||||
} else {
|
||||
return Err(format!(
|
||||
"unsupported WAV format: {}bit {:?}",
|
||||
@@ -214,14 +208,23 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
pcm_to_mix_frames(&samples, wav_rate)
|
||||
}
|
||||
|
||||
/// Convert PCM samples at an arbitrary rate into 48kHz 20ms mixer frames.
|
||||
pub fn pcm_to_mix_frames(samples: &[f32], sample_rate: u32) -> Result<Vec<Vec<f32>>, String> {
|
||||
if samples.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// Resample to MIX_RATE (48kHz) if needed.
|
||||
let resampled = if wav_rate != MIX_RATE {
|
||||
let resampled = if sample_rate != MIX_RATE {
|
||||
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
|
||||
transcoder
|
||||
.resample_f32(&samples, wav_rate, MIX_RATE)
|
||||
.resample_f32(samples, sample_rate, MIX_RATE)
|
||||
.map_err(|e| format!("resample: {e}"))?
|
||||
} else {
|
||||
samples
|
||||
samples.to_vec()
|
||||
};
|
||||
|
||||
// Split into MIX_FRAME_SIZE (960) sample frames.
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
//! The mixer provides mix-minus audio to all participants.
|
||||
|
||||
use crate::call::{Call, CallDirection, CallState, LegId, LegInfo, LegKind, LegState};
|
||||
use crate::config::{AppConfig, ProviderConfig};
|
||||
use crate::config::{normalize_routing_identity, AppConfig, ProviderConfig};
|
||||
use crate::ipc::{emit_event, OutTx};
|
||||
use crate::leg_io::{create_leg_channels, spawn_sip_inbound, spawn_sip_outbound};
|
||||
use crate::mixer::spawn_mixer;
|
||||
@@ -13,7 +13,9 @@ use crate::registrar::Registrar;
|
||||
use crate::rtp::RtpPortPool;
|
||||
use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig};
|
||||
use crate::tts::TtsEngine;
|
||||
use sip_proto::helpers::{build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions};
|
||||
use sip_proto::helpers::{
|
||||
build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions,
|
||||
};
|
||||
use sip_proto::message::{ResponseOptions, SipMessage};
|
||||
use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri};
|
||||
use std::collections::HashMap;
|
||||
@@ -25,7 +27,7 @@ use tokio::sync::Mutex;
|
||||
|
||||
/// Result of creating an inbound call — carries both the call id and
|
||||
/// whether browsers should be notified (flows from the matched inbound
|
||||
/// route's `ring_browsers` flag, or the fallback default).
|
||||
/// route's `ring_browsers` flag).
|
||||
pub struct InboundCallCreated {
|
||||
pub call_id: String,
|
||||
pub ring_browsers: bool,
|
||||
@@ -214,15 +216,27 @@ impl CallManager {
|
||||
let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice);
|
||||
if let Some(dev) = device_leg {
|
||||
if let Some(dev_addr) = dev.signaling_addr {
|
||||
let ringing = SipMessage::create_response(180, "Ringing", device_invite, None);
|
||||
let ringing = SipMessage::create_response(
|
||||
180,
|
||||
"Ringing",
|
||||
device_invite,
|
||||
None,
|
||||
);
|
||||
let _ = socket.send_to(&ringing.serialize(), dev_addr).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
|
||||
emit_event(&self.out_tx, "leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }));
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"call_ringing",
|
||||
serde_json::json!({ "call_id": call_id }),
|
||||
);
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }),
|
||||
);
|
||||
}
|
||||
SipLegAction::ConnectedWithAck(ack_buf) => {
|
||||
let _ = socket.send_to(&ack_buf, target).await;
|
||||
@@ -245,16 +259,30 @@ impl CallManager {
|
||||
spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx);
|
||||
spawn_sip_outbound(rtp_socket, remote_addr, channels.outbound_rx);
|
||||
if let Some(call) = self.calls.get(call_id) {
|
||||
call.add_leg_to_mixer(leg_id, sip_pt, channels.inbound_rx, channels.outbound_tx)
|
||||
.await;
|
||||
call.add_leg_to_mixer(
|
||||
leg_id,
|
||||
sip_pt,
|
||||
channels.inbound_rx,
|
||||
channels.outbound_tx,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// For device-originated calls: send 200 OK to device and wire device leg.
|
||||
if let Some(call) = self.calls.get(call_id) {
|
||||
if let Some(device_invite) = call.device_invite.clone() {
|
||||
let device_leg_info: Option<(SocketAddr, u16, Arc<UdpSocket>, Option<SocketAddr>, String)> =
|
||||
call.legs.values().find(|l| l.kind == LegKind::SipDevice).and_then(|dev| {
|
||||
let device_leg_info: Option<(
|
||||
SocketAddr,
|
||||
u16,
|
||||
Arc<UdpSocket>,
|
||||
Option<SocketAddr>,
|
||||
String,
|
||||
)> = call
|
||||
.legs
|
||||
.values()
|
||||
.find(|l| l.kind == LegKind::SipDevice)
|
||||
.and_then(|dev| {
|
||||
Some((
|
||||
dev.signaling_addr?,
|
||||
dev.rtp_port,
|
||||
@@ -264,11 +292,21 @@ impl CallManager {
|
||||
))
|
||||
});
|
||||
|
||||
if let Some((dev_addr, dev_rtp_port, dev_rtp_socket, dev_remote, dev_leg_id)) = device_leg_info {
|
||||
if let Some((
|
||||
dev_addr,
|
||||
dev_rtp_port,
|
||||
dev_rtp_socket,
|
||||
dev_remote,
|
||||
dev_leg_id,
|
||||
)) = device_leg_info
|
||||
{
|
||||
// Build SDP pointing device to our device_rtp port.
|
||||
// Use LAN IP for the device (it's on the local network).
|
||||
let call_ref = self.calls.get(call_id).unwrap();
|
||||
let prov_leg = call_ref.legs.values().find(|l| l.kind == LegKind::SipProvider);
|
||||
let prov_leg = call_ref
|
||||
.legs
|
||||
.values()
|
||||
.find(|l| l.kind == LegKind::SipProvider);
|
||||
let lan_ip_str = prov_leg
|
||||
.and_then(|l| l.sip_leg.as_ref())
|
||||
.map(|sl| sl.config.lan_ip.clone())
|
||||
@@ -280,13 +318,18 @@ impl CallManager {
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let ok = SipMessage::create_response(200, "OK", &device_invite, Some(ResponseOptions {
|
||||
to_tag: Some(generate_tag()),
|
||||
contact: Some(format!("<sip:{}:{}>", lan_ip_str, 5060)),
|
||||
body: Some(sdp),
|
||||
content_type: Some("application/sdp".to_string()),
|
||||
extra_headers: None,
|
||||
}));
|
||||
let ok = SipMessage::create_response(
|
||||
200,
|
||||
"OK",
|
||||
&device_invite,
|
||||
Some(ResponseOptions {
|
||||
to_tag: Some(generate_tag()),
|
||||
contact: Some(format!("<sip:{}:{}>", lan_ip_str, 5060)),
|
||||
body: Some(sdp),
|
||||
content_type: Some("application/sdp".to_string()),
|
||||
extra_headers: None,
|
||||
}),
|
||||
);
|
||||
let _ = socket.send_to(&ok.serialize(), dev_addr).await;
|
||||
|
||||
// Update device leg state.
|
||||
@@ -313,27 +356,47 @@ impl CallManager {
|
||||
if let Some(dev_remote_addr) = dev_remote {
|
||||
let dev_channels = create_leg_channels();
|
||||
spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx);
|
||||
spawn_sip_outbound(dev_rtp_socket, dev_remote_addr, dev_channels.outbound_rx);
|
||||
spawn_sip_outbound(
|
||||
dev_rtp_socket,
|
||||
dev_remote_addr,
|
||||
dev_channels.outbound_rx,
|
||||
);
|
||||
if let Some(call) = self.calls.get(call_id) {
|
||||
call.add_leg_to_mixer(&dev_leg_id, dev_pt, dev_channels.inbound_rx, dev_channels.outbound_tx)
|
||||
.await;
|
||||
call.add_leg_to_mixer(
|
||||
&dev_leg_id,
|
||||
dev_pt,
|
||||
dev_channels.inbound_rx,
|
||||
dev_channels.outbound_tx,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
emit_event(&self.out_tx, "call_answered", serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"provider_media_addr": remote.map(|a| a.ip().to_string()),
|
||||
"provider_media_port": remote.map(|a| a.port()),
|
||||
"sip_pt": sip_pt,
|
||||
}));
|
||||
emit_event(&self.out_tx, "leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }));
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"call_answered",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"provider_media_addr": remote.map(|a| a.ip().to_string()),
|
||||
"provider_media_port": remote.map(|a| a.port()),
|
||||
"sip_pt": sip_pt,
|
||||
}),
|
||||
);
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }),
|
||||
);
|
||||
}
|
||||
SipLegAction::Terminated(reason) => {
|
||||
let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);
|
||||
let duration = self
|
||||
.calls
|
||||
.get(call_id)
|
||||
.map(|c| c.duration_secs())
|
||||
.unwrap_or(0);
|
||||
|
||||
// Notify device if this is a device-originated outbound call.
|
||||
if let Some(call) = self.calls.get(call_id) {
|
||||
@@ -343,7 +406,8 @@ impl CallManager {
|
||||
if let Some(dev_addr) = dev.signaling_addr {
|
||||
// Map reason to SIP response code.
|
||||
let code: u16 = if reason.starts_with("rejected_") {
|
||||
reason.strip_prefix("rejected_")
|
||||
reason
|
||||
.strip_prefix("rejected_")
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(503)
|
||||
} else if reason == "bye" {
|
||||
@@ -354,7 +418,12 @@ impl CallManager {
|
||||
503
|
||||
};
|
||||
if code > 0 && dev.state != LegState::Connected {
|
||||
let resp = SipMessage::create_response(code, "Service Unavailable", device_invite, None);
|
||||
let resp = SipMessage::create_response(
|
||||
code,
|
||||
"Service Unavailable",
|
||||
device_invite,
|
||||
None,
|
||||
);
|
||||
let _ = socket.send_to(&resp.serialize(), dev_addr).await;
|
||||
}
|
||||
}
|
||||
@@ -367,22 +436,38 @@ impl CallManager {
|
||||
leg.state = LegState::Terminated;
|
||||
}
|
||||
}
|
||||
emit_event(&self.out_tx, "leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }));
|
||||
emit_event(&self.out_tx, "call_ended",
|
||||
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }),
|
||||
);
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"call_ended",
|
||||
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }),
|
||||
);
|
||||
self.terminate_call(call_id).await;
|
||||
return true;
|
||||
}
|
||||
SipLegAction::SendAndTerminate(buf, reason) => {
|
||||
let _ = socket.send_to(&buf, from_addr).await;
|
||||
let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);
|
||||
emit_event(&self.out_tx, "call_ended",
|
||||
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
|
||||
let duration = self
|
||||
.calls
|
||||
.get(call_id)
|
||||
.map(|c| c.duration_secs())
|
||||
.unwrap_or(0);
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"call_ended",
|
||||
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }),
|
||||
);
|
||||
self.terminate_call(call_id).await;
|
||||
return true;
|
||||
}
|
||||
SipLegAction::AuthRetry { ack_407, invite_with_auth } => {
|
||||
SipLegAction::AuthRetry {
|
||||
ack_407,
|
||||
invite_with_auth,
|
||||
} => {
|
||||
if let Some(ack) = ack_407 {
|
||||
let _ = socket.send_to(&ack, target).await;
|
||||
}
|
||||
@@ -416,11 +501,21 @@ impl CallManager {
|
||||
let this_kind = this_leg.map(|l| l.kind).unwrap_or(LegKind::SipProvider);
|
||||
|
||||
// Find the counterpart leg.
|
||||
let other_leg = call.legs.values().find(|l| l.id != this_leg_id && l.state != LegState::Terminated);
|
||||
let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) = match other_leg {
|
||||
Some(l) => (l.signaling_addr, l.rtp_port, l.id.clone(), l.kind, l.public_ip.clone()),
|
||||
None => return false,
|
||||
};
|
||||
let other_leg = call
|
||||
.legs
|
||||
.values()
|
||||
.find(|l| l.id != this_leg_id && l.state != LegState::Terminated);
|
||||
let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) =
|
||||
match other_leg {
|
||||
Some(l) => (
|
||||
l.signaling_addr,
|
||||
l.rtp_port,
|
||||
l.id.clone(),
|
||||
l.kind,
|
||||
l.public_ip.clone(),
|
||||
),
|
||||
None => return false,
|
||||
};
|
||||
let forward_to = match other_addr {
|
||||
Some(a) => a,
|
||||
None => return false,
|
||||
@@ -439,7 +534,9 @@ impl CallManager {
|
||||
};
|
||||
|
||||
// Check if the other leg is a B2BUA leg (has SipLeg for proper dialog mgmt).
|
||||
let other_has_sip_leg = call.legs.get(&other_leg_id)
|
||||
let other_has_sip_leg = call
|
||||
.legs
|
||||
.get(&other_leg_id)
|
||||
.map(|l| l.sip_leg.is_some())
|
||||
.unwrap_or(false);
|
||||
|
||||
@@ -473,7 +570,8 @@ impl CallManager {
|
||||
if let Some(other) = call.legs.get_mut(&other_leg_id) {
|
||||
if let Some(sip_leg) = &mut other.sip_leg {
|
||||
if let Some(hangup_buf) = sip_leg.build_hangup() {
|
||||
let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
|
||||
let _ =
|
||||
socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -504,7 +602,8 @@ impl CallManager {
|
||||
if let Some(other) = call.legs.get_mut(&other_leg_id) {
|
||||
if let Some(sip_leg) = &mut other.sip_leg {
|
||||
if let Some(hangup_buf) = sip_leg.build_hangup() {
|
||||
let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
|
||||
let _ =
|
||||
socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -541,13 +640,17 @@ impl CallManager {
|
||||
if this_kind == LegKind::SipProvider {
|
||||
// From provider → forward to device: rewrite request URI.
|
||||
if let Some(ruri) = fwd.request_uri().map(|s| s.to_string()) {
|
||||
let new_ruri = rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port());
|
||||
let new_ruri =
|
||||
rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port());
|
||||
fwd.set_request_uri(&new_ruri);
|
||||
}
|
||||
}
|
||||
if fwd.is_dialog_establishing() {
|
||||
// Record-Route must also be routable from the destination leg.
|
||||
fwd.prepend_header("Record-Route", &format!("<sip:{advertise_ip}:{lan_port};lr>"));
|
||||
fwd.prepend_header(
|
||||
"Record-Route",
|
||||
&format!("<sip:{advertise_ip}:{lan_port};lr>"),
|
||||
);
|
||||
}
|
||||
let _ = socket.send_to(&fwd.serialize(), forward_to).await;
|
||||
return true;
|
||||
@@ -572,13 +675,20 @@ impl CallManager {
|
||||
if code == 180 || code == 183 {
|
||||
if call.state == CallState::SettingUp {
|
||||
call.state = CallState::Ringing;
|
||||
emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"call_ringing",
|
||||
serde_json::json!({ "call_id": call_id }),
|
||||
);
|
||||
}
|
||||
if let Some(leg) = call.legs.get_mut(this_leg_id) {
|
||||
leg.state = LegState::Ringing;
|
||||
}
|
||||
emit_event(&self.out_tx, "leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }));
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }),
|
||||
);
|
||||
} else if code >= 200 && code < 300 {
|
||||
let mut needs_wiring = false;
|
||||
if let Some(leg) = call.legs.get_mut(this_leg_id) {
|
||||
@@ -598,12 +708,19 @@ impl CallManager {
|
||||
needs_wiring = true;
|
||||
}
|
||||
|
||||
emit_event(&self.out_tx, "leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }));
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }),
|
||||
);
|
||||
|
||||
if call.state != CallState::Connected {
|
||||
call.state = CallState::Connected;
|
||||
emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id }));
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"call_answered",
|
||||
serde_json::json!({ "call_id": call_id }),
|
||||
);
|
||||
}
|
||||
|
||||
// Forward the response before wiring (drop call borrow).
|
||||
@@ -693,28 +810,27 @@ impl CallManager {
|
||||
|
||||
// Extract caller/callee info.
|
||||
let from_header = invite.get_header("From").unwrap_or("");
|
||||
let caller_number = SipMessage::extract_uri(from_header)
|
||||
.unwrap_or("Unknown")
|
||||
.to_string();
|
||||
let called_number = invite
|
||||
.request_uri()
|
||||
.and_then(|uri| SipMessage::extract_uri(uri))
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let caller_number = normalize_routing_identity(from_header);
|
||||
let called_number = normalize_routing_identity(invite.request_uri().unwrap_or(""));
|
||||
|
||||
// Resolve via the configured inbound routing table. This honors
|
||||
// user-defined routes from the UI (numberPattern, callerPattern,
|
||||
// sourceProvider, targets, ringBrowsers). If no route matches, the
|
||||
// fallback returns an empty `device_ids` and `ring_browsers: true`,
|
||||
// which preserves pre-routing behavior via the `resolve_first_device`
|
||||
// fallback below.
|
||||
// Resolve via the configured inbound routing table. The matched route
|
||||
// is the source of truth for which external numbers this provider is
|
||||
// allowed to deliver to us.
|
||||
//
|
||||
// TODO: Multi-target inbound fork is not yet implemented.
|
||||
// - `route.device_ids` beyond the first registered target are ignored.
|
||||
// - `ring_browsers` is informational only — browsers see a toast but
|
||||
// do not race the SIP device. First-to-answer-wins requires a
|
||||
// multi-leg fork + per-leg CANCEL, which is not built yet.
|
||||
let route = config.resolve_inbound_route(provider_id, &called_number, &caller_number);
|
||||
let route = match config.resolve_inbound_route(provider_id, &called_number, &caller_number)
|
||||
{
|
||||
Some(route) => route,
|
||||
None => {
|
||||
let resp = SipMessage::create_response(404, "Not Found", invite, None);
|
||||
let _ = socket.send_to(&resp.serialize(), from_addr).await;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let ring_browsers = route.ring_browsers;
|
||||
|
||||
// IVR routing: if the route targets an IVR menu, go there directly.
|
||||
@@ -724,12 +840,24 @@ impl CallManager {
|
||||
if let Some(menu) = ivr.menus.iter().find(|m| m.id == *ivr_menu_id) {
|
||||
let call_id = self
|
||||
.route_to_ivr(
|
||||
&call_id, invite, from_addr, &caller_number,
|
||||
provider_id, provider_config, config, rtp_pool, socket,
|
||||
public_ip, menu, &tts_engine,
|
||||
&call_id,
|
||||
invite,
|
||||
from_addr,
|
||||
&caller_number,
|
||||
provider_id,
|
||||
provider_config,
|
||||
config,
|
||||
rtp_pool,
|
||||
socket,
|
||||
public_ip,
|
||||
menu,
|
||||
&tts_engine,
|
||||
)
|
||||
.await?;
|
||||
return Some(InboundCallCreated { call_id, ring_browsers });
|
||||
return Some(InboundCallCreated {
|
||||
call_id,
|
||||
ring_browsers,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -748,19 +876,27 @@ impl CallManager {
|
||||
None => {
|
||||
// No device registered → voicemail.
|
||||
// Resolve greeting WAV on-demand (may trigger TTS generation).
|
||||
let greeting_wav = resolve_greeting_wav(
|
||||
config,
|
||||
route.voicemail_box.as_deref(),
|
||||
&tts_engine,
|
||||
).await;
|
||||
let greeting_wav =
|
||||
resolve_greeting_wav(config, route.voicemail_box.as_deref(), &tts_engine).await;
|
||||
let call_id = self
|
||||
.route_to_voicemail(
|
||||
&call_id, invite, from_addr, &caller_number,
|
||||
provider_id, provider_config, config, rtp_pool, socket, public_ip,
|
||||
&call_id,
|
||||
invite,
|
||||
from_addr,
|
||||
&caller_number,
|
||||
provider_id,
|
||||
provider_config,
|
||||
config,
|
||||
rtp_pool,
|
||||
socket,
|
||||
public_ip,
|
||||
greeting_wav,
|
||||
)
|
||||
.await?;
|
||||
return Some(InboundCallCreated { call_id, ring_browsers });
|
||||
return Some(InboundCallCreated {
|
||||
call_id,
|
||||
ring_browsers,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -855,8 +991,10 @@ impl CallManager {
|
||||
// Register SIP Call-ID → both legs (provider leg handles provider messages).
|
||||
// For passthrough, both legs share the same SIP Call-ID.
|
||||
// We route based on source address in route_passthrough_message.
|
||||
self.sip_index
|
||||
.insert(sip_call_id.clone(), (call_id.clone(), provider_leg_id.clone()));
|
||||
self.sip_index.insert(
|
||||
sip_call_id.clone(),
|
||||
(call_id.clone(), provider_leg_id.clone()),
|
||||
);
|
||||
|
||||
// Rewrite and forward INVITE to device.
|
||||
let mut fwd_invite = invite.clone();
|
||||
@@ -889,7 +1027,10 @@ impl CallManager {
|
||||
}
|
||||
}
|
||||
|
||||
Some(InboundCallCreated { call_id, ring_browsers })
|
||||
Some(InboundCallCreated {
|
||||
call_id,
|
||||
ring_browsers,
|
||||
})
|
||||
}
|
||||
|
||||
/// Initiate an outbound B2BUA call from the dashboard.
|
||||
@@ -938,7 +1079,9 @@ impl CallManager {
|
||||
|
||||
// Send INVITE.
|
||||
let to_uri = format!("sip:{number}@{}", provider_config.domain);
|
||||
sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await;
|
||||
sip_leg
|
||||
.send_invite(registered_aor, &to_uri, &sip_call_id, socket)
|
||||
.await;
|
||||
|
||||
// Create call with mixer.
|
||||
let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone());
|
||||
@@ -1109,7 +1252,9 @@ impl CallManager {
|
||||
|
||||
// Build proper To URI and send INVITE.
|
||||
let to_uri = format!("sip:{}@{}", dialed_number, provider_config.domain);
|
||||
sip_leg.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket).await;
|
||||
sip_leg
|
||||
.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket)
|
||||
.await;
|
||||
|
||||
call.legs.insert(
|
||||
provider_leg_id.clone(),
|
||||
@@ -1185,7 +1330,9 @@ impl CallManager {
|
||||
|
||||
let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);
|
||||
let to_uri = format!("sip:{number}@{}", provider_config.domain);
|
||||
sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await;
|
||||
sip_leg
|
||||
.send_invite(registered_aor, &to_uri, &sip_call_id, socket)
|
||||
.await;
|
||||
|
||||
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
|
||||
|
||||
@@ -1256,9 +1403,16 @@ impl CallManager {
|
||||
};
|
||||
|
||||
let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);
|
||||
let to_uri = format!("sip:{}@{}:{}", device_id, device_addr.ip(), device_addr.port());
|
||||
let to_uri = format!(
|
||||
"sip:{}@{}:{}",
|
||||
device_id,
|
||||
device_addr.ip(),
|
||||
device_addr.port()
|
||||
);
|
||||
let from_uri = format!("sip:sipproxy@{lan_ip}:{lan_port}");
|
||||
sip_leg.send_invite(&from_uri, &to_uri, &sip_call_id, socket).await;
|
||||
sip_leg
|
||||
.send_invite(&from_uri, &to_uri, &sip_call_id, socket)
|
||||
.await;
|
||||
|
||||
let leg_info = LegInfo {
|
||||
id: leg_id.clone(),
|
||||
@@ -1292,12 +1446,7 @@ impl CallManager {
|
||||
}
|
||||
|
||||
/// Remove a leg from a call.
|
||||
pub async fn remove_leg(
|
||||
&mut self,
|
||||
call_id: &str,
|
||||
leg_id: &str,
|
||||
socket: &UdpSocket,
|
||||
) -> bool {
|
||||
pub async fn remove_leg(&mut self, call_id: &str, leg_id: &str, socket: &UdpSocket) -> bool {
|
||||
let call = match self.calls.get_mut(call_id) {
|
||||
Some(c) => c,
|
||||
None => return false,
|
||||
@@ -1310,7 +1459,9 @@ impl CallManager {
|
||||
if let Some(leg) = call.legs.get_mut(leg_id) {
|
||||
if let Some(sip_leg) = &mut leg.sip_leg {
|
||||
if let Some(hangup_bytes) = sip_leg.build_hangup() {
|
||||
let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await;
|
||||
let _ = socket
|
||||
.send_to(&hangup_bytes, sip_leg.config.sip_target)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
leg.state = LegState::Terminated;
|
||||
@@ -1356,9 +1507,7 @@ impl CallManager {
|
||||
target_call_id: &str,
|
||||
) -> bool {
|
||||
// Validate both calls exist and the leg is in the source call.
|
||||
if !self.calls.contains_key(source_call_id)
|
||||
|| !self.calls.contains_key(target_call_id)
|
||||
{
|
||||
if !self.calls.contains_key(source_call_id) || !self.calls.contains_key(target_call_id) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -1503,7 +1652,9 @@ impl CallManager {
|
||||
}
|
||||
if let Some(sip_leg) = &mut leg.sip_leg {
|
||||
if let Some(hangup_bytes) = sip_leg.build_hangup() {
|
||||
let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await;
|
||||
let _ = socket
|
||||
.send_to(&hangup_bytes, sip_leg.config.sip_target)
|
||||
.await;
|
||||
}
|
||||
} else if let Some(addr) = leg.signaling_addr {
|
||||
// Passthrough leg — send a simple BYE.
|
||||
@@ -1588,7 +1739,9 @@ impl CallManager {
|
||||
});
|
||||
|
||||
let response = SipMessage::create_response(
|
||||
200, "OK", invite,
|
||||
200,
|
||||
"OK",
|
||||
invite,
|
||||
Some(sip_proto::message::ResponseOptions {
|
||||
to_tag: Some(sip_proto::helpers::generate_tag()),
|
||||
contact: Some(format!("<sip:{}:{}>", lan_ip, config.proxy.lan_port)),
|
||||
@@ -1665,9 +1818,15 @@ impl CallManager {
|
||||
let rtp_socket = rtp_alloc.socket;
|
||||
tokio::spawn(async move {
|
||||
crate::voicemail::run_voicemail_session(
|
||||
rtp_socket, provider_media, codec_pt,
|
||||
greeting_wav, recording_path, 120_000,
|
||||
call_id_owned, caller_owned, out_tx,
|
||||
rtp_socket,
|
||||
provider_media,
|
||||
codec_pt,
|
||||
greeting_wav,
|
||||
recording_path,
|
||||
120_000,
|
||||
call_id_owned,
|
||||
caller_owned,
|
||||
out_tx,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
@@ -1717,7 +1876,9 @@ impl CallManager {
|
||||
});
|
||||
|
||||
let response = SipMessage::create_response(
|
||||
200, "OK", invite,
|
||||
200,
|
||||
"OK",
|
||||
invite,
|
||||
Some(sip_proto::message::ResponseOptions {
|
||||
to_tag: Some(sip_proto::helpers::generate_tag()),
|
||||
contact: Some(format!("<sip:{}:{}>", lan_ip, config.proxy.lan_port)),
|
||||
@@ -1781,24 +1942,23 @@ impl CallManager {
|
||||
}
|
||||
}
|
||||
|
||||
// Generate IVR prompt on-demand via TTS (cached).
|
||||
// Generate the IVR prompt as a live chunked TTS stream so playback can
|
||||
// start after the first chunk instead of waiting for a full WAV render.
|
||||
let voice = menu.prompt_voice.as_deref().unwrap_or("af_bella");
|
||||
let prompt_output = format!(".nogit/tts/ivr-menu-{}.wav", menu.id);
|
||||
let prompt_params = serde_json::json!({
|
||||
"model": ".nogit/tts/kokoro-v1.0.onnx",
|
||||
"voices": ".nogit/tts/voices.bin",
|
||||
"voice": voice,
|
||||
"text": &menu.prompt_text,
|
||||
"output": &prompt_output,
|
||||
"cacheable": true,
|
||||
});
|
||||
|
||||
let prompt_wav = {
|
||||
let live_prompt = {
|
||||
let mut tts = tts_engine.lock().await;
|
||||
match tts.generate(&prompt_params).await {
|
||||
Ok(_) => Some(prompt_output),
|
||||
match tts
|
||||
.start_live_prompt(crate::tts::TtsPromptRequest {
|
||||
model_path: crate::tts::DEFAULT_MODEL_PATH.to_string(),
|
||||
voices_path: crate::tts::DEFAULT_VOICES_PATH.to_string(),
|
||||
voice_name: voice.to_string(),
|
||||
text: menu.prompt_text.clone(),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(prompt) => Some(prompt),
|
||||
Err(e) => {
|
||||
eprintln!("[ivr] TTS generation failed: {e}");
|
||||
eprintln!("[ivr] live TTS setup failed: {e}");
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -1815,17 +1975,14 @@ impl CallManager {
|
||||
let timeout_ms = menu.timeout_sec.unwrap_or(5) * 1000;
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Load prompt PCM frames if available.
|
||||
let prompt_frames = prompt_wav.as_ref().and_then(|wav| {
|
||||
crate::audio_player::load_prompt_pcm_frames(wav).ok()
|
||||
});
|
||||
|
||||
if let Some(frames) = prompt_frames {
|
||||
if let Some(prompt) = live_prompt {
|
||||
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
|
||||
let _ = mixer_cmd_tx
|
||||
.send(crate::mixer::MixerCommand::StartInteraction {
|
||||
leg_id: provider_leg_id.clone(),
|
||||
prompt_pcm_frames: frames,
|
||||
prompt_pcm_frames: prompt.initial_frames,
|
||||
prompt_stream_rx: Some(prompt.stream_rx),
|
||||
prompt_cancel_tx: Some(prompt.cancel_tx),
|
||||
expected_digits: expected_digits.clone(),
|
||||
timeout_ms,
|
||||
result_tx,
|
||||
@@ -1884,7 +2041,11 @@ impl CallManager {
|
||||
// Internal helpers
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
fn resolve_first_device(&self, config: &AppConfig, registrar: &Registrar) -> Option<SocketAddr> {
|
||||
fn resolve_first_device(
|
||||
&self,
|
||||
config: &AppConfig,
|
||||
registrar: &Registrar,
|
||||
) -> Option<SocketAddr> {
|
||||
for device in &config.devices {
|
||||
if let Some(addr) = registrar.get_device_contact(&device.id) {
|
||||
return Some(addr);
|
||||
@@ -1907,8 +2068,7 @@ async fn resolve_greeting_wav(
|
||||
tts_engine: &Arc<Mutex<TtsEngine>>,
|
||||
) -> Option<String> {
|
||||
// 1. Look up voicebox config.
|
||||
let vb = voicebox_id
|
||||
.and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled));
|
||||
let vb = voicebox_id.and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled));
|
||||
|
||||
if let Some(vb) = vb {
|
||||
// 2. Pre-recorded WAV takes priority.
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
//! proxy engine via the `configure` command. These types mirror the TS interfaces.
|
||||
|
||||
use serde::Deserialize;
|
||||
use sip_proto::message::SipMessage;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
/// Network endpoint.
|
||||
@@ -227,8 +228,101 @@ pub struct IvrMenuEntry {
|
||||
// Pattern matching (ported from ts/config.ts)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Extract the URI user part and normalize phone-like identities for routing.
|
||||
///
|
||||
/// This keeps inbound route matching stable across provider-specific URI shapes,
|
||||
/// e.g. `sip:+49 421 219694@trunk.example` and `sip:0049421219694@trunk.example`
|
||||
/// both normalize to `+49421219694`.
|
||||
pub fn normalize_routing_identity(value: &str) -> String {
|
||||
let extracted = SipMessage::extract_uri_user(value).unwrap_or(value).trim();
|
||||
if extracted.is_empty() {
|
||||
return String::new();
|
||||
}
|
||||
|
||||
let mut digits = String::new();
|
||||
let mut saw_plus = false;
|
||||
|
||||
for (idx, ch) in extracted.chars().enumerate() {
|
||||
if ch.is_ascii_digit() {
|
||||
digits.push(ch);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ch == '+' && idx == 0 {
|
||||
saw_plus = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
if matches!(ch, ' ' | '\t' | '-' | '.' | '/' | '(' | ')') {
|
||||
continue;
|
||||
}
|
||||
|
||||
return extracted.to_string();
|
||||
}
|
||||
|
||||
if digits.is_empty() {
|
||||
return extracted.to_string();
|
||||
}
|
||||
if saw_plus {
|
||||
return format!("+{digits}");
|
||||
}
|
||||
if digits.starts_with("00") && digits.len() > 2 {
|
||||
return format!("+{}", &digits[2..]);
|
||||
}
|
||||
|
||||
digits
|
||||
}
|
||||
|
||||
/// Split one side of a numeric range (or a candidate value) into its parts.
///
/// Surrounding whitespace is ignored. Returns `Some((has_plus, digits))` where
/// `has_plus` tells whether the text started with `+` and `digits` is the
/// remaining run of ASCII digits. Returns `None` when the text is empty, is a
/// lone `+`, or contains anything other than ASCII digits after the optional `+`.
fn parse_numeric_range_value(value: &str) -> Option<(bool, &str)> {
    let candidate = value.trim();

    let (has_plus, number) = match candidate.strip_prefix('+') {
        Some(tail) => (true, tail),
        None => (false, candidate),
    };

    let is_digit_run = !number.is_empty() && number.bytes().all(|b| b.is_ascii_digit());
    is_digit_run.then_some((has_plus, number))
}
|
||||
|
||||
fn matches_numeric_range_pattern(pattern: &str, value: &str) -> bool {
|
||||
let Some((start, end)) = pattern.split_once("..") else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let Some((start_plus, start_digits)) = parse_numeric_range_value(start) else {
|
||||
return false;
|
||||
};
|
||||
let Some((end_plus, end_digits)) = parse_numeric_range_value(end) else {
|
||||
return false;
|
||||
};
|
||||
let Some((value_plus, value_digits)) = parse_numeric_range_value(value) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
if start_plus != end_plus || value_plus != start_plus {
|
||||
return false;
|
||||
}
|
||||
if start_digits.len() != end_digits.len() || value_digits.len() != start_digits.len() {
|
||||
return false;
|
||||
}
|
||||
if start_digits > end_digits {
|
||||
return false;
|
||||
}
|
||||
|
||||
value_digits >= start_digits && value_digits <= end_digits
|
||||
}
|
||||
|
||||
/// Test a value against a pattern string.
|
||||
/// - None/empty: matches everything (wildcard)
|
||||
/// - `start..end`: numeric range match
|
||||
/// - Trailing '*': prefix match
|
||||
/// - Starts with '/': regex match
|
||||
/// - Otherwise: exact match
|
||||
@@ -244,6 +338,10 @@ pub fn matches_pattern(pattern: Option<&str>, value: &str) -> bool {
|
||||
return value.starts_with(&pattern[..pattern.len() - 1]);
|
||||
}
|
||||
|
||||
if matches_numeric_range_pattern(pattern, value) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Regex match: "/^\\+49/" or "/pattern/i"
|
||||
if pattern.starts_with('/') {
|
||||
if let Some(last_slash) = pattern[1..].rfind('/') {
|
||||
@@ -363,7 +461,7 @@ impl AppConfig {
|
||||
provider_id: &str,
|
||||
called_number: &str,
|
||||
caller_number: &str,
|
||||
) -> InboundRouteResult {
|
||||
) -> Option<InboundRouteResult> {
|
||||
let mut routes: Vec<&Route> = self
|
||||
.routing
|
||||
.routes
|
||||
@@ -387,22 +485,170 @@ impl AppConfig {
|
||||
continue;
|
||||
}
|
||||
|
||||
return InboundRouteResult {
|
||||
return Some(InboundRouteResult {
|
||||
device_ids: route.action.targets.clone().unwrap_or_default(),
|
||||
ring_browsers: route.action.ring_browsers.unwrap_or(false),
|
||||
voicemail_box: route.action.voicemail_box.clone(),
|
||||
ivr_menu_id: route.action.ivr_menu_id.clone(),
|
||||
no_answer_timeout: route.action.no_answer_timeout,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
// Fallback: ring all devices + browsers.
|
||||
InboundRouteResult {
|
||||
device_ids: vec![],
|
||||
ring_browsers: true,
|
||||
voicemail_box: None,
|
||||
ivr_menu_id: None,
|
||||
no_answer_timeout: None,
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal configuration fixture: one provider (`provider-a`), one device
    /// (`desk`), no voiceboxes or IVR, plus the routes supplied by the test.
    fn test_app_config(routes: Vec<Route>) -> AppConfig {
        let proxy = ProxyConfig {
            lan_ip: String::from("127.0.0.1"),
            lan_port: 5070,
            public_ip_seed: None,
            rtp_port_range: RtpPortRange {
                min: 20_000,
                max: 20_100,
            },
        };

        let provider = ProviderConfig {
            id: String::from("provider-a"),
            display_name: String::from("Provider A"),
            domain: String::from("example.com"),
            outbound_proxy: Endpoint {
                address: String::from("example.com"),
                port: 5060,
            },
            username: String::from("user"),
            password: String::from("pass"),
            register_interval_sec: 300,
            codecs: vec![9],
            quirks: Quirks {
                early_media_silence: false,
                silence_payload_type: None,
                silence_max_packets: None,
            },
        };

        let device = DeviceConfig {
            id: String::from("desk"),
            display_name: String::from("Desk"),
            expected_address: String::from("127.0.0.1"),
            extension: String::from("100"),
        };

        AppConfig {
            proxy,
            providers: vec![provider],
            devices: vec![device],
            routing: RoutingConfig { routes },
            voiceboxes: vec![],
            ivr: None,
        }
    }

    /// URIs in several provider-specific shapes reduce to the same routing key,
    /// while non-numeric users pass through untouched.
    #[test]
    fn normalize_routing_identity_extracts_uri_user_and_phone_number() {
        let cases = [
            ("sip:0049 421 219694@voip.easybell.de", "+49421219694"),
            ("<tel:+49 (421) 219694>", "+49421219694"),
            ("sip:100@pbx.local", "100"),
            ("sip:alice@pbx.local", "alice"),
        ];

        for (input, expected) in cases {
            assert_eq!(normalize_routing_identity(input), expected);
        }
    }

    /// Without any configured route, an inbound call resolves to nothing.
    #[test]
    fn resolve_inbound_route_requires_explicit_match() {
        let cfg = test_app_config(vec![]);
        let resolved = cfg.resolve_inbound_route("provider-a", "+49421219694", "+491701234567");
        assert!(resolved.is_none());
    }

    /// Two DIDs on the same provider each resolve to their own route action.
    #[test]
    fn resolve_inbound_route_matches_per_number_on_shared_provider() {
        let main_route = Route {
            id: String::from("main"),
            name: String::from("Main DID"),
            priority: 200,
            enabled: true,
            match_criteria: RouteMatch {
                direction: String::from("inbound"),
                number_pattern: Some(String::from("+49421219694")),
                caller_pattern: None,
                source_provider: Some(String::from("provider-a")),
                source_device: None,
            },
            action: RouteAction {
                targets: Some(vec![String::from("desk")]),
                ring_browsers: Some(true),
                voicemail_box: None,
                ivr_menu_id: None,
                no_answer_timeout: None,
                provider: None,
                failover_providers: None,
                strip_prefix: None,
                prepend_prefix: None,
            },
        };

        let support_route = Route {
            id: String::from("support"),
            name: String::from("Support DID"),
            priority: 100,
            enabled: true,
            match_criteria: RouteMatch {
                direction: String::from("inbound"),
                number_pattern: Some(String::from("+49421219695")),
                caller_pattern: None,
                source_provider: Some(String::from("provider-a")),
                source_device: None,
            },
            action: RouteAction {
                targets: None,
                ring_browsers: Some(false),
                voicemail_box: Some(String::from("support-box")),
                ivr_menu_id: None,
                no_answer_timeout: Some(20),
                provider: None,
                failover_providers: None,
                strip_prefix: None,
                prepend_prefix: None,
            },
        };

        let cfg = test_app_config(vec![main_route, support_route]);

        let main = cfg
            .resolve_inbound_route("provider-a", "+49421219694", "+491701234567")
            .expect("main DID should match");
        assert_eq!(main.device_ids, vec!["desk".to_string()]);
        assert!(main.ring_browsers);

        let support = cfg
            .resolve_inbound_route("provider-a", "+49421219695", "+491701234567")
            .expect("support DID should match");
        assert_eq!(support.voicemail_box.as_deref(), Some("support-box"));
        assert_eq!(support.no_answer_timeout, Some(20));
        assert!(!support.ring_browsers);
    }

    /// `start..end` patterns match inclusively and never mix `+` and plain forms.
    #[test]
    fn matches_pattern_supports_numeric_ranges() {
        let cases = [
            ("042116767546..042116767548", "042116767547", true),
            ("042116767546..042116767548", "042116767549", false),
            ("+4942116767546..+4942116767548", "+4942116767547", true),
            ("+4942116767546..+4942116767548", "042116767547", false),
        ];

        for (pattern, value, expected) in cases {
            assert_eq!(
                matches_pattern(Some(pattern), value),
                expected,
                "pattern {pattern:?} against value {value:?}"
            );
        }
    }
}
|
||||
|
||||
@@ -19,7 +19,13 @@ pub struct Command {
|
||||
}
|
||||
|
||||
/// Send a response to a command.
|
||||
pub fn respond(tx: &OutTx, id: &str, success: bool, result: Option<serde_json::Value>, error: Option<&str>) {
|
||||
pub fn respond(
|
||||
tx: &OutTx,
|
||||
id: &str,
|
||||
success: bool,
|
||||
result: Option<serde_json::Value>,
|
||||
error: Option<&str>,
|
||||
) {
|
||||
let mut resp = serde_json::json!({ "id": id, "success": success });
|
||||
if let Some(r) = result {
|
||||
resp["result"] = r;
|
||||
|
||||
@@ -63,7 +63,8 @@ pub fn spawn_sip_inbound(
|
||||
if offset + 4 > n {
|
||||
continue; // Malformed: extension header truncated.
|
||||
}
|
||||
let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
|
||||
let ext_len =
|
||||
u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
|
||||
offset += 4 + ext_len * 4;
|
||||
}
|
||||
if offset >= n {
|
||||
@@ -74,7 +75,17 @@ pub fn spawn_sip_inbound(
|
||||
if payload.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() {
|
||||
if inbound_tx
|
||||
.send(RtpPacket {
|
||||
payload,
|
||||
payload_type: pt,
|
||||
marker,
|
||||
seq,
|
||||
timestamp,
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break; // Channel closed — leg removed.
|
||||
}
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -7,7 +7,8 @@
|
||||
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
|
||||
//!
|
||||
//! The mixer runs a 20ms tick loop:
|
||||
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
|
||||
//! 1. Drain inbound channels, reorder RTP, decode variable-duration packets to 48kHz,
|
||||
//! and queue them in per-leg PCM buffers
|
||||
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
|
||||
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
|
||||
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
|
||||
@@ -16,11 +17,12 @@
|
||||
|
||||
use crate::ipc::{emit_event, OutTx};
|
||||
use crate::jitter_buffer::{JitterBuffer, JitterResult};
|
||||
use crate::rtp::{build_rtp_header, rtp_clock_increment};
|
||||
use crate::rtp::{build_rtp_header, rtp_clock_increment, rtp_clock_rate};
|
||||
use crate::tts::TtsStreamMessage;
|
||||
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
|
||||
use nnnoiseless::DenoiseState;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::sync::{mpsc, oneshot, watch};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{self, Duration, MissedTickBehavior};
|
||||
|
||||
@@ -29,6 +31,12 @@ use tokio::time::{self, Duration, MissedTickBehavior};
|
||||
const MIX_RATE: u32 = 48000;
|
||||
/// Samples per 20ms frame at the mixing rate.
|
||||
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
|
||||
/// Safety cap for how much timestamp-derived gap fill we synthesize at once.
|
||||
const MAX_GAP_FILL_SAMPLES: usize = MIX_FRAME_SIZE * 6; // 120ms
|
||||
/// Bound how many decode / concealment steps a leg can consume in one tick.
|
||||
const MAX_PACKET_STEPS_PER_TICK: usize = 24;
|
||||
/// Report the first output drop immediately, then every N drops.
|
||||
const DROP_REPORT_INTERVAL: u64 = 50;
|
||||
|
||||
/// A raw RTP payload received from a leg (no RTP header).
|
||||
pub struct RtpPacket {
|
||||
@@ -39,10 +47,6 @@ pub struct RtpPacket {
|
||||
/// RTP sequence number for reordering.
|
||||
pub seq: u16,
|
||||
/// RTP timestamp from the original packet header.
|
||||
///
|
||||
/// Set on inbound RTP but not yet consumed downstream — reserved for
|
||||
/// future jitter/sync work in the mixer.
|
||||
#[allow(dead_code)]
|
||||
pub timestamp: u32,
|
||||
}
|
||||
|
||||
@@ -61,6 +65,12 @@ enum LegRole {
|
||||
struct IsolationState {
|
||||
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
|
||||
prompt_frames: VecDeque<Vec<f32>>,
|
||||
/// Live TTS frames arrive here while playback is already in progress.
|
||||
prompt_stream_rx: Option<mpsc::Receiver<TtsStreamMessage>>,
|
||||
/// Cancels the background TTS producer when the interaction ends early.
|
||||
prompt_cancel_tx: Option<watch::Sender<bool>>,
|
||||
/// Whether the live prompt stream has ended.
|
||||
prompt_stream_finished: bool,
|
||||
/// Digits that complete the interaction (e.g., ['1', '2']).
|
||||
expected_digits: Vec<char>,
|
||||
/// Ticks remaining before timeout (decremented each tick after prompt ends).
|
||||
@@ -109,6 +119,7 @@ struct ToolLegSlot {
|
||||
#[allow(dead_code)]
|
||||
tool_type: ToolType,
|
||||
audio_tx: mpsc::Sender<ToolAudioBatch>,
|
||||
dropped_batches: u64,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -136,6 +147,10 @@ pub enum MixerCommand {
|
||||
leg_id: String,
|
||||
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
|
||||
prompt_pcm_frames: Vec<Vec<f32>>,
|
||||
/// Optional live prompt stream. Frames are appended as they are synthesized.
|
||||
prompt_stream_rx: Option<mpsc::Receiver<TtsStreamMessage>>,
|
||||
/// Optional cancellation handle for the live prompt stream.
|
||||
prompt_cancel_tx: Option<watch::Sender<bool>>,
|
||||
expected_digits: Vec<char>,
|
||||
timeout_ms: u32,
|
||||
result_tx: oneshot::Sender<InteractionResult>,
|
||||
@@ -163,8 +178,15 @@ struct MixerLegSlot {
|
||||
denoiser: Box<DenoiseState<'static>>,
|
||||
inbound_rx: mpsc::Receiver<RtpPacket>,
|
||||
outbound_tx: mpsc::Sender<Vec<u8>>,
|
||||
/// Decoded PCM waiting for playout. Variable-duration RTP packets are
|
||||
/// decoded into this FIFO; the mixer consumes exactly one 20ms frame per tick.
|
||||
pcm_buffer: VecDeque<f32>,
|
||||
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
|
||||
last_pcm_frame: Vec<f32>,
|
||||
/// Next RTP timestamp expected from the inbound stream.
|
||||
expected_rtp_timestamp: Option<u32>,
|
||||
/// Best-effort estimate of packet duration in RTP clock units.
|
||||
estimated_packet_ts: u32,
|
||||
/// Number of consecutive ticks with no inbound packet.
|
||||
silent_ticks: u32,
|
||||
/// Per-leg jitter buffer for packet reordering and timing.
|
||||
@@ -173,15 +195,302 @@ struct MixerLegSlot {
|
||||
rtp_seq: u16,
|
||||
rtp_ts: u32,
|
||||
rtp_ssrc: u32,
|
||||
/// Dropped outbound frames for this leg (queue full / closed).
|
||||
outbound_drops: u64,
|
||||
/// Current role of this leg in the mixer.
|
||||
role: LegRole,
|
||||
}
|
||||
|
||||
fn mix_samples_to_rtp_ts(codec_pt: u8, mix_samples: usize) -> u32 {
|
||||
let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64;
|
||||
(((mix_samples as u64 * clock_rate) + (MIX_RATE as u64 / 2)) / MIX_RATE as u64) as u32
|
||||
}
|
||||
|
||||
fn rtp_ts_to_mix_samples(codec_pt: u8, rtp_ts: u32) -> usize {
|
||||
let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64;
|
||||
(((rtp_ts as u64 * MIX_RATE as u64) + (clock_rate / 2)) / clock_rate) as usize
|
||||
}
|
||||
|
||||
/// Decide whether a wrapping timestamp difference points forward in time.
///
/// `delta` is expected to be `actual.wrapping_sub(expected)`. Zero means "no
/// gap"; values in the upper half of the `u32` range are treated as the
/// wrapped-around form of a negative difference, i.e. not forward.
fn is_forward_rtp_delta(delta: u32) -> bool {
    (1..0x8000_0000u32).contains(&delta)
}
|
||||
|
||||
fn should_emit_drop_event(total_drops: u64) -> bool {
|
||||
total_drops == 1 || total_drops % DROP_REPORT_INTERVAL == 0
|
||||
}
|
||||
|
||||
fn emit_output_drop_event(
|
||||
out_tx: &OutTx,
|
||||
call_id: &str,
|
||||
leg_id: Option<&str>,
|
||||
tool_leg_id: Option<&str>,
|
||||
stream: &str,
|
||||
reason: &str,
|
||||
total_drops: u64,
|
||||
) {
|
||||
if !should_emit_drop_event(total_drops) {
|
||||
return;
|
||||
}
|
||||
|
||||
emit_event(
|
||||
out_tx,
|
||||
"mixer_output_drop",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"leg_id": leg_id,
|
||||
"tool_leg_id": tool_leg_id,
|
||||
"stream": stream,
|
||||
"reason": reason,
|
||||
"total_drops": total_drops,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
fn fade_concealment_from_last_frame(slot: &mut MixerLegSlot, samples: usize, decay: f32) {
|
||||
let mut template = if slot.last_pcm_frame.is_empty() {
|
||||
vec![0.0f32; MIX_FRAME_SIZE]
|
||||
} else {
|
||||
slot.last_pcm_frame.clone()
|
||||
};
|
||||
|
||||
let mut remaining = samples;
|
||||
while remaining > 0 {
|
||||
for sample in &mut template {
|
||||
*sample *= decay;
|
||||
}
|
||||
let take = remaining.min(template.len());
|
||||
slot.pcm_buffer.extend(template.iter().take(take).copied());
|
||||
remaining -= take;
|
||||
}
|
||||
}
|
||||
|
||||
fn append_packet_loss_concealment(slot: &mut MixerLegSlot, samples: usize) {
|
||||
let mut remaining = samples.max(1);
|
||||
while remaining > 0 {
|
||||
let chunk = remaining.min(MIX_FRAME_SIZE);
|
||||
if slot.codec_pt == codec_lib::PT_OPUS {
|
||||
match slot.transcoder.opus_plc(chunk) {
|
||||
Ok(mut pcm) => {
|
||||
pcm.resize(chunk, 0.0);
|
||||
slot.pcm_buffer.extend(pcm);
|
||||
}
|
||||
Err(_) => fade_concealment_from_last_frame(slot, chunk, 0.8),
|
||||
}
|
||||
} else {
|
||||
fade_concealment_from_last_frame(slot, chunk, 0.85);
|
||||
}
|
||||
remaining -= chunk;
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_packet_to_mix_pcm(slot: &mut MixerLegSlot, pkt: &RtpPacket) -> Option<Vec<f32>> {
|
||||
let (pcm, rate) = slot
|
||||
.transcoder
|
||||
.decode_to_f32(&pkt.payload, pkt.payload_type)
|
||||
.ok()?;
|
||||
|
||||
let pcm_48k = if rate == MIX_RATE {
|
||||
pcm
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample_f32(&pcm, rate, MIX_RATE)
|
||||
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
|
||||
};
|
||||
|
||||
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
|
||||
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
|
||||
} else {
|
||||
pcm_48k
|
||||
};
|
||||
|
||||
Some(processed)
|
||||
}
|
||||
|
||||
fn queue_inbound_packet(slot: &mut MixerLegSlot, pkt: RtpPacket) {
|
||||
if let Some(pcm_48k) = decode_packet_to_mix_pcm(slot, &pkt) {
|
||||
if pcm_48k.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(expected_ts) = slot.expected_rtp_timestamp {
|
||||
let gap_ts = pkt.timestamp.wrapping_sub(expected_ts);
|
||||
if is_forward_rtp_delta(gap_ts) {
|
||||
let gap_samples = rtp_ts_to_mix_samples(slot.codec_pt, gap_ts);
|
||||
if gap_samples <= MAX_GAP_FILL_SAMPLES {
|
||||
append_packet_loss_concealment(slot, gap_samples);
|
||||
} else {
|
||||
slot.pcm_buffer.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let packet_ts = mix_samples_to_rtp_ts(slot.codec_pt, pcm_48k.len());
|
||||
if packet_ts > 0 {
|
||||
slot.estimated_packet_ts = packet_ts;
|
||||
slot.expected_rtp_timestamp = Some(pkt.timestamp.wrapping_add(packet_ts));
|
||||
}
|
||||
slot.pcm_buffer.extend(pcm_48k);
|
||||
}
|
||||
}
|
||||
|
||||
fn fill_leg_playout_buffer(slot: &mut MixerLegSlot) {
|
||||
let mut steps = 0usize;
|
||||
while slot.pcm_buffer.len() < MIX_FRAME_SIZE && steps < MAX_PACKET_STEPS_PER_TICK {
|
||||
steps += 1;
|
||||
match slot.jitter.consume() {
|
||||
JitterResult::Packet(pkt) => queue_inbound_packet(slot, pkt),
|
||||
JitterResult::Missing => {
|
||||
let conceal_ts = slot
|
||||
.estimated_packet_ts
|
||||
.max(rtp_clock_increment(slot.codec_pt));
|
||||
let conceal_samples =
|
||||
rtp_ts_to_mix_samples(slot.codec_pt, conceal_ts).clamp(1, MAX_GAP_FILL_SAMPLES);
|
||||
append_packet_loss_concealment(slot, conceal_samples);
|
||||
if let Some(expected_ts) = slot.expected_rtp_timestamp {
|
||||
slot.expected_rtp_timestamp = Some(expected_ts.wrapping_add(conceal_ts));
|
||||
}
|
||||
}
|
||||
JitterResult::Filling => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn take_mix_frame(slot: &mut MixerLegSlot) -> Vec<f32> {
|
||||
let mut frame = Vec::with_capacity(MIX_FRAME_SIZE);
|
||||
while frame.len() < MIX_FRAME_SIZE {
|
||||
if let Some(sample) = slot.pcm_buffer.pop_front() {
|
||||
frame.push(sample);
|
||||
} else {
|
||||
frame.push(0.0);
|
||||
}
|
||||
}
|
||||
frame
|
||||
}
|
||||
|
||||
/// Soft-limit a single sample.
///
/// Samples whose magnitude is at or below the knee (0.85) pass through
/// unchanged. Above the knee the overshoot is compressed so the magnitude
/// approaches, and is capped at, 1.0; the sign of the input is preserved.
fn soft_limit_sample(sample: f32) -> f32 {
    const KNEE: f32 = 0.85;

    let magnitude = sample.abs();
    if magnitude <= KNEE {
        return sample;
    }

    let overshoot = magnitude - KNEE;
    let limited = (KNEE + (overshoot / (1.0 + (overshoot / (1.0 - KNEE))))).min(1.0);
    sample.signum() * limited
}
|
||||
|
||||
fn try_send_leg_output(
|
||||
out_tx: &OutTx,
|
||||
call_id: &str,
|
||||
leg_id: &str,
|
||||
slot: &mut MixerLegSlot,
|
||||
rtp: Vec<u8>,
|
||||
stream: &str,
|
||||
) {
|
||||
let reason = match slot.outbound_tx.try_send(rtp) {
|
||||
Ok(()) => return,
|
||||
Err(mpsc::error::TrySendError::Full(_)) => "full",
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => "closed",
|
||||
};
|
||||
|
||||
slot.outbound_drops += 1;
|
||||
emit_output_drop_event(
|
||||
out_tx,
|
||||
call_id,
|
||||
Some(leg_id),
|
||||
None,
|
||||
stream,
|
||||
reason,
|
||||
slot.outbound_drops,
|
||||
);
|
||||
}
|
||||
|
||||
fn try_send_tool_output(
|
||||
out_tx: &OutTx,
|
||||
call_id: &str,
|
||||
tool_leg_id: &str,
|
||||
tool: &mut ToolLegSlot,
|
||||
batch: ToolAudioBatch,
|
||||
) {
|
||||
let reason = match tool.audio_tx.try_send(batch) {
|
||||
Ok(()) => return,
|
||||
Err(mpsc::error::TrySendError::Full(_)) => "full",
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => "closed",
|
||||
};
|
||||
|
||||
tool.dropped_batches += 1;
|
||||
emit_output_drop_event(
|
||||
out_tx,
|
||||
call_id,
|
||||
None,
|
||||
Some(tool_leg_id),
|
||||
"tool-batch",
|
||||
reason,
|
||||
tool.dropped_batches,
|
||||
);
|
||||
}
|
||||
|
||||
fn cancel_prompt_producer(state: &mut IsolationState) {
|
||||
if let Some(cancel_tx) = state.prompt_cancel_tx.take() {
|
||||
let _ = cancel_tx.send(true);
|
||||
}
|
||||
}
|
||||
|
||||
fn cancel_isolated_interaction(state: &mut IsolationState) {
|
||||
cancel_prompt_producer(state);
|
||||
if let Some(tx) = state.result_tx.take() {
|
||||
let _ = tx.send(InteractionResult::Cancelled);
|
||||
}
|
||||
}
|
||||
|
||||
fn drain_prompt_stream(
|
||||
out_tx: &OutTx,
|
||||
call_id: &str,
|
||||
leg_id: &str,
|
||||
state: &mut IsolationState,
|
||||
) {
|
||||
loop {
|
||||
let Some(mut stream_rx) = state.prompt_stream_rx.take() else {
|
||||
return;
|
||||
};
|
||||
|
||||
match stream_rx.try_recv() {
|
||||
Ok(TtsStreamMessage::Frames(frames)) => {
|
||||
state.prompt_frames.extend(frames);
|
||||
state.prompt_stream_rx = Some(stream_rx);
|
||||
}
|
||||
Ok(TtsStreamMessage::Finished) => {
|
||||
state.prompt_stream_finished = true;
|
||||
return;
|
||||
}
|
||||
Ok(TtsStreamMessage::Failed(error)) => {
|
||||
emit_event(
|
||||
out_tx,
|
||||
"mixer_error",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"leg_id": leg_id,
|
||||
"error": format!("tts stream failed: {error}"),
|
||||
}),
|
||||
);
|
||||
state.prompt_stream_finished = true;
|
||||
return;
|
||||
}
|
||||
Err(mpsc::error::TryRecvError::Empty) => {
|
||||
state.prompt_stream_rx = Some(stream_rx);
|
||||
return;
|
||||
}
|
||||
Err(mpsc::error::TryRecvError::Disconnected) => {
|
||||
state.prompt_stream_finished = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the mixer task for a call. Returns the command sender and task handle.
|
||||
pub fn spawn_mixer(
|
||||
call_id: String,
|
||||
out_tx: OutTx,
|
||||
) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
|
||||
pub fn spawn_mixer(call_id: String, out_tx: OutTx) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
|
||||
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
@@ -192,11 +501,7 @@ pub fn spawn_mixer(
|
||||
}
|
||||
|
||||
/// The 20ms mixing loop.
|
||||
async fn mixer_loop(
|
||||
call_id: String,
|
||||
mut cmd_rx: mpsc::Receiver<MixerCommand>,
|
||||
out_tx: OutTx,
|
||||
) {
|
||||
async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, out_tx: OutTx) {
|
||||
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
|
||||
let mut tool_legs: HashMap<String, ToolLegSlot> = HashMap::new();
|
||||
let mut interval = time::interval(Duration::from_millis(20));
|
||||
@@ -237,11 +542,15 @@ async fn mixer_loop(
|
||||
denoiser: new_denoiser(),
|
||||
inbound_rx,
|
||||
outbound_tx,
|
||||
pcm_buffer: VecDeque::new(),
|
||||
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
|
||||
expected_rtp_timestamp: None,
|
||||
estimated_packet_ts: rtp_clock_increment(codec_pt),
|
||||
silent_ticks: 0,
|
||||
rtp_seq: 0,
|
||||
rtp_ts: 0,
|
||||
rtp_ssrc: rand::random(),
|
||||
outbound_drops: 0,
|
||||
role: LegRole::Participant,
|
||||
jitter: JitterBuffer::new(),
|
||||
},
|
||||
@@ -251,9 +560,7 @@ async fn mixer_loop(
|
||||
// If the leg is isolated, send Cancelled before dropping.
|
||||
if let Some(slot) = legs.get_mut(&leg_id) {
|
||||
if let LegRole::Isolated(ref mut state) = slot.role {
|
||||
if let Some(tx) = state.result_tx.take() {
|
||||
let _ = tx.send(InteractionResult::Cancelled);
|
||||
}
|
||||
cancel_isolated_interaction(state);
|
||||
}
|
||||
}
|
||||
legs.remove(&leg_id);
|
||||
@@ -263,9 +570,7 @@ async fn mixer_loop(
|
||||
// Cancel all outstanding interactions before shutting down.
|
||||
for slot in legs.values_mut() {
|
||||
if let LegRole::Isolated(ref mut state) = slot.role {
|
||||
if let Some(tx) = state.result_tx.take() {
|
||||
let _ = tx.send(InteractionResult::Cancelled);
|
||||
}
|
||||
cancel_isolated_interaction(state);
|
||||
}
|
||||
}
|
||||
return;
|
||||
@@ -273,6 +578,8 @@ async fn mixer_loop(
|
||||
Ok(MixerCommand::StartInteraction {
|
||||
leg_id,
|
||||
prompt_pcm_frames,
|
||||
prompt_stream_rx,
|
||||
prompt_cancel_tx,
|
||||
expected_digits,
|
||||
timeout_ms,
|
||||
result_tx,
|
||||
@@ -280,13 +587,14 @@ async fn mixer_loop(
|
||||
if let Some(slot) = legs.get_mut(&leg_id) {
|
||||
// Cancel any existing interaction first.
|
||||
if let LegRole::Isolated(ref mut old_state) = slot.role {
|
||||
if let Some(tx) = old_state.result_tx.take() {
|
||||
let _ = tx.send(InteractionResult::Cancelled);
|
||||
}
|
||||
cancel_isolated_interaction(old_state);
|
||||
}
|
||||
let timeout_ticks = timeout_ms / 20;
|
||||
slot.role = LegRole::Isolated(IsolationState {
|
||||
prompt_frames: VecDeque::from(prompt_pcm_frames),
|
||||
prompt_stream_rx,
|
||||
prompt_cancel_tx,
|
||||
prompt_stream_finished: false,
|
||||
expected_digits,
|
||||
timeout_ticks_remaining: timeout_ticks,
|
||||
prompt_done: false,
|
||||
@@ -294,6 +602,9 @@ async fn mixer_loop(
|
||||
});
|
||||
} else {
|
||||
// Leg not found — immediately cancel.
|
||||
if let Some(cancel_tx) = prompt_cancel_tx {
|
||||
let _ = cancel_tx.send(true);
|
||||
}
|
||||
let _ = result_tx.send(InteractionResult::Cancelled);
|
||||
}
|
||||
}
|
||||
@@ -302,7 +613,14 @@ async fn mixer_loop(
|
||||
tool_type,
|
||||
audio_tx,
|
||||
}) => {
|
||||
tool_legs.insert(leg_id, ToolLegSlot { tool_type, audio_tx });
|
||||
tool_legs.insert(
|
||||
leg_id,
|
||||
ToolLegSlot {
|
||||
tool_type,
|
||||
audio_tx,
|
||||
dropped_batches: 0,
|
||||
},
|
||||
);
|
||||
}
|
||||
Ok(MixerCommand::RemoveToolLeg { leg_id }) => {
|
||||
tool_legs.remove(&leg_id);
|
||||
@@ -343,54 +661,11 @@ async fn mixer_loop(
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2b: Consume exactly one frame from the jitter buffer.
|
||||
match slot.jitter.consume() {
|
||||
JitterResult::Packet(pkt) => {
|
||||
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
|
||||
Ok((pcm, rate)) => {
|
||||
let pcm_48k = if rate == MIX_RATE {
|
||||
pcm
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample_f32(&pcm, rate, MIX_RATE)
|
||||
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
|
||||
};
|
||||
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
|
||||
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
|
||||
} else {
|
||||
pcm_48k
|
||||
};
|
||||
let mut frame = processed;
|
||||
frame.resize(MIX_FRAME_SIZE, 0.0);
|
||||
slot.last_pcm_frame = frame;
|
||||
}
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
JitterResult::Missing => {
|
||||
// Invoke Opus PLC or fade for non-Opus codecs.
|
||||
if slot.codec_pt == codec_lib::PT_OPUS {
|
||||
match slot.transcoder.opus_plc(MIX_FRAME_SIZE) {
|
||||
Ok(pcm) => {
|
||||
slot.last_pcm_frame = pcm;
|
||||
}
|
||||
Err(_) => {
|
||||
for s in slot.last_pcm_frame.iter_mut() {
|
||||
*s *= 0.8;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Non-Opus: fade last frame toward silence.
|
||||
for s in slot.last_pcm_frame.iter_mut() {
|
||||
*s *= 0.85;
|
||||
}
|
||||
}
|
||||
}
|
||||
JitterResult::Filling => {
|
||||
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
||||
}
|
||||
}
|
||||
// Step 2b: Decode enough RTP to cover one 20ms playout frame.
|
||||
// Variable-duration packets (10ms, 20ms, 60ms, ...) accumulate in
|
||||
// the per-leg PCM FIFO; we pop exactly one 20ms frame below.
|
||||
fill_leg_playout_buffer(slot);
|
||||
slot.last_pcm_frame = take_mix_frame(slot);
|
||||
|
||||
// Run jitter adaptation + prune stale packets.
|
||||
slot.jitter.adapt();
|
||||
@@ -404,6 +679,9 @@ async fn mixer_loop(
|
||||
}
|
||||
if slot.silent_ticks > 150 {
|
||||
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
||||
slot.pcm_buffer.clear();
|
||||
slot.expected_rtp_timestamp = None;
|
||||
slot.estimated_packet_ts = rtp_clock_increment(slot.codec_pt);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -426,12 +704,12 @@ async fn mixer_loop(
|
||||
for (lid, slot) in legs.iter_mut() {
|
||||
match &mut slot.role {
|
||||
LegRole::Participant => {
|
||||
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
|
||||
// Mix-minus: total minus this leg's own contribution.
|
||||
// Apply a light soft limiter instead of hard clipping the sum.
|
||||
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
|
||||
for i in 0..MIX_FRAME_SIZE {
|
||||
let sample =
|
||||
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
|
||||
mix_minus.push(sample.clamp(-1.0, 1.0));
|
||||
let sample = (total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
|
||||
mix_minus.push(soft_limit_sample(sample));
|
||||
}
|
||||
|
||||
// Resample from 48kHz to the leg's codec native rate.
|
||||
@@ -445,11 +723,10 @@ async fn mixer_loop(
|
||||
};
|
||||
|
||||
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
|
||||
let encoded =
|
||||
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
|
||||
Ok(e) if !e.is_empty() => e,
|
||||
_ => continue,
|
||||
};
|
||||
let encoded = match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
|
||||
Ok(e) if !e.is_empty() => e,
|
||||
_ => continue,
|
||||
};
|
||||
|
||||
// Build RTP packet with header.
|
||||
let header =
|
||||
@@ -460,10 +737,11 @@ async fn mixer_loop(
|
||||
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
|
||||
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
|
||||
|
||||
// Non-blocking send — drop frame if channel is full.
|
||||
let _ = slot.outbound_tx.try_send(rtp);
|
||||
try_send_leg_output(&out_tx, &call_id, lid, slot, rtp, "participant-audio");
|
||||
}
|
||||
LegRole::Isolated(state) => {
|
||||
drain_prompt_stream(&out_tx, &call_id, lid, state);
|
||||
|
||||
// Check for DTMF digit from this leg.
|
||||
let mut matched_digit: Option<char> = None;
|
||||
for (src_lid, dtmf_pkt) in &dtmf_forward {
|
||||
@@ -487,12 +765,14 @@ async fn mixer_loop(
|
||||
|
||||
if let Some(digit) = matched_digit {
|
||||
// Interaction complete — digit matched.
|
||||
completed_interactions
|
||||
.push((lid.clone(), InteractionResult::Digit(digit)));
|
||||
completed_interactions.push((lid.clone(), InteractionResult::Digit(digit)));
|
||||
} else {
|
||||
// Play prompt frame or silence.
|
||||
// Play prompt frame, wait for live TTS, or move to timeout once the
|
||||
// prompt stream has fully drained.
|
||||
let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() {
|
||||
frame
|
||||
} else if !state.prompt_stream_finished {
|
||||
vec![0.0f32; MIX_FRAME_SIZE]
|
||||
} else {
|
||||
state.prompt_done = true;
|
||||
vec![0.0f32; MIX_FRAME_SIZE]
|
||||
@@ -508,6 +788,7 @@ async fn mixer_loop(
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
let mut prompt_rtp: Option<Vec<u8>> = None;
|
||||
if let Ok(encoded) =
|
||||
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
|
||||
{
|
||||
@@ -521,10 +802,9 @@ async fn mixer_loop(
|
||||
let mut rtp = header.to_vec();
|
||||
rtp.extend_from_slice(&encoded);
|
||||
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
|
||||
slot.rtp_ts = slot
|
||||
.rtp_ts
|
||||
.wrapping_add(rtp_clock_increment(slot.codec_pt));
|
||||
let _ = slot.outbound_tx.try_send(rtp);
|
||||
slot.rtp_ts =
|
||||
slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
|
||||
prompt_rtp = Some(rtp);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -537,6 +817,17 @@ async fn mixer_loop(
|
||||
state.timeout_ticks_remaining -= 1;
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(rtp) = prompt_rtp {
|
||||
try_send_leg_output(
|
||||
&out_tx,
|
||||
&call_id,
|
||||
lid,
|
||||
slot,
|
||||
rtp,
|
||||
"isolated-prompt",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -546,6 +837,7 @@ async fn mixer_loop(
|
||||
for (lid, result) in completed_interactions {
|
||||
if let Some(slot) = legs.get_mut(&lid) {
|
||||
if let LegRole::Isolated(ref mut state) = slot.role {
|
||||
cancel_prompt_producer(state);
|
||||
if let Some(tx) = state.result_tx.take() {
|
||||
let _ = tx.send(result);
|
||||
}
|
||||
@@ -566,7 +858,7 @@ async fn mixer_loop(
|
||||
})
|
||||
.collect();
|
||||
|
||||
for tool in tool_legs.values() {
|
||||
for (tool_leg_id, tool) in tool_legs.iter_mut() {
|
||||
let batch = ToolAudioBatch {
|
||||
sources: sources
|
||||
.iter()
|
||||
@@ -576,8 +868,7 @@ async fn mixer_loop(
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
// Non-blocking send — drop batch if tool can't keep up.
|
||||
let _ = tool.audio_tx.try_send(batch);
|
||||
try_send_tool_output(&out_tx, &call_id, tool_leg_id, tool, batch);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -610,7 +901,7 @@ async fn mixer_loop(
|
||||
rtp_out.extend_from_slice(&dtmf_pkt.payload);
|
||||
target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1);
|
||||
// Don't increment rtp_ts for DTMF — it shares timestamp context with audio.
|
||||
let _ = target_slot.outbound_tx.try_send(rtp_out);
|
||||
try_send_leg_output(&out_tx, &call_id, target_lid, target_slot, rtp_out, "dtmf");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -267,11 +267,7 @@ impl ProviderManager {
|
||||
|
||||
/// Try to handle a SIP response as a provider registration response.
|
||||
/// Returns true if consumed.
|
||||
pub async fn handle_response(
|
||||
&self,
|
||||
msg: &SipMessage,
|
||||
socket: &UdpSocket,
|
||||
) -> bool {
|
||||
pub async fn handle_response(&self, msg: &SipMessage, socket: &UdpSocket) -> bool {
|
||||
for ps_arc in &self.providers {
|
||||
let mut ps = ps_arc.lock().await;
|
||||
let was_registered = ps.is_registered;
|
||||
@@ -322,7 +318,10 @@ impl ProviderManager {
|
||||
}
|
||||
|
||||
/// Find a provider by its config ID (e.g. "easybell").
|
||||
pub async fn find_by_provider_id(&self, provider_id: &str) -> Option<Arc<Mutex<ProviderState>>> {
|
||||
pub async fn find_by_provider_id(
|
||||
&self,
|
||||
provider_id: &str,
|
||||
) -> Option<Arc<Mutex<ProviderState>>> {
|
||||
for ps_arc in &self.providers {
|
||||
let ps = ps_arc.lock().await;
|
||||
if ps.config.id == provider_id {
|
||||
|
||||
@@ -25,8 +25,7 @@ impl Recorder {
|
||||
) -> Result<Self, String> {
|
||||
// Ensure parent directory exists.
|
||||
if let Some(parent) = Path::new(file_path).parent() {
|
||||
std::fs::create_dir_all(parent)
|
||||
.map_err(|e| format!("create dir: {e}"))?;
|
||||
std::fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?;
|
||||
}
|
||||
|
||||
let sample_rate = 8000u32; // Record at 8kHz (standard telephony)
|
||||
@@ -57,10 +56,13 @@ impl Recorder {
|
||||
|
||||
/// Create a recorder that writes raw PCM at a given sample rate.
|
||||
/// Used by tool legs that already have decoded PCM (no RTP processing needed).
|
||||
pub fn new_pcm(file_path: &str, sample_rate: u32, max_duration_ms: Option<u64>) -> Result<Self, String> {
|
||||
pub fn new_pcm(
|
||||
file_path: &str,
|
||||
sample_rate: u32,
|
||||
max_duration_ms: Option<u64>,
|
||||
) -> Result<Self, String> {
|
||||
if let Some(parent) = Path::new(file_path).parent() {
|
||||
std::fs::create_dir_all(parent)
|
||||
.map_err(|e| format!("create dir: {e}"))?;
|
||||
std::fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?;
|
||||
}
|
||||
|
||||
let spec = hound::WavSpec {
|
||||
|
||||
@@ -60,18 +60,17 @@ impl Registrar {
|
||||
|
||||
/// Try to handle a SIP REGISTER from a device.
|
||||
/// Returns Some(response_bytes) if handled, None if not a known device.
|
||||
pub fn handle_register(
|
||||
&mut self,
|
||||
msg: &SipMessage,
|
||||
from_addr: SocketAddr,
|
||||
) -> Option<Vec<u8>> {
|
||||
pub fn handle_register(&mut self, msg: &SipMessage, from_addr: SocketAddr) -> Option<Vec<u8>> {
|
||||
if msg.method() != Some("REGISTER") {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Find the device by matching the source IP against expectedAddress.
|
||||
let from_ip = from_addr.ip().to_string();
|
||||
let device = self.devices.iter().find(|d| d.expected_address == from_ip)?;
|
||||
let device = self
|
||||
.devices
|
||||
.iter()
|
||||
.find(|d| d.expected_address == from_ip)?;
|
||||
|
||||
let from_header = msg.get_header("From").unwrap_or("");
|
||||
let aor = SipMessage::extract_uri(from_header)
|
||||
@@ -79,9 +78,7 @@ impl Registrar {
|
||||
.unwrap_or_else(|| format!("sip:{}@{}", device.extension, from_ip));
|
||||
|
||||
let expires_header = msg.get_header("Expires");
|
||||
let requested: u32 = expires_header
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(3600);
|
||||
let requested: u32 = expires_header.and_then(|s| s.parse().ok()).unwrap_or(3600);
|
||||
let expires = requested.min(MAX_EXPIRES);
|
||||
|
||||
let entry = RegisteredDevice {
|
||||
@@ -122,10 +119,7 @@ impl Registrar {
|
||||
Some(ResponseOptions {
|
||||
to_tag: Some(generate_tag()),
|
||||
contact: Some(contact),
|
||||
extra_headers: Some(vec![(
|
||||
"Expires".to_string(),
|
||||
expires.to_string(),
|
||||
)]),
|
||||
extra_headers: Some(vec![("Expires".to_string(), expires.to_string())]),
|
||||
..Default::default()
|
||||
}),
|
||||
);
|
||||
@@ -145,8 +139,8 @@ impl Registrar {
|
||||
/// Find a registered device by its source IP address.
|
||||
pub fn find_by_address(&self, addr: &SocketAddr) -> Option<&RegisteredDevice> {
|
||||
let ip = addr.ip().to_string();
|
||||
self.registered.values().find(|e| {
|
||||
e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at
|
||||
})
|
||||
self.registered
|
||||
.values()
|
||||
.find(|e| e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,10 +82,15 @@ pub fn build_rtp_header(pt: u8, seq: u16, timestamp: u32, ssrc: u32) -> [u8; 12]
|
||||
|
||||
/// Get the RTP clock increment per 20ms frame for a payload type.
|
||||
pub fn rtp_clock_increment(pt: u8) -> u32 {
|
||||
rtp_clock_rate(pt) / 50
|
||||
}
|
||||
|
||||
/// Get the RTP clock rate for a payload type.
|
||||
pub fn rtp_clock_rate(pt: u8) -> u32 {
|
||||
match pt {
|
||||
9 => 160, // G.722: 8000 Hz clock rate (despite 16kHz audio) × 0.02s
|
||||
0 | 8 => 160, // PCMU/PCMA: 8000 × 0.02
|
||||
111 => 960, // Opus: 48000 × 0.02
|
||||
_ => 160,
|
||||
9 => 8000, // G.722 uses an 8kHz RTP clock despite 16kHz audio.
|
||||
0 | 8 => 8000, // PCMU/PCMA
|
||||
111 => 48000, // Opus
|
||||
_ => 8000,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -128,17 +128,24 @@ impl SipLeg {
|
||||
max_forwards: Some(70),
|
||||
body: Some(sdp),
|
||||
content_type: Some("application/sdp".to_string()),
|
||||
extra_headers: Some(vec![
|
||||
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
|
||||
]),
|
||||
extra_headers: Some(vec![(
|
||||
"User-Agent".to_string(),
|
||||
"SipRouter/1.0".to_string(),
|
||||
)]),
|
||||
},
|
||||
);
|
||||
|
||||
self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port));
|
||||
self.dialog = Some(SipDialog::from_uac_invite(
|
||||
&invite,
|
||||
ip,
|
||||
self.config.lan_port,
|
||||
));
|
||||
self.invite = Some(invite.clone());
|
||||
self.state = LegState::Inviting;
|
||||
|
||||
let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await;
|
||||
let _ = socket
|
||||
.send_to(&invite.serialize(), self.config.sip_target)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Handle an incoming SIP message routed to this leg.
|
||||
@@ -443,10 +450,7 @@ pub enum SipLegAction {
|
||||
/// Build an ACK for a non-2xx response (same transaction as the INVITE).
|
||||
fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage {
|
||||
let via = original_invite.get_header("Via").unwrap_or("").to_string();
|
||||
let from = original_invite
|
||||
.get_header("From")
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
let from = original_invite.get_header("From").unwrap_or("").to_string();
|
||||
let to = response.get_header("To").unwrap_or("").to_string();
|
||||
let call_id = original_invite.call_id().to_string();
|
||||
let cseq_num: u32 = original_invite
|
||||
|
||||
@@ -28,10 +28,8 @@ impl SipTransport {
|
||||
}
|
||||
|
||||
/// Spawn the UDP receive loop. Calls the handler for every received packet.
|
||||
pub fn spawn_receiver<F>(
|
||||
&self,
|
||||
handler: F,
|
||||
) where
|
||||
pub fn spawn_receiver<F>(&self, handler: F)
|
||||
where
|
||||
F: Fn(&[u8], SocketAddr) + Send + 'static,
|
||||
{
|
||||
let socket = self.socket.clone();
|
||||
|
||||
@@ -51,7 +51,8 @@ pub fn spawn_recording_tool(
|
||||
});
|
||||
|
||||
// Convert f32 [-1.0, 1.0] to i16 for WAV writing.
|
||||
let pcm_i16: Vec<i16> = source.pcm_48k
|
||||
let pcm_i16: Vec<i16> = source
|
||||
.pcm_48k
|
||||
.iter()
|
||||
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
|
||||
.collect();
|
||||
|
||||
@@ -9,12 +9,41 @@
|
||||
//! Callers never need to check for cached files — that is entirely this module's
|
||||
//! responsibility.
|
||||
|
||||
use crate::audio_player::pcm_to_mix_frames;
|
||||
use kokoro_tts::{KokoroTts, Voice};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
|
||||
pub const DEFAULT_MODEL_PATH: &str = ".nogit/tts/kokoro-v1.0.onnx";
|
||||
pub const DEFAULT_VOICES_PATH: &str = ".nogit/tts/voices.bin";
|
||||
const TTS_OUTPUT_RATE: u32 = 24000;
|
||||
const MAX_CHUNK_CHARS: usize = 220;
|
||||
const MIN_CHUNK_CHARS: usize = 80;
|
||||
|
||||
/// Messages sent by the background TTS producer to the consumer of a live prompt.
#[derive(Debug)]
pub enum TtsStreamMessage {
    /// Frames produced from one synthesized text chunk.
    Frames(Vec<Vec<f32>>),
    /// The producer has stopped normally; no further messages follow.
    Finished,
    /// Synthesis of a chunk failed with the given error text; the producer stops.
    Failed(String),
}
|
||||
|
||||
pub struct TtsLivePrompt {
|
||||
pub initial_frames: Vec<Vec<f32>>,
|
||||
pub stream_rx: mpsc::Receiver<TtsStreamMessage>,
|
||||
pub cancel_tx: watch::Sender<bool>,
|
||||
}
|
||||
|
||||
/// Parameters describing a prompt to synthesize with the Kokoro engine.
#[derive(Debug, Clone)]
pub struct TtsPromptRequest {
    /// Path of the model file handed to the engine loader.
    pub model_path: String,
    /// Path of the voices file handed to the engine loader.
    pub voices_path: String,
    /// Voice name, resolved to an engine voice via `select_voice`.
    pub voice_name: String,
    /// Text to speak; blank text is rejected by `start_live_prompt`.
    pub text: String,
}
|
||||
|
||||
/// Wraps the Kokoro TTS engine with lazy model loading.
|
||||
pub struct TtsEngine {
|
||||
tts: Option<KokoroTts>,
|
||||
tts: Option<Arc<KokoroTts>>,
|
||||
/// Path that was used to load the current model (for cache invalidation).
|
||||
loaded_model_path: String,
|
||||
loaded_voices_path: String,
|
||||
@@ -29,6 +58,69 @@ impl TtsEngine {
|
||||
}
|
||||
}
|
||||
|
||||
async fn ensure_loaded(
|
||||
&mut self,
|
||||
model_path: &str,
|
||||
voices_path: &str,
|
||||
) -> Result<Arc<KokoroTts>, String> {
|
||||
if !Path::new(model_path).exists() {
|
||||
return Err(format!("model not found: {model_path}"));
|
||||
}
|
||||
if !Path::new(voices_path).exists() {
|
||||
return Err(format!("voices not found: {voices_path}"));
|
||||
}
|
||||
|
||||
if self.tts.is_none()
|
||||
|| self.loaded_model_path != model_path
|
||||
|| self.loaded_voices_path != voices_path
|
||||
{
|
||||
eprintln!("[tts] loading model: {model_path}");
|
||||
let tts = Arc::new(
|
||||
KokoroTts::new(model_path, voices_path)
|
||||
.await
|
||||
.map_err(|e| format!("model load failed: {e:?}"))?,
|
||||
);
|
||||
self.tts = Some(tts);
|
||||
self.loaded_model_path = model_path.to_string();
|
||||
self.loaded_voices_path = voices_path.to_string();
|
||||
}
|
||||
|
||||
Ok(self.tts.as_ref().unwrap().clone())
|
||||
}
|
||||
|
||||
pub async fn start_live_prompt(
|
||||
&mut self,
|
||||
request: TtsPromptRequest,
|
||||
) -> Result<TtsLivePrompt, String> {
|
||||
if request.text.trim().is_empty() {
|
||||
return Err("empty text".into());
|
||||
}
|
||||
|
||||
let tts = self
|
||||
.ensure_loaded(&request.model_path, &request.voices_path)
|
||||
.await?;
|
||||
let voice = select_voice(&request.voice_name);
|
||||
let chunks = chunk_text(&request.text);
|
||||
if chunks.is_empty() {
|
||||
return Err("empty text".into());
|
||||
}
|
||||
|
||||
let initial_frames = synth_text_to_mix_frames(&tts, chunks[0].as_str(), voice).await?;
|
||||
let remaining_chunks: Vec<String> = chunks.into_iter().skip(1).collect();
|
||||
let (stream_tx, stream_rx) = mpsc::channel(8);
|
||||
let (cancel_tx, cancel_rx) = watch::channel(false);
|
||||
|
||||
tokio::spawn(async move {
|
||||
stream_live_prompt_chunks(tts, voice, remaining_chunks, stream_tx, cancel_rx).await;
|
||||
});
|
||||
|
||||
Ok(TtsLivePrompt {
|
||||
initial_frames,
|
||||
stream_rx,
|
||||
cancel_tx,
|
||||
})
|
||||
}
|
||||
|
||||
/// Generate a WAV file from text.
|
||||
///
|
||||
/// Params (from IPC JSON):
|
||||
@@ -39,18 +131,33 @@ impl TtsEngine {
|
||||
/// - `output`: output WAV file path
|
||||
/// - `cacheable`: if true, skip synthesis when the output WAV already
|
||||
/// matches the same text+voice (checked via a `.meta` sidecar file)
|
||||
pub async fn generate(&mut self, params: &serde_json::Value) -> Result<serde_json::Value, String> {
|
||||
let model_path = params.get("model").and_then(|v| v.as_str())
|
||||
pub async fn generate(
|
||||
&mut self,
|
||||
params: &serde_json::Value,
|
||||
) -> Result<serde_json::Value, String> {
|
||||
let model_path = params
|
||||
.get("model")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or("missing 'model' param")?;
|
||||
let voices_path = params.get("voices").and_then(|v| v.as_str())
|
||||
let voices_path = params
|
||||
.get("voices")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or("missing 'voices' param")?;
|
||||
let voice_name = params.get("voice").and_then(|v| v.as_str())
|
||||
let voice_name = params
|
||||
.get("voice")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("af_bella");
|
||||
let text = params.get("text").and_then(|v| v.as_str())
|
||||
let text = params
|
||||
.get("text")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or("missing 'text' param")?;
|
||||
let output_path = params.get("output").and_then(|v| v.as_str())
|
||||
let output_path = params
|
||||
.get("output")
|
||||
.and_then(|v| v.as_str())
|
||||
.ok_or("missing 'output' param")?;
|
||||
let cacheable = params.get("cacheable").and_then(|v| v.as_bool())
|
||||
let cacheable = params
|
||||
.get("cacheable")
|
||||
.and_then(|v| v.as_bool())
|
||||
.unwrap_or(false);
|
||||
|
||||
if text.is_empty() {
|
||||
@@ -63,41 +170,23 @@ impl TtsEngine {
|
||||
return Ok(serde_json::json!({ "output": output_path }));
|
||||
}
|
||||
|
||||
// Check that model/voices files exist.
|
||||
if !Path::new(model_path).exists() {
|
||||
return Err(format!("model not found: {model_path}"));
|
||||
}
|
||||
if !Path::new(voices_path).exists() {
|
||||
return Err(format!("voices not found: {voices_path}"));
|
||||
}
|
||||
|
||||
// Ensure parent directory exists.
|
||||
if let Some(parent) = Path::new(output_path).parent() {
|
||||
let _ = std::fs::create_dir_all(parent);
|
||||
}
|
||||
|
||||
// Lazy-load or reload if paths changed.
|
||||
if self.tts.is_none()
|
||||
|| self.loaded_model_path != model_path
|
||||
|| self.loaded_voices_path != voices_path
|
||||
{
|
||||
eprintln!("[tts] loading model: {model_path}");
|
||||
let tts = KokoroTts::new(model_path, voices_path)
|
||||
.await
|
||||
.map_err(|e| format!("model load failed: {e:?}"))?;
|
||||
self.tts = Some(tts);
|
||||
self.loaded_model_path = model_path.to_string();
|
||||
self.loaded_voices_path = voices_path.to_string();
|
||||
}
|
||||
|
||||
let tts = self.tts.as_ref().unwrap();
|
||||
let tts = self.ensure_loaded(model_path, voices_path).await?;
|
||||
let voice = select_voice(voice_name);
|
||||
|
||||
eprintln!("[tts] synthesizing voice '{voice_name}': \"{text}\"");
|
||||
let (samples, duration) = tts.synth(text, voice)
|
||||
eprintln!("[tts] synthesizing WAV voice '{voice_name}' to {output_path}");
|
||||
let (samples, duration) = tts
|
||||
.synth(text, voice)
|
||||
.await
|
||||
.map_err(|e| format!("synthesis failed: {e:?}"))?;
|
||||
eprintln!("[tts] synthesized {} samples in {duration:?}", samples.len());
|
||||
eprintln!(
|
||||
"[tts] synthesized {} samples in {duration:?}",
|
||||
samples.len()
|
||||
);
|
||||
|
||||
// Write 24kHz 16-bit mono WAV.
|
||||
let spec = hound::WavSpec {
|
||||
@@ -111,9 +200,13 @@ impl TtsEngine {
|
||||
.map_err(|e| format!("WAV create failed: {e}"))?;
|
||||
for &sample in &samples {
|
||||
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
|
||||
writer.write_sample(s16).map_err(|e| format!("WAV write: {e}"))?;
|
||||
writer
|
||||
.write_sample(s16)
|
||||
.map_err(|e| format!("WAV write: {e}"))?;
|
||||
}
|
||||
writer.finalize().map_err(|e| format!("WAV finalize: {e}"))?;
|
||||
writer
|
||||
.finalize()
|
||||
.map_err(|e| format!("WAV finalize: {e}"))?;
|
||||
|
||||
// Write sidecar for future cache checks.
|
||||
if cacheable {
|
||||
@@ -152,6 +245,106 @@ impl TtsEngine {
|
||||
}
|
||||
}
|
||||
|
||||
async fn synth_text_to_mix_frames(
|
||||
tts: &Arc<KokoroTts>,
|
||||
text: &str,
|
||||
voice: Voice,
|
||||
) -> Result<Vec<Vec<f32>>, String> {
|
||||
let (samples, duration) = tts
|
||||
.synth(text, voice)
|
||||
.await
|
||||
.map_err(|e| format!("synthesis failed: {e:?}"))?;
|
||||
eprintln!(
|
||||
"[tts] synthesized chunk ({} chars, {} samples) in {duration:?}",
|
||||
text.chars().count(),
|
||||
samples.len()
|
||||
);
|
||||
pcm_to_mix_frames(&samples, TTS_OUTPUT_RATE)
|
||||
}
|
||||
|
||||
async fn stream_live_prompt_chunks(
|
||||
tts: Arc<KokoroTts>,
|
||||
voice: Voice,
|
||||
chunks: Vec<String>,
|
||||
stream_tx: mpsc::Sender<TtsStreamMessage>,
|
||||
mut cancel_rx: watch::Receiver<bool>,
|
||||
) {
|
||||
for chunk in chunks {
|
||||
if *cancel_rx.borrow() {
|
||||
break;
|
||||
}
|
||||
|
||||
match synth_text_to_mix_frames(&tts, &chunk, voice).await {
|
||||
Ok(frames) => {
|
||||
if *cancel_rx.borrow() {
|
||||
break;
|
||||
}
|
||||
if stream_tx.send(TtsStreamMessage::Frames(frames)).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
let _ = stream_tx.send(TtsStreamMessage::Failed(error)).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if cancel_rx.has_changed().unwrap_or(false) && *cancel_rx.borrow_and_update() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let _ = stream_tx.send(TtsStreamMessage::Finished).await;
|
||||
}
|
||||
|
||||
fn chunk_text(text: &str) -> Vec<String> {
|
||||
let mut chunks = Vec::new();
|
||||
let mut current = String::new();
|
||||
|
||||
for ch in text.chars() {
|
||||
current.push(ch);
|
||||
|
||||
let len = current.chars().count();
|
||||
let hard_split = len >= MAX_CHUNK_CHARS && (ch.is_whitespace() || is_soft_boundary(ch));
|
||||
let natural_split = len >= MIN_CHUNK_CHARS && is_sentence_boundary(ch);
|
||||
|
||||
if natural_split || hard_split {
|
||||
push_chunk(&mut chunks, &mut current);
|
||||
}
|
||||
}
|
||||
|
||||
push_chunk(&mut chunks, &mut current);
|
||||
|
||||
if chunks.len() >= 2 {
|
||||
let last_len = chunks.last().unwrap().chars().count();
|
||||
if last_len < (MIN_CHUNK_CHARS / 2) {
|
||||
let tail = chunks.pop().unwrap();
|
||||
if let Some(prev) = chunks.last_mut() {
|
||||
prev.push(' ');
|
||||
prev.push_str(tail.trim());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
chunks
|
||||
}
|
||||
|
||||
/// Appends the trimmed contents of `current` to `chunks` (unless blank) and
/// then empties `current` so it can accumulate the next chunk.
fn push_chunk(chunks: &mut Vec<String>, current: &mut String) {
    match current.trim() {
        // Whitespace-only buffers produce no chunk.
        "" => {}
        piece => chunks.push(piece.to_string()),
    }
    current.clear();
}
|
||||
|
||||
/// Returns `true` for characters that end a sentence-like unit:
/// `.`, `!`, `?`, newline, `;` and `:`.
fn is_sentence_boundary(ch: char) -> bool {
    const SENTENCE_ENDINGS: [char; 6] = ['.', '!', '?', '\n', ';', ':'];
    SENTENCE_ENDINGS.contains(&ch)
}
|
||||
|
||||
/// Returns `true` for characters that mark a softer pause inside a sentence:
/// `,`, `;`, `:`, `)`, `]` and newline.
fn is_soft_boundary(ch: char) -> bool {
    const SOFT_BREAKS: [char; 6] = [',', ';', ':', ')', ']', '\n'];
    SOFT_BREAKS.contains(&ch)
}
|
||||
|
||||
/// Map voice name string to Kokoro Voice enum variant.
|
||||
fn select_voice(name: &str) -> Voice {
|
||||
match name {
|
||||
|
||||
@@ -128,8 +128,8 @@ async fn record_from_socket(
|
||||
break; // Max duration reached.
|
||||
}
|
||||
}
|
||||
Ok(Err(_)) => break, // Socket error (closed).
|
||||
Err(_) => break, // Timeout (max duration + grace).
|
||||
Ok(Err(_)) => break, // Socket error (closed).
|
||||
Err(_) => break, // Timeout (max duration + grace).
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -58,9 +58,7 @@ impl WebRtcEngine {
|
||||
.register_default_codecs()
|
||||
.map_err(|e| format!("register codecs: {e}"))?;
|
||||
|
||||
let api = APIBuilder::new()
|
||||
.with_media_engine(media_engine)
|
||||
.build();
|
||||
let api = APIBuilder::new().with_media_engine(media_engine).build();
|
||||
|
||||
let config = RTCConfiguration {
|
||||
ice_servers: vec![],
|
||||
@@ -91,8 +89,7 @@ impl WebRtcEngine {
|
||||
.map_err(|e| format!("add track: {e}"))?;
|
||||
|
||||
// Shared mixer channel sender (populated when linked to a call).
|
||||
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> =
|
||||
Arc::new(Mutex::new(None));
|
||||
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> = Arc::new(Mutex::new(None));
|
||||
|
||||
// ICE candidate handler.
|
||||
let out_tx_ice = self.out_tx.clone();
|
||||
@@ -256,7 +253,11 @@ impl WebRtcEngine {
|
||||
|
||||
pub async fn close_session(&mut self, session_id: &str) -> Result<(), String> {
|
||||
if let Some(session) = self.sessions.remove(session_id) {
|
||||
session.pc.close().await.map_err(|e| format!("close: {e}"))?;
|
||||
session
|
||||
.pc
|
||||
.close()
|
||||
.await
|
||||
.map_err(|e| format!("close: {e}"))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -51,9 +51,7 @@ impl SipDialog {
|
||||
.map(|s| s.to_string())
|
||||
.unwrap_or_else(generate_tag),
|
||||
remote_tag: None,
|
||||
local_uri: SipMessage::extract_uri(from)
|
||||
.unwrap_or("")
|
||||
.to_string(),
|
||||
local_uri: SipMessage::extract_uri(from).unwrap_or("").to_string(),
|
||||
remote_uri: SipMessage::extract_uri(to).unwrap_or("").to_string(),
|
||||
local_cseq,
|
||||
remote_cseq: 0,
|
||||
@@ -181,10 +179,7 @@ impl SipDialog {
|
||||
format!("<{}>{remote_tag_str}", self.remote_uri),
|
||||
),
|
||||
("Call-ID".to_string(), self.call_id.clone()),
|
||||
(
|
||||
"CSeq".to_string(),
|
||||
format!("{} {method}", self.local_cseq),
|
||||
),
|
||||
("CSeq".to_string(), format!("{} {method}", self.local_cseq)),
|
||||
("Max-Forwards".to_string(), "70".to_string()),
|
||||
];
|
||||
|
||||
@@ -243,10 +238,7 @@ impl SipDialog {
|
||||
format!("<{}>{remote_tag_str}", self.remote_uri),
|
||||
),
|
||||
("Call-ID".to_string(), self.call_id.clone()),
|
||||
(
|
||||
"CSeq".to_string(),
|
||||
format!("{} ACK", self.local_cseq),
|
||||
),
|
||||
("CSeq".to_string(), format!("{} ACK", self.local_cseq)),
|
||||
("Max-Forwards".to_string(), "70".to_string()),
|
||||
];
|
||||
|
||||
@@ -271,10 +263,7 @@ impl SipDialog {
|
||||
("From".to_string(), from),
|
||||
("To".to_string(), to),
|
||||
("Call-ID".to_string(), self.call_id.clone()),
|
||||
(
|
||||
"CSeq".to_string(),
|
||||
format!("{} CANCEL", self.local_cseq),
|
||||
),
|
||||
("CSeq".to_string(), format!("{} CANCEL", self.local_cseq)),
|
||||
("Max-Forwards".to_string(), "70".to_string()),
|
||||
("Content-Length".to_string(), "0".to_string()),
|
||||
];
|
||||
@@ -284,11 +273,7 @@ impl SipDialog {
|
||||
.unwrap_or(&self.remote_target)
|
||||
.to_string();
|
||||
|
||||
SipMessage::new(
|
||||
format!("CANCEL {ruri} SIP/2.0"),
|
||||
headers,
|
||||
String::new(),
|
||||
)
|
||||
SipMessage::new(format!("CANCEL {ruri} SIP/2.0"), headers, String::new())
|
||||
}
|
||||
|
||||
/// Transition the dialog to terminated state.
|
||||
|
||||
@@ -27,7 +27,9 @@ pub fn generate_branch() -> String {
|
||||
|
||||
fn random_hex(bytes: usize) -> String {
|
||||
let mut rng = rand::thread_rng();
|
||||
(0..bytes).map(|_| format!("{:02x}", rng.gen::<u8>())).collect()
|
||||
(0..bytes)
|
||||
.map(|_| format!("{:02x}", rng.gen::<u8>()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
// ---- Codec registry --------------------------------------------------------
|
||||
@@ -142,7 +144,9 @@ pub fn parse_digest_challenge(header: &str) -> Option<DigestChallenge> {
|
||||
return Some(after[1..1 + end].to_string());
|
||||
}
|
||||
// Unquoted value.
|
||||
let end = after.find(|c: char| c == ',' || c.is_whitespace()).unwrap_or(after.len());
|
||||
let end = after
|
||||
.find(|c: char| c == ',' || c.is_whitespace())
|
||||
.unwrap_or(after.len());
|
||||
return Some(after[..end].to_string());
|
||||
}
|
||||
None
|
||||
@@ -241,11 +245,7 @@ pub struct MwiResult {
|
||||
pub extra_headers: Vec<(String, String)>,
|
||||
}
|
||||
|
||||
pub fn build_mwi_body(
|
||||
new_messages: u32,
|
||||
old_messages: u32,
|
||||
account_uri: &str,
|
||||
) -> MwiResult {
|
||||
pub fn build_mwi_body(new_messages: u32, old_messages: u32, account_uri: &str) -> MwiResult {
|
||||
let waiting = if new_messages > 0 { "yes" } else { "no" };
|
||||
let body = format!(
|
||||
"Messages-Waiting: {waiting}\r\n\
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
//! SDP handling, Digest authentication, and URI rewriting.
|
||||
//! Ported from the TypeScript `ts/sip/` library.
|
||||
|
||||
pub mod message;
|
||||
pub mod dialog;
|
||||
pub mod helpers;
|
||||
pub mod message;
|
||||
pub mod rewrite;
|
||||
|
||||
/// Network endpoint (address + port + optional negotiated codec).
|
||||
|
||||
@@ -14,7 +14,11 @@ pub struct SipMessage {
|
||||
|
||||
impl SipMessage {
|
||||
pub fn new(start_line: String, headers: Vec<(String, String)>, body: String) -> Self {
|
||||
Self { start_line, headers, body }
|
||||
Self {
|
||||
start_line,
|
||||
headers,
|
||||
body,
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Parsing -----------------------------------------------------------
|
||||
@@ -175,7 +179,8 @@ impl SipMessage {
|
||||
|
||||
/// Inserts a header at the top of the header list.
|
||||
pub fn prepend_header(&mut self, name: &str, value: &str) -> &mut Self {
|
||||
self.headers.insert(0, (name.to_string(), value.to_string()));
|
||||
self.headers
|
||||
.insert(0, (name.to_string(), value.to_string()));
|
||||
self
|
||||
}
|
||||
|
||||
@@ -233,10 +238,7 @@ impl SipMessage {
|
||||
.to_display_name
|
||||
.map(|d| format!("\"{d}\" "))
|
||||
.unwrap_or_default();
|
||||
let to_tag_str = opts
|
||||
.to_tag
|
||||
.map(|t| format!(";tag={t}"))
|
||||
.unwrap_or_default();
|
||||
let to_tag_str = opts.to_tag.map(|t| format!(";tag={t}")).unwrap_or_default();
|
||||
|
||||
let mut headers = vec![
|
||||
(
|
||||
@@ -364,7 +366,43 @@ impl SipMessage {
|
||||
.find(|c: char| c == ';' || c == '>')
|
||||
.unwrap_or(trimmed.len());
|
||||
let result = &trimmed[..end];
|
||||
if result.is_empty() { None } else { Some(result) }
|
||||
if result.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract the user part from a SIP/TEL URI or header value.
|
||||
pub fn extract_uri_user(uri_or_header_value: &str) -> Option<&str> {
|
||||
let raw = Self::extract_uri(uri_or_header_value).unwrap_or(uri_or_header_value);
|
||||
let raw = raw.trim();
|
||||
if raw.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let user_part = if raw
|
||||
.get(..5)
|
||||
.is_some_and(|prefix| prefix.eq_ignore_ascii_case("sips:"))
|
||||
{
|
||||
&raw[5..]
|
||||
} else if raw.get(..4).is_some_and(|prefix| {
|
||||
prefix.eq_ignore_ascii_case("sip:") || prefix.eq_ignore_ascii_case("tel:")
|
||||
}) {
|
||||
&raw[4..]
|
||||
} else {
|
||||
raw
|
||||
};
|
||||
|
||||
let end = user_part
|
||||
.find(|c: char| matches!(c, '@' | ';' | '?' | '>'))
|
||||
.unwrap_or(user_part.len());
|
||||
let result = &user_part[..end];
|
||||
if result.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(result)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -506,6 +544,19 @@ mod tests {
|
||||
SipMessage::extract_uri("\"Name\" <sip:user@host>;tag=abc"),
|
||||
Some("sip:user@host")
|
||||
);
|
||||
assert_eq!(
|
||||
SipMessage::extract_uri_user("\"Name\" <sip:+49 421 219694@host>;tag=abc"),
|
||||
Some("+49 421 219694")
|
||||
);
|
||||
assert_eq!(
|
||||
SipMessage::extract_uri_user("sip:0049421219694@voip.easybell.de"),
|
||||
Some("0049421219694")
|
||||
);
|
||||
assert_eq!(
|
||||
SipMessage::extract_uri_user("tel:+49421219694;phone-context=example.com"),
|
||||
Some("+49421219694")
|
||||
);
|
||||
assert_eq!(SipMessage::extract_uri_user("SIP:user@host"), Some("user"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -535,7 +586,10 @@ mod tests {
|
||||
);
|
||||
assert_eq!(invite.method(), Some("INVITE"));
|
||||
assert_eq!(invite.call_id(), "test-123");
|
||||
assert!(invite.get_header("Via").unwrap().contains("192.168.1.1:5070"));
|
||||
assert!(invite
|
||||
.get_header("Via")
|
||||
.unwrap()
|
||||
.contains("192.168.1.1:5070"));
|
||||
|
||||
let response = SipMessage::create_response(
|
||||
200,
|
||||
|
||||
@@ -92,7 +92,11 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option<Endpoint>
|
||||
.collect();
|
||||
|
||||
let original = match (orig_addr, orig_port) {
|
||||
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }),
|
||||
(Some(a), Some(p)) => Some(Endpoint {
|
||||
address: a,
|
||||
port: p,
|
||||
codec_pt: None,
|
||||
}),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: 'siprouter',
|
||||
version: '1.23.0',
|
||||
version: '1.25.0',
|
||||
description: 'undefined'
|
||||
}
|
||||
|
||||
38
ts/config.ts
38
ts/config.ts
@@ -48,6 +48,24 @@ export interface IDeviceConfig {
|
||||
extension: string;
|
||||
}
|
||||
|
||||
export type TIncomingNumberMode = 'single' | 'range' | 'regex';
|
||||
|
||||
export interface IIncomingNumberConfig {
|
||||
id: string;
|
||||
label: string;
|
||||
providerId?: string;
|
||||
mode: TIncomingNumberMode;
|
||||
countryCode?: string;
|
||||
areaCode?: string;
|
||||
localNumber?: string;
|
||||
rangeEnd?: string;
|
||||
pattern?: string;
|
||||
|
||||
// Legacy persisted fields kept for migration compatibility.
|
||||
number?: string;
|
||||
rangeStart?: string;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Match/Action routing model
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -62,8 +80,11 @@ export interface ISipRouteMatch {
|
||||
direction: 'inbound' | 'outbound';
|
||||
|
||||
/**
|
||||
* Match the dialed/called number (To/Request-URI for inbound DID, dialed digits for outbound).
|
||||
* Supports: exact string, prefix with trailing '*' (e.g. "+4930*"), or regex ("/^\\+49/").
|
||||
* Match the normalized called number.
|
||||
*
|
||||
* Inbound: matches the provider-delivered DID / Request-URI user part.
|
||||
* Outbound: matches the normalized dialed digits.
|
||||
* Supports: exact string, numeric range `start..end`, prefix with trailing '*' (e.g. "+4930*"), or regex ("/^\\+49/").
|
||||
*/
|
||||
numberPattern?: string;
|
||||
|
||||
@@ -89,13 +110,13 @@ export interface ISipRouteAction {
|
||||
|
||||
// --- Inbound actions (IVR / voicemail) ---
|
||||
|
||||
/** Route directly to a voicemail box (skip ringing devices). */
|
||||
/** Voicemail fallback for matched inbound routes. */
|
||||
voicemailBox?: string;
|
||||
|
||||
/** Route to an IVR menu by menu ID (skip ringing devices). */
|
||||
ivrMenuId?: string;
|
||||
|
||||
/** Override no-answer timeout (seconds) before routing to voicemail. */
|
||||
/** Reserved for future no-answer handling. */
|
||||
noAnswerTimeout?: number;
|
||||
|
||||
// --- Outbound actions (provider selection) ---
|
||||
@@ -231,6 +252,7 @@ export interface IAppConfig {
|
||||
proxy: IProxyConfig;
|
||||
providers: IProviderConfig[];
|
||||
devices: IDeviceConfig[];
|
||||
incomingNumbers?: IIncomingNumberConfig[];
|
||||
routing: IRoutingConfig;
|
||||
contacts: IContact[];
|
||||
voiceboxes?: IVoiceboxConfig[];
|
||||
@@ -285,6 +307,14 @@ export function loadConfig(): IAppConfig {
|
||||
d.extension ??= '100';
|
||||
}
|
||||
|
||||
cfg.incomingNumbers ??= [];
|
||||
for (const incoming of cfg.incomingNumbers) {
|
||||
if (!incoming.id) incoming.id = `incoming-${Date.now()}`;
|
||||
incoming.label ??= incoming.id;
|
||||
incoming.mode ??= incoming.pattern ? 'regex' : incoming.rangeStart || incoming.rangeEnd ? 'range' : 'single';
|
||||
incoming.countryCode ??= incoming.mode === 'regex' ? undefined : '+49';
|
||||
}
|
||||
|
||||
cfg.routing ??= { routes: [] };
|
||||
cfg.routing.routes ??= [];
|
||||
|
||||
|
||||
@@ -266,6 +266,7 @@ async function handleRequest(
|
||||
if (existing && ud.displayName !== undefined) existing.displayName = ud.displayName;
|
||||
}
|
||||
}
|
||||
if (updates.incomingNumbers !== undefined) cfg.incomingNumbers = updates.incomingNumbers;
|
||||
if (updates.routing) {
|
||||
if (updates.routing.routes) {
|
||||
cfg.routing.routes = updates.routing.routes;
|
||||
|
||||
@@ -82,6 +82,19 @@ type TProxyCommands = {
|
||||
};
|
||||
result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string };
|
||||
};
|
||||
start_tts_interaction: {
|
||||
params: {
|
||||
call_id: string;
|
||||
leg_id: string;
|
||||
text: string;
|
||||
voice?: string;
|
||||
model?: string;
|
||||
voices?: string;
|
||||
expected_digits: string;
|
||||
timeout_ms: number;
|
||||
};
|
||||
result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string };
|
||||
};
|
||||
add_tool_leg: {
|
||||
params: {
|
||||
call_id: string;
|
||||
@@ -446,6 +459,40 @@ export async function startInteraction(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a live TTS interaction on a specific leg. The first chunk is rendered
|
||||
* up front and the rest streams into the mixer while playback is already live.
|
||||
*/
|
||||
export async function startTtsInteraction(
|
||||
callId: string,
|
||||
legId: string,
|
||||
text: string,
|
||||
expectedDigits: string,
|
||||
timeoutMs: number,
|
||||
options?: {
|
||||
voice?: string;
|
||||
model?: string;
|
||||
voices?: string;
|
||||
},
|
||||
): Promise<{ result: 'digit' | 'timeout' | 'cancelled'; digit?: string } | null> {
|
||||
if (!bridge || !initialized) return null;
|
||||
try {
|
||||
return await sendProxyCommand('start_tts_interaction', {
|
||||
call_id: callId,
|
||||
leg_id: legId,
|
||||
text,
|
||||
expected_digits: expectedDigits,
|
||||
timeout_ms: timeoutMs,
|
||||
voice: options?.voice,
|
||||
model: options?.model,
|
||||
voices: options?.voices,
|
||||
});
|
||||
} catch (error: unknown) {
|
||||
logFn?.(`[proxy-engine] start_tts_interaction error: ${errorMessage(error)}`);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a tool leg (recording or transcription) to a call.
|
||||
* Tool legs receive per-source unmerged audio from all participants.
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: 'siprouter',
|
||||
version: '1.23.0',
|
||||
version: '1.25.0',
|
||||
description: 'undefined'
|
||||
}
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user