4 Commits

Author SHA1 Message Date
89ae12318e v1.25.0
Some checks failed
Docker (tags) / release (push) Failing after 3s
2026-04-14 18:52:13 +00:00
feb3514de4 feat(proxy-engine): add live TTS streaming interactions and incoming number range support 2026-04-14 18:52:13 +00:00
adfc4726fd v1.24.0
Some checks failed
Docker (tags) / release (push) Failing after 2s
2026-04-14 16:35:54 +00:00
06c86d7e81 feat(routing): require explicit inbound DID routes and normalize SIP identities for provider-based number matching 2026-04-14 16:35:54 +00:00
32 changed files with 2872 additions and 827 deletions

View File

@@ -1,5 +1,23 @@
# Changelog
## 2026-04-14 - 1.25.0 - feat(proxy-engine)
add live TTS streaming interactions and incoming number range support
- add a new start_tts_interaction command and bridge API to begin IVR or leg interactions before full TTS rendering completes
- stream synthesized TTS chunks into the mixer with cancellation handling so prompts can stop cleanly on digit match, leg removal, or shutdown
- extract PCM-to-mixer frame conversion for reusable live prompt processing
- extend routing pattern matching to support numeric number ranges like start..end, including + prefixed values
- add incomingNumbers config typing and frontend config update support for single, range, and regex number modes
## 2026-04-14 - 1.24.0 - feat(routing)
require explicit inbound DID routes and normalize SIP identities for provider-based number matching
- Inbound route resolution now returns no match unless a configured inbound route explicitly matches the provider and called number.
- Normalized routing identities were added for SIP/TEL URIs so inbound DIDs and outbound dialed numbers match consistently across provider-specific formats.
- Call handling and incoming call events now use normalized numbers, improving routing accuracy for shared trunk providers.
- Route configuration docs and the web route editor were updated to support explicit inbound DID ownership, voicemail fallback, and IVR selection.
- Mixer RTP handling was enhanced to better support variable packet durations, timestamp-based gap fill, and non-blocking output drop reporting.
## 2026-04-14 - 1.23.0 - feat(runtime)
refactor runtime state and proxy event handling for typed WebRTC linking and shared status models

View File

@@ -1,6 +1,6 @@
{
"name": "siprouter",
"version": "1.23.0",
"version": "1.25.0",
"private": true,
"type": "module",
"scripts": {

View File

@@ -148,24 +148,41 @@ Create `.nogit/config.json`:
"routing": {
"routes": [
{
"id": "inbound-default",
"name": "Ring all devices",
"priority": 100,
"direction": "inbound",
"match": {},
"id": "inbound-main-did",
"name": "Main DID",
"priority": 200,
"enabled": true,
"match": {
"direction": "inbound",
"sourceProvider": "my-trunk",
"numberPattern": "+49421219694"
},
"action": {
"targets": ["desk-phone"],
"ringBrowsers": true,
"voicemailBox": "main",
"noAnswerTimeout": 25
"voicemailBox": "main"
}
},
{
"id": "inbound-support-did",
"name": "Support DID",
"priority": 190,
"enabled": true,
"match": {
"direction": "inbound",
"sourceProvider": "my-trunk",
"numberPattern": "+49421219695"
},
"action": {
"ivrMenuId": "support-menu"
}
},
{
"id": "outbound-default",
"name": "Route via trunk",
"priority": 100,
"direction": "outbound",
"match": {},
"enabled": true,
"match": { "direction": "outbound" },
"action": { "provider": "my-trunk" }
}
]
@@ -187,6 +204,8 @@ Create `.nogit/config.json`:
}
```
Inbound number ownership is explicit: add one inbound route per DID (or DID prefix) and scope it with `sourceProvider` when a provider delivers multiple external numbers.
### TTS Setup (Optional)
For neural voicemail greetings and IVR prompts, download the Kokoro TTS model:

View File

@@ -115,9 +115,8 @@ pub struct TranscodeState {
impl TranscodeState {
/// Create a new transcoding session with fresh codec state.
pub fn new() -> Result<Self, String> {
let mut opus_enc =
OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip)
.map_err(|e| format!("opus encoder: {e}"))?;
let mut opus_enc = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip)
.map_err(|e| format!("opus encoder: {e}"))?;
opus_enc
.set_complexity(5)
.map_err(|e| format!("opus set_complexity: {e}"))?;
@@ -160,14 +159,9 @@ impl TranscodeState {
let key = (from_rate, to_rate, canonical_chunk);
if !self.resamplers.contains_key(&key) {
let r = FftFixedIn::<f64>::new(
from_rate as usize,
to_rate as usize,
canonical_chunk,
1,
1,
)
.map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?;
let r =
FftFixedIn::<f64>::new(from_rate as usize, to_rate as usize, canonical_chunk, 1, 1)
.map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?;
self.resamplers.insert(key, r);
}
let resampler = self.resamplers.get_mut(&key).unwrap();
@@ -284,8 +278,7 @@ impl TranscodeState {
match pt {
PT_OPUS => {
let mut pcm = vec![0i16; 5760]; // up to 120ms at 48kHz
let packet =
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let packet = OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
let n: usize = self
@@ -343,8 +336,7 @@ impl TranscodeState {
match pt {
PT_OPUS => {
let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz
let packet =
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let packet = OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
let n: usize = self
@@ -368,8 +360,8 @@ impl TranscodeState {
/// Returns f32 PCM at 48kHz. `frame_size` should be 960 for 20ms.
pub fn opus_plc(&mut self, frame_size: usize) -> Result<Vec<f32>, String> {
let mut pcm = vec![0.0f32; frame_size];
let out = MutSignals::try_from(&mut pcm[..])
.map_err(|e| format!("opus plc signals: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus plc signals: {e}"))?;
let n: usize = self
.opus_dec
.decode_float(None::<OpusPacket<'_>>, out, false)
@@ -425,14 +417,9 @@ impl TranscodeState {
let key = (from_rate, to_rate, canonical_chunk);
if !self.resamplers_f32.contains_key(&key) {
let r = FftFixedIn::<f32>::new(
from_rate as usize,
to_rate as usize,
canonical_chunk,
1,
1,
)
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
let r =
FftFixedIn::<f32>::new(from_rate as usize, to_rate as usize, canonical_chunk, 1, 1)
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
self.resamplers_f32.insert(key, r);
}
let resampler = self.resamplers_f32.get_mut(&key).unwrap();
@@ -508,8 +495,10 @@ mod tests {
let encoded = mulaw_encode(sample);
let decoded = mulaw_decode(encoded);
// µ-law is lossy; verify the decoded value is close.
assert!((sample as i32 - decoded as i32).abs() < 1000,
"µ-law roundtrip failed for {sample}: got {decoded}");
assert!(
(sample as i32 - decoded as i32).abs() < 1000,
"µ-law roundtrip failed for {sample}: got {decoded}"
);
}
}
@@ -518,8 +507,10 @@ mod tests {
for sample in [-32768i16, -1000, -1, 0, 1, 1000, 32767] {
let encoded = alaw_encode(sample);
let decoded = alaw_decode(encoded);
assert!((sample as i32 - decoded as i32).abs() < 1000,
"A-law roundtrip failed for {sample}: got {decoded}");
assert!(
(sample as i32 - decoded as i32).abs() < 1000,
"A-law roundtrip failed for {sample}: got {decoded}"
);
}
}
@@ -543,7 +534,9 @@ mod tests {
fn pcmu_to_pcma_roundtrip() {
let mut st = TranscodeState::new().unwrap();
// 160 bytes = 20ms of PCMU at 8kHz
let pcmu_data: Vec<u8> = (0..160).map(|i| mulaw_encode((i as i16 * 200) - 16000)).collect();
let pcmu_data: Vec<u8> = (0..160)
.map(|i| mulaw_encode((i as i16 * 200) - 16000))
.collect();
let pcma = st.transcode(&pcmu_data, PT_PCMU, PT_PCMA, None).unwrap();
assert_eq!(pcma.len(), 160); // Same frame size
let back = st.transcode(&pcma, PT_PCMA, PT_PCMU, None).unwrap();

View File

@@ -36,10 +36,7 @@ pub async fn play_wav_file(
// Read all samples as i16.
let samples: Vec<i16> = if spec.bits_per_sample == 16 {
reader
.samples::<i16>()
.filter_map(|s| s.ok())
.collect()
reader.samples::<i16>().filter_map(|s| s.ok()).collect()
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
reader
.samples::<f32>()
@@ -199,10 +196,7 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
.map(|s| s as f32 / 32768.0)
.collect()
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
reader
.samples::<f32>()
.filter_map(|s| s.ok())
.collect()
reader.samples::<f32>().filter_map(|s| s.ok()).collect()
} else {
return Err(format!(
"unsupported WAV format: {}bit {:?}",
@@ -214,14 +208,23 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
return Ok(vec![]);
}
pcm_to_mix_frames(&samples, wav_rate)
}
/// Convert PCM samples at an arbitrary rate into 48kHz 20ms mixer frames.
pub fn pcm_to_mix_frames(samples: &[f32], sample_rate: u32) -> Result<Vec<Vec<f32>>, String> {
if samples.is_empty() {
return Ok(vec![]);
}
// Resample to MIX_RATE (48kHz) if needed.
let resampled = if wav_rate != MIX_RATE {
let resampled = if sample_rate != MIX_RATE {
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
transcoder
.resample_f32(&samples, wav_rate, MIX_RATE)
.resample_f32(samples, sample_rate, MIX_RATE)
.map_err(|e| format!("resample: {e}"))?
} else {
samples
samples.to_vec()
};
// Split into MIX_FRAME_SIZE (960) sample frames.

View File

@@ -5,7 +5,7 @@
//! The mixer provides mix-minus audio to all participants.
use crate::call::{Call, CallDirection, CallState, LegId, LegInfo, LegKind, LegState};
use crate::config::{AppConfig, ProviderConfig};
use crate::config::{normalize_routing_identity, AppConfig, ProviderConfig};
use crate::ipc::{emit_event, OutTx};
use crate::leg_io::{create_leg_channels, spawn_sip_inbound, spawn_sip_outbound};
use crate::mixer::spawn_mixer;
@@ -13,7 +13,9 @@ use crate::registrar::Registrar;
use crate::rtp::RtpPortPool;
use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig};
use crate::tts::TtsEngine;
use sip_proto::helpers::{build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions};
use sip_proto::helpers::{
build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions,
};
use sip_proto::message::{ResponseOptions, SipMessage};
use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri};
use std::collections::HashMap;
@@ -25,7 +27,7 @@ use tokio::sync::Mutex;
/// Result of creating an inbound call — carries both the call id and
/// whether browsers should be notified (flows from the matched inbound
/// route's `ring_browsers` flag, or the fallback default).
/// route's `ring_browsers` flag).
pub struct InboundCallCreated {
pub call_id: String,
pub ring_browsers: bool,
@@ -214,15 +216,27 @@ impl CallManager {
let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice);
if let Some(dev) = device_leg {
if let Some(dev_addr) = dev.signaling_addr {
let ringing = SipMessage::create_response(180, "Ringing", device_invite, None);
let ringing = SipMessage::create_response(
180,
"Ringing",
device_invite,
None,
);
let _ = socket.send_to(&ringing.serialize(), dev_addr).await;
}
}
}
}
emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }));
emit_event(
&self.out_tx,
"call_ringing",
serde_json::json!({ "call_id": call_id }),
);
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "ringing" }),
);
}
SipLegAction::ConnectedWithAck(ack_buf) => {
let _ = socket.send_to(&ack_buf, target).await;
@@ -245,16 +259,30 @@ impl CallManager {
spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx);
spawn_sip_outbound(rtp_socket, remote_addr, channels.outbound_rx);
if let Some(call) = self.calls.get(call_id) {
call.add_leg_to_mixer(leg_id, sip_pt, channels.inbound_rx, channels.outbound_tx)
.await;
call.add_leg_to_mixer(
leg_id,
sip_pt,
channels.inbound_rx,
channels.outbound_tx,
)
.await;
}
}
// For device-originated calls: send 200 OK to device and wire device leg.
if let Some(call) = self.calls.get(call_id) {
if let Some(device_invite) = call.device_invite.clone() {
let device_leg_info: Option<(SocketAddr, u16, Arc<UdpSocket>, Option<SocketAddr>, String)> =
call.legs.values().find(|l| l.kind == LegKind::SipDevice).and_then(|dev| {
let device_leg_info: Option<(
SocketAddr,
u16,
Arc<UdpSocket>,
Option<SocketAddr>,
String,
)> = call
.legs
.values()
.find(|l| l.kind == LegKind::SipDevice)
.and_then(|dev| {
Some((
dev.signaling_addr?,
dev.rtp_port,
@@ -264,11 +292,21 @@ impl CallManager {
))
});
if let Some((dev_addr, dev_rtp_port, dev_rtp_socket, dev_remote, dev_leg_id)) = device_leg_info {
if let Some((
dev_addr,
dev_rtp_port,
dev_rtp_socket,
dev_remote,
dev_leg_id,
)) = device_leg_info
{
// Build SDP pointing device to our device_rtp port.
// Use LAN IP for the device (it's on the local network).
let call_ref = self.calls.get(call_id).unwrap();
let prov_leg = call_ref.legs.values().find(|l| l.kind == LegKind::SipProvider);
let prov_leg = call_ref
.legs
.values()
.find(|l| l.kind == LegKind::SipProvider);
let lan_ip_str = prov_leg
.and_then(|l| l.sip_leg.as_ref())
.map(|sl| sl.config.lan_ip.clone())
@@ -280,13 +318,18 @@ impl CallManager {
..Default::default()
});
let ok = SipMessage::create_response(200, "OK", &device_invite, Some(ResponseOptions {
to_tag: Some(generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip_str, 5060)),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: None,
}));
let ok = SipMessage::create_response(
200,
"OK",
&device_invite,
Some(ResponseOptions {
to_tag: Some(generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip_str, 5060)),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: None,
}),
);
let _ = socket.send_to(&ok.serialize(), dev_addr).await;
// Update device leg state.
@@ -313,27 +356,47 @@ impl CallManager {
if let Some(dev_remote_addr) = dev_remote {
let dev_channels = create_leg_channels();
spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx);
spawn_sip_outbound(dev_rtp_socket, dev_remote_addr, dev_channels.outbound_rx);
spawn_sip_outbound(
dev_rtp_socket,
dev_remote_addr,
dev_channels.outbound_rx,
);
if let Some(call) = self.calls.get(call_id) {
call.add_leg_to_mixer(&dev_leg_id, dev_pt, dev_channels.inbound_rx, dev_channels.outbound_tx)
.await;
call.add_leg_to_mixer(
&dev_leg_id,
dev_pt,
dev_channels.inbound_rx,
dev_channels.outbound_tx,
)
.await;
}
}
}
}
}
emit_event(&self.out_tx, "call_answered", serde_json::json!({
"call_id": call_id,
"provider_media_addr": remote.map(|a| a.ip().to_string()),
"provider_media_port": remote.map(|a| a.port()),
"sip_pt": sip_pt,
}));
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }));
emit_event(
&self.out_tx,
"call_answered",
serde_json::json!({
"call_id": call_id,
"provider_media_addr": remote.map(|a| a.ip().to_string()),
"provider_media_port": remote.map(|a| a.port()),
"sip_pt": sip_pt,
}),
);
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "connected" }),
);
}
SipLegAction::Terminated(reason) => {
let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);
let duration = self
.calls
.get(call_id)
.map(|c| c.duration_secs())
.unwrap_or(0);
// Notify device if this is a device-originated outbound call.
if let Some(call) = self.calls.get(call_id) {
@@ -343,7 +406,8 @@ impl CallManager {
if let Some(dev_addr) = dev.signaling_addr {
// Map reason to SIP response code.
let code: u16 = if reason.starts_with("rejected_") {
reason.strip_prefix("rejected_")
reason
.strip_prefix("rejected_")
.and_then(|s| s.parse().ok())
.unwrap_or(503)
} else if reason == "bye" {
@@ -354,7 +418,12 @@ impl CallManager {
503
};
if code > 0 && dev.state != LegState::Connected {
let resp = SipMessage::create_response(code, "Service Unavailable", device_invite, None);
let resp = SipMessage::create_response(
code,
"Service Unavailable",
device_invite,
None,
);
let _ = socket.send_to(&resp.serialize(), dev_addr).await;
}
}
@@ -367,22 +436,38 @@ impl CallManager {
leg.state = LegState::Terminated;
}
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }));
emit_event(&self.out_tx, "call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }),
);
emit_event(
&self.out_tx,
"call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }),
);
self.terminate_call(call_id).await;
return true;
}
SipLegAction::SendAndTerminate(buf, reason) => {
let _ = socket.send_to(&buf, from_addr).await;
let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);
emit_event(&self.out_tx, "call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
let duration = self
.calls
.get(call_id)
.map(|c| c.duration_secs())
.unwrap_or(0);
emit_event(
&self.out_tx,
"call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }),
);
self.terminate_call(call_id).await;
return true;
}
SipLegAction::AuthRetry { ack_407, invite_with_auth } => {
SipLegAction::AuthRetry {
ack_407,
invite_with_auth,
} => {
if let Some(ack) = ack_407 {
let _ = socket.send_to(&ack, target).await;
}
@@ -416,11 +501,21 @@ impl CallManager {
let this_kind = this_leg.map(|l| l.kind).unwrap_or(LegKind::SipProvider);
// Find the counterpart leg.
let other_leg = call.legs.values().find(|l| l.id != this_leg_id && l.state != LegState::Terminated);
let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) = match other_leg {
Some(l) => (l.signaling_addr, l.rtp_port, l.id.clone(), l.kind, l.public_ip.clone()),
None => return false,
};
let other_leg = call
.legs
.values()
.find(|l| l.id != this_leg_id && l.state != LegState::Terminated);
let (other_addr, other_rtp_port, other_leg_id, other_kind, other_public_ip) =
match other_leg {
Some(l) => (
l.signaling_addr,
l.rtp_port,
l.id.clone(),
l.kind,
l.public_ip.clone(),
),
None => return false,
};
let forward_to = match other_addr {
Some(a) => a,
None => return false,
@@ -439,7 +534,9 @@ impl CallManager {
};
// Check if the other leg is a B2BUA leg (has SipLeg for proper dialog mgmt).
let other_has_sip_leg = call.legs.get(&other_leg_id)
let other_has_sip_leg = call
.legs
.get(&other_leg_id)
.map(|l| l.sip_leg.is_some())
.unwrap_or(false);
@@ -473,7 +570,8 @@ impl CallManager {
if let Some(other) = call.legs.get_mut(&other_leg_id) {
if let Some(sip_leg) = &mut other.sip_leg {
if let Some(hangup_buf) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
let _ =
socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
}
}
}
@@ -504,7 +602,8 @@ impl CallManager {
if let Some(other) = call.legs.get_mut(&other_leg_id) {
if let Some(sip_leg) = &mut other.sip_leg {
if let Some(hangup_buf) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
let _ =
socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
}
}
}
@@ -541,13 +640,17 @@ impl CallManager {
if this_kind == LegKind::SipProvider {
// From provider → forward to device: rewrite request URI.
if let Some(ruri) = fwd.request_uri().map(|s| s.to_string()) {
let new_ruri = rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port());
let new_ruri =
rewrite_sip_uri(&ruri, &forward_to.ip().to_string(), forward_to.port());
fwd.set_request_uri(&new_ruri);
}
}
if fwd.is_dialog_establishing() {
// Record-Route must also be routable from the destination leg.
fwd.prepend_header("Record-Route", &format!("<sip:{advertise_ip}:{lan_port};lr>"));
fwd.prepend_header(
"Record-Route",
&format!("<sip:{advertise_ip}:{lan_port};lr>"),
);
}
let _ = socket.send_to(&fwd.serialize(), forward_to).await;
return true;
@@ -572,13 +675,20 @@ impl CallManager {
if code == 180 || code == 183 {
if call.state == CallState::SettingUp {
call.state = CallState::Ringing;
emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
emit_event(
&self.out_tx,
"call_ringing",
serde_json::json!({ "call_id": call_id }),
);
}
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Ringing;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }));
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }),
);
} else if code >= 200 && code < 300 {
let mut needs_wiring = false;
if let Some(leg) = call.legs.get_mut(this_leg_id) {
@@ -598,12 +708,19 @@ impl CallManager {
needs_wiring = true;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }));
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }),
);
if call.state != CallState::Connected {
call.state = CallState::Connected;
emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id }));
emit_event(
&self.out_tx,
"call_answered",
serde_json::json!({ "call_id": call_id }),
);
}
// Forward the response before wiring (drop call borrow).
@@ -693,28 +810,27 @@ impl CallManager {
// Extract caller/callee info.
let from_header = invite.get_header("From").unwrap_or("");
let caller_number = SipMessage::extract_uri(from_header)
.unwrap_or("Unknown")
.to_string();
let called_number = invite
.request_uri()
.and_then(|uri| SipMessage::extract_uri(uri))
.unwrap_or("")
.to_string();
let caller_number = normalize_routing_identity(from_header);
let called_number = normalize_routing_identity(invite.request_uri().unwrap_or(""));
// Resolve via the configured inbound routing table. This honors
// user-defined routes from the UI (numberPattern, callerPattern,
// sourceProvider, targets, ringBrowsers). If no route matches, the
// fallback returns an empty `device_ids` and `ring_browsers: true`,
// which preserves pre-routing behavior via the `resolve_first_device`
// fallback below.
// Resolve via the configured inbound routing table. The matched route
// is the source of truth for which external numbers this provider is
// allowed to deliver to us.
//
// TODO: Multi-target inbound fork is not yet implemented.
// - `route.device_ids` beyond the first registered target are ignored.
// - `ring_browsers` is informational only — browsers see a toast but
// do not race the SIP device. First-to-answer-wins requires a
// multi-leg fork + per-leg CANCEL, which is not built yet.
let route = config.resolve_inbound_route(provider_id, &called_number, &caller_number);
let route = match config.resolve_inbound_route(provider_id, &called_number, &caller_number)
{
Some(route) => route,
None => {
let resp = SipMessage::create_response(404, "Not Found", invite, None);
let _ = socket.send_to(&resp.serialize(), from_addr).await;
return None;
}
};
let ring_browsers = route.ring_browsers;
// IVR routing: if the route targets an IVR menu, go there directly.
@@ -724,12 +840,24 @@ impl CallManager {
if let Some(menu) = ivr.menus.iter().find(|m| m.id == *ivr_menu_id) {
let call_id = self
.route_to_ivr(
&call_id, invite, from_addr, &caller_number,
provider_id, provider_config, config, rtp_pool, socket,
public_ip, menu, &tts_engine,
&call_id,
invite,
from_addr,
&caller_number,
provider_id,
provider_config,
config,
rtp_pool,
socket,
public_ip,
menu,
&tts_engine,
)
.await?;
return Some(InboundCallCreated { call_id, ring_browsers });
return Some(InboundCallCreated {
call_id,
ring_browsers,
});
}
}
}
@@ -748,19 +876,27 @@ impl CallManager {
None => {
// No device registered → voicemail.
// Resolve greeting WAV on-demand (may trigger TTS generation).
let greeting_wav = resolve_greeting_wav(
config,
route.voicemail_box.as_deref(),
&tts_engine,
).await;
let greeting_wav =
resolve_greeting_wav(config, route.voicemail_box.as_deref(), &tts_engine).await;
let call_id = self
.route_to_voicemail(
&call_id, invite, from_addr, &caller_number,
provider_id, provider_config, config, rtp_pool, socket, public_ip,
&call_id,
invite,
from_addr,
&caller_number,
provider_id,
provider_config,
config,
rtp_pool,
socket,
public_ip,
greeting_wav,
)
.await?;
return Some(InboundCallCreated { call_id, ring_browsers });
return Some(InboundCallCreated {
call_id,
ring_browsers,
});
}
};
@@ -855,8 +991,10 @@ impl CallManager {
// Register SIP Call-ID → both legs (provider leg handles provider messages).
// For passthrough, both legs share the same SIP Call-ID.
// We route based on source address in route_passthrough_message.
self.sip_index
.insert(sip_call_id.clone(), (call_id.clone(), provider_leg_id.clone()));
self.sip_index.insert(
sip_call_id.clone(),
(call_id.clone(), provider_leg_id.clone()),
);
// Rewrite and forward INVITE to device.
let mut fwd_invite = invite.clone();
@@ -889,7 +1027,10 @@ impl CallManager {
}
}
Some(InboundCallCreated { call_id, ring_browsers })
Some(InboundCallCreated {
call_id,
ring_browsers,
})
}
/// Initiate an outbound B2BUA call from the dashboard.
@@ -938,7 +1079,9 @@ impl CallManager {
// Send INVITE.
let to_uri = format!("sip:{number}@{}", provider_config.domain);
sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await;
sip_leg
.send_invite(registered_aor, &to_uri, &sip_call_id, socket)
.await;
// Create call with mixer.
let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.clone(), self.out_tx.clone());
@@ -1109,7 +1252,9 @@ impl CallManager {
// Build proper To URI and send INVITE.
let to_uri = format!("sip:{}@{}", dialed_number, provider_config.domain);
sip_leg.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket).await;
sip_leg
.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket)
.await;
call.legs.insert(
provider_leg_id.clone(),
@@ -1185,7 +1330,9 @@ impl CallManager {
let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);
let to_uri = format!("sip:{number}@{}", provider_config.domain);
sip_leg.send_invite(registered_aor, &to_uri, &sip_call_id, socket).await;
sip_leg
.send_invite(registered_aor, &to_uri, &sip_call_id, socket)
.await;
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
@@ -1256,9 +1403,16 @@ impl CallManager {
};
let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);
let to_uri = format!("sip:{}@{}:{}", device_id, device_addr.ip(), device_addr.port());
let to_uri = format!(
"sip:{}@{}:{}",
device_id,
device_addr.ip(),
device_addr.port()
);
let from_uri = format!("sip:sipproxy@{lan_ip}:{lan_port}");
sip_leg.send_invite(&from_uri, &to_uri, &sip_call_id, socket).await;
sip_leg
.send_invite(&from_uri, &to_uri, &sip_call_id, socket)
.await;
let leg_info = LegInfo {
id: leg_id.clone(),
@@ -1292,12 +1446,7 @@ impl CallManager {
}
/// Remove a leg from a call.
pub async fn remove_leg(
&mut self,
call_id: &str,
leg_id: &str,
socket: &UdpSocket,
) -> bool {
pub async fn remove_leg(&mut self, call_id: &str, leg_id: &str, socket: &UdpSocket) -> bool {
let call = match self.calls.get_mut(call_id) {
Some(c) => c,
None => return false,
@@ -1310,7 +1459,9 @@ impl CallManager {
if let Some(leg) = call.legs.get_mut(leg_id) {
if let Some(sip_leg) = &mut leg.sip_leg {
if let Some(hangup_bytes) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await;
let _ = socket
.send_to(&hangup_bytes, sip_leg.config.sip_target)
.await;
}
}
leg.state = LegState::Terminated;
@@ -1356,9 +1507,7 @@ impl CallManager {
target_call_id: &str,
) -> bool {
// Validate both calls exist and the leg is in the source call.
if !self.calls.contains_key(source_call_id)
|| !self.calls.contains_key(target_call_id)
{
if !self.calls.contains_key(source_call_id) || !self.calls.contains_key(target_call_id) {
return false;
}
@@ -1503,7 +1652,9 @@ impl CallManager {
}
if let Some(sip_leg) = &mut leg.sip_leg {
if let Some(hangup_bytes) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_bytes, sip_leg.config.sip_target).await;
let _ = socket
.send_to(&hangup_bytes, sip_leg.config.sip_target)
.await;
}
} else if let Some(addr) = leg.signaling_addr {
// Passthrough leg — send a simple BYE.
@@ -1588,7 +1739,9 @@ impl CallManager {
});
let response = SipMessage::create_response(
200, "OK", invite,
200,
"OK",
invite,
Some(sip_proto::message::ResponseOptions {
to_tag: Some(sip_proto::helpers::generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip, config.proxy.lan_port)),
@@ -1665,9 +1818,15 @@ impl CallManager {
let rtp_socket = rtp_alloc.socket;
tokio::spawn(async move {
crate::voicemail::run_voicemail_session(
rtp_socket, provider_media, codec_pt,
greeting_wav, recording_path, 120_000,
call_id_owned, caller_owned, out_tx,
rtp_socket,
provider_media,
codec_pt,
greeting_wav,
recording_path,
120_000,
call_id_owned,
caller_owned,
out_tx,
)
.await;
});
@@ -1717,7 +1876,9 @@ impl CallManager {
});
let response = SipMessage::create_response(
200, "OK", invite,
200,
"OK",
invite,
Some(sip_proto::message::ResponseOptions {
to_tag: Some(sip_proto::helpers::generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip, config.proxy.lan_port)),
@@ -1781,24 +1942,23 @@ impl CallManager {
}
}
// Generate IVR prompt on-demand via TTS (cached).
// Generate the IVR prompt as a live chunked TTS stream so playback can
// start after the first chunk instead of waiting for a full WAV render.
let voice = menu.prompt_voice.as_deref().unwrap_or("af_bella");
let prompt_output = format!(".nogit/tts/ivr-menu-{}.wav", menu.id);
let prompt_params = serde_json::json!({
"model": ".nogit/tts/kokoro-v1.0.onnx",
"voices": ".nogit/tts/voices.bin",
"voice": voice,
"text": &menu.prompt_text,
"output": &prompt_output,
"cacheable": true,
});
let prompt_wav = {
let live_prompt = {
let mut tts = tts_engine.lock().await;
match tts.generate(&prompt_params).await {
Ok(_) => Some(prompt_output),
match tts
.start_live_prompt(crate::tts::TtsPromptRequest {
model_path: crate::tts::DEFAULT_MODEL_PATH.to_string(),
voices_path: crate::tts::DEFAULT_VOICES_PATH.to_string(),
voice_name: voice.to_string(),
text: menu.prompt_text.clone(),
})
.await
{
Ok(prompt) => Some(prompt),
Err(e) => {
eprintln!("[ivr] TTS generation failed: {e}");
eprintln!("[ivr] live TTS setup failed: {e}");
None
}
}
@@ -1815,17 +1975,14 @@ impl CallManager {
let timeout_ms = menu.timeout_sec.unwrap_or(5) * 1000;
tokio::spawn(async move {
// Load prompt PCM frames if available.
let prompt_frames = prompt_wav.as_ref().and_then(|wav| {
crate::audio_player::load_prompt_pcm_frames(wav).ok()
});
if let Some(frames) = prompt_frames {
if let Some(prompt) = live_prompt {
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let _ = mixer_cmd_tx
.send(crate::mixer::MixerCommand::StartInteraction {
leg_id: provider_leg_id.clone(),
prompt_pcm_frames: frames,
prompt_pcm_frames: prompt.initial_frames,
prompt_stream_rx: Some(prompt.stream_rx),
prompt_cancel_tx: Some(prompt.cancel_tx),
expected_digits: expected_digits.clone(),
timeout_ms,
result_tx,
@@ -1884,7 +2041,11 @@ impl CallManager {
// Internal helpers
// -----------------------------------------------------------------------
fn resolve_first_device(&self, config: &AppConfig, registrar: &Registrar) -> Option<SocketAddr> {
fn resolve_first_device(
&self,
config: &AppConfig,
registrar: &Registrar,
) -> Option<SocketAddr> {
for device in &config.devices {
if let Some(addr) = registrar.get_device_contact(&device.id) {
return Some(addr);
@@ -1907,8 +2068,7 @@ async fn resolve_greeting_wav(
tts_engine: &Arc<Mutex<TtsEngine>>,
) -> Option<String> {
// 1. Look up voicebox config.
let vb = voicebox_id
.and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled));
let vb = voicebox_id.and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled));
if let Some(vb) = vb {
// 2. Pre-recorded WAV takes priority.

View File

@@ -4,6 +4,7 @@
//! proxy engine via the `configure` command. These types mirror the TS interfaces.
use serde::Deserialize;
use sip_proto::message::SipMessage;
use std::net::SocketAddr;
/// Network endpoint.
@@ -227,8 +228,101 @@ pub struct IvrMenuEntry {
// Pattern matching (ported from ts/config.ts)
// ---------------------------------------------------------------------------
/// Extract the URI user part and normalize phone-like identities for routing.
///
/// This keeps inbound route matching stable across provider-specific URI shapes,
/// e.g. `sip:+49 421 219694@trunk.example` and `sip:0049421219694@trunk.example`
/// both normalize to `+49421219694`.
pub fn normalize_routing_identity(value: &str) -> String {
let extracted = SipMessage::extract_uri_user(value).unwrap_or(value).trim();
if extracted.is_empty() {
return String::new();
}
let mut digits = String::new();
let mut saw_plus = false;
for (idx, ch) in extracted.chars().enumerate() {
if ch.is_ascii_digit() {
digits.push(ch);
continue;
}
if ch == '+' && idx == 0 {
saw_plus = true;
continue;
}
if matches!(ch, ' ' | '\t' | '-' | '.' | '/' | '(' | ')') {
continue;
}
return extracted.to_string();
}
if digits.is_empty() {
return extracted.to_string();
}
if saw_plus {
return format!("+{digits}");
}
if digits.starts_with("00") && digits.len() > 2 {
return format!("+{}", &digits[2..]);
}
digits
}
fn parse_numeric_range_value(value: &str) -> Option<(bool, &str)> {
let trimmed = value.trim();
if trimmed.is_empty() {
return None;
}
let (has_plus, digits) = if let Some(rest) = trimmed.strip_prefix('+') {
(true, rest)
} else {
(false, trimmed)
};
if digits.is_empty() || !digits.chars().all(|c| c.is_ascii_digit()) {
return None;
}
Some((has_plus, digits))
}
fn matches_numeric_range_pattern(pattern: &str, value: &str) -> bool {
let Some((start, end)) = pattern.split_once("..") else {
return false;
};
let Some((start_plus, start_digits)) = parse_numeric_range_value(start) else {
return false;
};
let Some((end_plus, end_digits)) = parse_numeric_range_value(end) else {
return false;
};
let Some((value_plus, value_digits)) = parse_numeric_range_value(value) else {
return false;
};
if start_plus != end_plus || value_plus != start_plus {
return false;
}
if start_digits.len() != end_digits.len() || value_digits.len() != start_digits.len() {
return false;
}
if start_digits > end_digits {
return false;
}
value_digits >= start_digits && value_digits <= end_digits
}
/// Test a value against a pattern string.
/// - None/empty: matches everything (wildcard)
/// - `start..end`: numeric range match
/// - Trailing '*': prefix match
/// - Starts with '/': regex match
/// - Otherwise: exact match
@@ -244,6 +338,10 @@ pub fn matches_pattern(pattern: Option<&str>, value: &str) -> bool {
return value.starts_with(&pattern[..pattern.len() - 1]);
}
if matches_numeric_range_pattern(pattern, value) {
return true;
}
// Regex match: "/^\\+49/" or "/pattern/i"
if pattern.starts_with('/') {
if let Some(last_slash) = pattern[1..].rfind('/') {
@@ -363,7 +461,7 @@ impl AppConfig {
provider_id: &str,
called_number: &str,
caller_number: &str,
) -> InboundRouteResult {
) -> Option<InboundRouteResult> {
let mut routes: Vec<&Route> = self
.routing
.routes
@@ -387,22 +485,170 @@ impl AppConfig {
continue;
}
return InboundRouteResult {
return Some(InboundRouteResult {
device_ids: route.action.targets.clone().unwrap_or_default(),
ring_browsers: route.action.ring_browsers.unwrap_or(false),
voicemail_box: route.action.voicemail_box.clone(),
ivr_menu_id: route.action.ivr_menu_id.clone(),
no_answer_timeout: route.action.no_answer_timeout,
};
});
}
// Fallback: ring all devices + browsers.
InboundRouteResult {
device_ids: vec![],
ring_browsers: true,
voicemail_box: None,
ivr_menu_id: None,
no_answer_timeout: None,
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_app_config(routes: Vec<Route>) -> AppConfig {
AppConfig {
proxy: ProxyConfig {
lan_ip: "127.0.0.1".to_string(),
lan_port: 5070,
public_ip_seed: None,
rtp_port_range: RtpPortRange {
min: 20_000,
max: 20_100,
},
},
providers: vec![ProviderConfig {
id: "provider-a".to_string(),
display_name: "Provider A".to_string(),
domain: "example.com".to_string(),
outbound_proxy: Endpoint {
address: "example.com".to_string(),
port: 5060,
},
username: "user".to_string(),
password: "pass".to_string(),
register_interval_sec: 300,
codecs: vec![9],
quirks: Quirks {
early_media_silence: false,
silence_payload_type: None,
silence_max_packets: None,
},
}],
devices: vec![DeviceConfig {
id: "desk".to_string(),
display_name: "Desk".to_string(),
expected_address: "127.0.0.1".to_string(),
extension: "100".to_string(),
}],
routing: RoutingConfig { routes },
voiceboxes: vec![],
ivr: None,
}
}
#[test]
fn normalize_routing_identity_extracts_uri_user_and_phone_number() {
assert_eq!(
normalize_routing_identity("sip:0049 421 219694@voip.easybell.de"),
"+49421219694"
);
assert_eq!(
normalize_routing_identity("<tel:+49 (421) 219694>"),
"+49421219694"
);
assert_eq!(normalize_routing_identity("sip:100@pbx.local"), "100");
assert_eq!(normalize_routing_identity("sip:alice@pbx.local"), "alice");
}
#[test]
fn resolve_inbound_route_requires_explicit_match() {
let cfg = test_app_config(vec![]);
assert!(cfg
.resolve_inbound_route("provider-a", "+49421219694", "+491701234567")
.is_none());
}
#[test]
fn resolve_inbound_route_matches_per_number_on_shared_provider() {
let cfg = test_app_config(vec![
Route {
id: "main".to_string(),
name: "Main DID".to_string(),
priority: 200,
enabled: true,
match_criteria: RouteMatch {
direction: "inbound".to_string(),
number_pattern: Some("+49421219694".to_string()),
caller_pattern: None,
source_provider: Some("provider-a".to_string()),
source_device: None,
},
action: RouteAction {
targets: Some(vec!["desk".to_string()]),
ring_browsers: Some(true),
voicemail_box: None,
ivr_menu_id: None,
no_answer_timeout: None,
provider: None,
failover_providers: None,
strip_prefix: None,
prepend_prefix: None,
},
},
Route {
id: "support".to_string(),
name: "Support DID".to_string(),
priority: 100,
enabled: true,
match_criteria: RouteMatch {
direction: "inbound".to_string(),
number_pattern: Some("+49421219695".to_string()),
caller_pattern: None,
source_provider: Some("provider-a".to_string()),
source_device: None,
},
action: RouteAction {
targets: None,
ring_browsers: Some(false),
voicemail_box: Some("support-box".to_string()),
ivr_menu_id: None,
no_answer_timeout: Some(20),
provider: None,
failover_providers: None,
strip_prefix: None,
prepend_prefix: None,
},
},
]);
let main = cfg
.resolve_inbound_route("provider-a", "+49421219694", "+491701234567")
.expect("main DID should match");
assert_eq!(main.device_ids, vec!["desk".to_string()]);
assert!(main.ring_browsers);
let support = cfg
.resolve_inbound_route("provider-a", "+49421219695", "+491701234567")
.expect("support DID should match");
assert_eq!(support.voicemail_box.as_deref(), Some("support-box"));
assert_eq!(support.no_answer_timeout, Some(20));
assert!(!support.ring_browsers);
}
#[test]
fn matches_pattern_supports_numeric_ranges() {
assert!(matches_pattern(
Some("042116767546..042116767548"),
"042116767547"
));
assert!(!matches_pattern(
Some("042116767546..042116767548"),
"042116767549"
));
assert!(matches_pattern(
Some("+4942116767546..+4942116767548"),
"+4942116767547"
));
assert!(!matches_pattern(
Some("+4942116767546..+4942116767548"),
"042116767547"
));
}
}

View File

@@ -19,7 +19,13 @@ pub struct Command {
}
/// Send a response to a command.
pub fn respond(tx: &OutTx, id: &str, success: bool, result: Option<serde_json::Value>, error: Option<&str>) {
pub fn respond(
tx: &OutTx,
id: &str,
success: bool,
result: Option<serde_json::Value>,
error: Option<&str>,
) {
let mut resp = serde_json::json!({ "id": id, "success": success });
if let Some(r) = result {
resp["result"] = r;

View File

@@ -63,7 +63,8 @@ pub fn spawn_sip_inbound(
if offset + 4 > n {
continue; // Malformed: extension header truncated.
}
let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
let ext_len =
u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
offset += 4 + ext_len * 4;
}
if offset >= n {
@@ -74,7 +75,17 @@ pub fn spawn_sip_inbound(
if payload.is_empty() {
continue;
}
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() {
if inbound_tx
.send(RtpPacket {
payload,
payload_type: pt,
marker,
seq,
timestamp,
})
.await
.is_err()
{
break; // Channel closed — leg removed.
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,8 @@
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
//! 1. Drain inbound channels, reorder RTP, decode variable-duration packets to 48kHz,
//! and queue them in per-leg PCM buffers
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
@@ -16,11 +17,12 @@
use crate::ipc::{emit_event, OutTx};
use crate::jitter_buffer::{JitterBuffer, JitterResult};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use crate::rtp::{build_rtp_header, rtp_clock_increment, rtp_clock_rate};
use crate::tts::TtsStreamMessage;
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
use nnnoiseless::DenoiseState;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
@@ -29,6 +31,12 @@ use tokio::time::{self, Duration, MissedTickBehavior};
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
/// Safety cap for how much timestamp-derived gap fill we synthesize at once.
const MAX_GAP_FILL_SAMPLES: usize = MIX_FRAME_SIZE * 6; // 120ms
/// Bound how many decode / concealment steps a leg can consume in one tick.
const MAX_PACKET_STEPS_PER_TICK: usize = 24;
/// Report the first output drop immediately, then every N drops.
const DROP_REPORT_INTERVAL: u64 = 50;
/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
@@ -39,10 +47,6 @@ pub struct RtpPacket {
/// RTP sequence number for reordering.
pub seq: u16,
/// RTP timestamp from the original packet header.
///
/// Set on inbound RTP but not yet consumed downstream — reserved for
/// future jitter/sync work in the mixer.
#[allow(dead_code)]
pub timestamp: u32,
}
@@ -61,6 +65,12 @@ enum LegRole {
struct IsolationState {
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
prompt_frames: VecDeque<Vec<f32>>,
/// Live TTS frames arrive here while playback is already in progress.
prompt_stream_rx: Option<mpsc::Receiver<TtsStreamMessage>>,
/// Cancels the background TTS producer when the interaction ends early.
prompt_cancel_tx: Option<watch::Sender<bool>>,
/// Whether the live prompt stream has ended.
prompt_stream_finished: bool,
/// Digits that complete the interaction (e.g., ['1', '2']).
expected_digits: Vec<char>,
/// Ticks remaining before timeout (decremented each tick after prompt ends).
@@ -109,6 +119,7 @@ struct ToolLegSlot {
#[allow(dead_code)]
tool_type: ToolType,
audio_tx: mpsc::Sender<ToolAudioBatch>,
dropped_batches: u64,
}
// ---------------------------------------------------------------------------
@@ -136,6 +147,10 @@ pub enum MixerCommand {
leg_id: String,
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
prompt_pcm_frames: Vec<Vec<f32>>,
/// Optional live prompt stream. Frames are appended as they are synthesized.
prompt_stream_rx: Option<mpsc::Receiver<TtsStreamMessage>>,
/// Optional cancellation handle for the live prompt stream.
prompt_cancel_tx: Option<watch::Sender<bool>>,
expected_digits: Vec<char>,
timeout_ms: u32,
result_tx: oneshot::Sender<InteractionResult>,
@@ -163,8 +178,15 @@ struct MixerLegSlot {
denoiser: Box<DenoiseState<'static>>,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
/// Decoded PCM waiting for playout. Variable-duration RTP packets are
/// decoded into this FIFO; the mixer consumes exactly one 20ms frame per tick.
pcm_buffer: VecDeque<f32>,
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
last_pcm_frame: Vec<f32>,
/// Next RTP timestamp expected from the inbound stream.
expected_rtp_timestamp: Option<u32>,
/// Best-effort estimate of packet duration in RTP clock units.
estimated_packet_ts: u32,
/// Number of consecutive ticks with no inbound packet.
silent_ticks: u32,
/// Per-leg jitter buffer for packet reordering and timing.
@@ -173,15 +195,302 @@ struct MixerLegSlot {
rtp_seq: u16,
rtp_ts: u32,
rtp_ssrc: u32,
/// Dropped outbound frames for this leg (queue full / closed).
outbound_drops: u64,
/// Current role of this leg in the mixer.
role: LegRole,
}
fn mix_samples_to_rtp_ts(codec_pt: u8, mix_samples: usize) -> u32 {
let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64;
(((mix_samples as u64 * clock_rate) + (MIX_RATE as u64 / 2)) / MIX_RATE as u64) as u32
}
fn rtp_ts_to_mix_samples(codec_pt: u8, rtp_ts: u32) -> usize {
let clock_rate = rtp_clock_rate(codec_pt).max(1) as u64;
(((rtp_ts as u64 * MIX_RATE as u64) + (clock_rate / 2)) / clock_rate) as usize
}
fn is_forward_rtp_delta(delta: u32) -> bool {
delta > 0 && delta < 0x8000_0000
}
fn should_emit_drop_event(total_drops: u64) -> bool {
total_drops == 1 || total_drops % DROP_REPORT_INTERVAL == 0
}
fn emit_output_drop_event(
out_tx: &OutTx,
call_id: &str,
leg_id: Option<&str>,
tool_leg_id: Option<&str>,
stream: &str,
reason: &str,
total_drops: u64,
) {
if !should_emit_drop_event(total_drops) {
return;
}
emit_event(
out_tx,
"mixer_output_drop",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"tool_leg_id": tool_leg_id,
"stream": stream,
"reason": reason,
"total_drops": total_drops,
}),
);
}
fn fade_concealment_from_last_frame(slot: &mut MixerLegSlot, samples: usize, decay: f32) {
let mut template = if slot.last_pcm_frame.is_empty() {
vec![0.0f32; MIX_FRAME_SIZE]
} else {
slot.last_pcm_frame.clone()
};
let mut remaining = samples;
while remaining > 0 {
for sample in &mut template {
*sample *= decay;
}
let take = remaining.min(template.len());
slot.pcm_buffer.extend(template.iter().take(take).copied());
remaining -= take;
}
}
fn append_packet_loss_concealment(slot: &mut MixerLegSlot, samples: usize) {
let mut remaining = samples.max(1);
while remaining > 0 {
let chunk = remaining.min(MIX_FRAME_SIZE);
if slot.codec_pt == codec_lib::PT_OPUS {
match slot.transcoder.opus_plc(chunk) {
Ok(mut pcm) => {
pcm.resize(chunk, 0.0);
slot.pcm_buffer.extend(pcm);
}
Err(_) => fade_concealment_from_last_frame(slot, chunk, 0.8),
}
} else {
fade_concealment_from_last_frame(slot, chunk, 0.85);
}
remaining -= chunk;
}
}
fn decode_packet_to_mix_pcm(slot: &mut MixerLegSlot, pkt: &RtpPacket) -> Option<Vec<f32>> {
let (pcm, rate) = slot
.transcoder
.decode_to_f32(&pkt.payload, pkt.payload_type)
.ok()?;
let pcm_48k = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample_f32(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
};
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
} else {
pcm_48k
};
Some(processed)
}
fn queue_inbound_packet(slot: &mut MixerLegSlot, pkt: RtpPacket) {
if let Some(pcm_48k) = decode_packet_to_mix_pcm(slot, &pkt) {
if pcm_48k.is_empty() {
return;
}
if let Some(expected_ts) = slot.expected_rtp_timestamp {
let gap_ts = pkt.timestamp.wrapping_sub(expected_ts);
if is_forward_rtp_delta(gap_ts) {
let gap_samples = rtp_ts_to_mix_samples(slot.codec_pt, gap_ts);
if gap_samples <= MAX_GAP_FILL_SAMPLES {
append_packet_loss_concealment(slot, gap_samples);
} else {
slot.pcm_buffer.clear();
}
}
}
let packet_ts = mix_samples_to_rtp_ts(slot.codec_pt, pcm_48k.len());
if packet_ts > 0 {
slot.estimated_packet_ts = packet_ts;
slot.expected_rtp_timestamp = Some(pkt.timestamp.wrapping_add(packet_ts));
}
slot.pcm_buffer.extend(pcm_48k);
}
}
fn fill_leg_playout_buffer(slot: &mut MixerLegSlot) {
let mut steps = 0usize;
while slot.pcm_buffer.len() < MIX_FRAME_SIZE && steps < MAX_PACKET_STEPS_PER_TICK {
steps += 1;
match slot.jitter.consume() {
JitterResult::Packet(pkt) => queue_inbound_packet(slot, pkt),
JitterResult::Missing => {
let conceal_ts = slot
.estimated_packet_ts
.max(rtp_clock_increment(slot.codec_pt));
let conceal_samples =
rtp_ts_to_mix_samples(slot.codec_pt, conceal_ts).clamp(1, MAX_GAP_FILL_SAMPLES);
append_packet_loss_concealment(slot, conceal_samples);
if let Some(expected_ts) = slot.expected_rtp_timestamp {
slot.expected_rtp_timestamp = Some(expected_ts.wrapping_add(conceal_ts));
}
}
JitterResult::Filling => break,
}
}
}
fn take_mix_frame(slot: &mut MixerLegSlot) -> Vec<f32> {
let mut frame = Vec::with_capacity(MIX_FRAME_SIZE);
while frame.len() < MIX_FRAME_SIZE {
if let Some(sample) = slot.pcm_buffer.pop_front() {
frame.push(sample);
} else {
frame.push(0.0);
}
}
frame
}
fn soft_limit_sample(sample: f32) -> f32 {
const KNEE: f32 = 0.85;
let abs = sample.abs();
if abs <= KNEE {
sample
} else {
let excess = abs - KNEE;
let compressed = KNEE + (excess / (1.0 + (excess / (1.0 - KNEE))));
sample.signum() * compressed.min(1.0)
}
}
fn try_send_leg_output(
out_tx: &OutTx,
call_id: &str,
leg_id: &str,
slot: &mut MixerLegSlot,
rtp: Vec<u8>,
stream: &str,
) {
let reason = match slot.outbound_tx.try_send(rtp) {
Ok(()) => return,
Err(mpsc::error::TrySendError::Full(_)) => "full",
Err(mpsc::error::TrySendError::Closed(_)) => "closed",
};
slot.outbound_drops += 1;
emit_output_drop_event(
out_tx,
call_id,
Some(leg_id),
None,
stream,
reason,
slot.outbound_drops,
);
}
fn try_send_tool_output(
out_tx: &OutTx,
call_id: &str,
tool_leg_id: &str,
tool: &mut ToolLegSlot,
batch: ToolAudioBatch,
) {
let reason = match tool.audio_tx.try_send(batch) {
Ok(()) => return,
Err(mpsc::error::TrySendError::Full(_)) => "full",
Err(mpsc::error::TrySendError::Closed(_)) => "closed",
};
tool.dropped_batches += 1;
emit_output_drop_event(
out_tx,
call_id,
None,
Some(tool_leg_id),
"tool-batch",
reason,
tool.dropped_batches,
);
}
fn cancel_prompt_producer(state: &mut IsolationState) {
if let Some(cancel_tx) = state.prompt_cancel_tx.take() {
let _ = cancel_tx.send(true);
}
}
fn cancel_isolated_interaction(state: &mut IsolationState) {
cancel_prompt_producer(state);
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
fn drain_prompt_stream(
out_tx: &OutTx,
call_id: &str,
leg_id: &str,
state: &mut IsolationState,
) {
loop {
let Some(mut stream_rx) = state.prompt_stream_rx.take() else {
return;
};
match stream_rx.try_recv() {
Ok(TtsStreamMessage::Frames(frames)) => {
state.prompt_frames.extend(frames);
state.prompt_stream_rx = Some(stream_rx);
}
Ok(TtsStreamMessage::Finished) => {
state.prompt_stream_finished = true;
return;
}
Ok(TtsStreamMessage::Failed(error)) => {
emit_event(
out_tx,
"mixer_error",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"error": format!("tts stream failed: {error}"),
}),
);
state.prompt_stream_finished = true;
return;
}
Err(mpsc::error::TryRecvError::Empty) => {
state.prompt_stream_rx = Some(stream_rx);
return;
}
Err(mpsc::error::TryRecvError::Disconnected) => {
state.prompt_stream_finished = true;
return;
}
}
}
}
/// Spawn the mixer task for a call. Returns the command sender and task handle.
pub fn spawn_mixer(
call_id: String,
out_tx: OutTx,
) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
pub fn spawn_mixer(call_id: String, out_tx: OutTx) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
let handle = tokio::spawn(async move {
@@ -192,11 +501,7 @@ pub fn spawn_mixer(
}
/// The 20ms mixing loop.
async fn mixer_loop(
call_id: String,
mut cmd_rx: mpsc::Receiver<MixerCommand>,
out_tx: OutTx,
) {
async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, out_tx: OutTx) {
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
let mut tool_legs: HashMap<String, ToolLegSlot> = HashMap::new();
let mut interval = time::interval(Duration::from_millis(20));
@@ -237,11 +542,15 @@ async fn mixer_loop(
denoiser: new_denoiser(),
inbound_rx,
outbound_tx,
pcm_buffer: VecDeque::new(),
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
expected_rtp_timestamp: None,
estimated_packet_ts: rtp_clock_increment(codec_pt),
silent_ticks: 0,
rtp_seq: 0,
rtp_ts: 0,
rtp_ssrc: rand::random(),
outbound_drops: 0,
role: LegRole::Participant,
jitter: JitterBuffer::new(),
},
@@ -251,9 +560,7 @@ async fn mixer_loop(
// If the leg is isolated, send Cancelled before dropping.
if let Some(slot) = legs.get_mut(&leg_id) {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
cancel_isolated_interaction(state);
}
}
legs.remove(&leg_id);
@@ -263,9 +570,7 @@ async fn mixer_loop(
// Cancel all outstanding interactions before shutting down.
for slot in legs.values_mut() {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
cancel_isolated_interaction(state);
}
}
return;
@@ -273,6 +578,8 @@ async fn mixer_loop(
Ok(MixerCommand::StartInteraction {
leg_id,
prompt_pcm_frames,
prompt_stream_rx,
prompt_cancel_tx,
expected_digits,
timeout_ms,
result_tx,
@@ -280,13 +587,14 @@ async fn mixer_loop(
if let Some(slot) = legs.get_mut(&leg_id) {
// Cancel any existing interaction first.
if let LegRole::Isolated(ref mut old_state) = slot.role {
if let Some(tx) = old_state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
cancel_isolated_interaction(old_state);
}
let timeout_ticks = timeout_ms / 20;
slot.role = LegRole::Isolated(IsolationState {
prompt_frames: VecDeque::from(prompt_pcm_frames),
prompt_stream_rx,
prompt_cancel_tx,
prompt_stream_finished: false,
expected_digits,
timeout_ticks_remaining: timeout_ticks,
prompt_done: false,
@@ -294,6 +602,9 @@ async fn mixer_loop(
});
} else {
// Leg not found — immediately cancel.
if let Some(cancel_tx) = prompt_cancel_tx {
let _ = cancel_tx.send(true);
}
let _ = result_tx.send(InteractionResult::Cancelled);
}
}
@@ -302,7 +613,14 @@ async fn mixer_loop(
tool_type,
audio_tx,
}) => {
tool_legs.insert(leg_id, ToolLegSlot { tool_type, audio_tx });
tool_legs.insert(
leg_id,
ToolLegSlot {
tool_type,
audio_tx,
dropped_batches: 0,
},
);
}
Ok(MixerCommand::RemoveToolLeg { leg_id }) => {
tool_legs.remove(&leg_id);
@@ -343,54 +661,11 @@ async fn mixer_loop(
}
}
// Step 2b: Consume exactly one frame from the jitter buffer.
match slot.jitter.consume() {
JitterResult::Packet(pkt) => {
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
let pcm_48k = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample_f32(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
};
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
} else {
pcm_48k
};
let mut frame = processed;
frame.resize(MIX_FRAME_SIZE, 0.0);
slot.last_pcm_frame = frame;
}
Err(_) => {}
}
}
JitterResult::Missing => {
// Invoke Opus PLC or fade for non-Opus codecs.
if slot.codec_pt == codec_lib::PT_OPUS {
match slot.transcoder.opus_plc(MIX_FRAME_SIZE) {
Ok(pcm) => {
slot.last_pcm_frame = pcm;
}
Err(_) => {
for s in slot.last_pcm_frame.iter_mut() {
*s *= 0.8;
}
}
}
} else {
// Non-Opus: fade last frame toward silence.
for s in slot.last_pcm_frame.iter_mut() {
*s *= 0.85;
}
}
}
JitterResult::Filling => {
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
}
}
// Step 2b: Decode enough RTP to cover one 20ms playout frame.
// Variable-duration packets (10ms, 20ms, 60ms, ...) accumulate in
// the per-leg PCM FIFO; we pop exactly one 20ms frame below.
fill_leg_playout_buffer(slot);
slot.last_pcm_frame = take_mix_frame(slot);
// Run jitter adaptation + prune stale packets.
slot.jitter.adapt();
@@ -404,6 +679,9 @@ async fn mixer_loop(
}
if slot.silent_ticks > 150 {
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
slot.pcm_buffer.clear();
slot.expected_rtp_timestamp = None;
slot.estimated_packet_ts = rtp_clock_increment(slot.codec_pt);
}
}
@@ -426,12 +704,12 @@ async fn mixer_loop(
for (lid, slot) in legs.iter_mut() {
match &mut slot.role {
LegRole::Participant => {
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
// Mix-minus: total minus this leg's own contribution.
// Apply a light soft limiter instead of hard clipping the sum.
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
mix_minus.push(sample.clamp(-1.0, 1.0));
let sample = (total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
mix_minus.push(soft_limit_sample(sample));
}
// Resample from 48kHz to the leg's codec native rate.
@@ -445,11 +723,10 @@ async fn mixer_loop(
};
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
let encoded =
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
let encoded = match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
// Build RTP packet with header.
let header =
@@ -460,10 +737,11 @@ async fn mixer_loop(
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
// Non-blocking send — drop frame if channel is full.
let _ = slot.outbound_tx.try_send(rtp);
try_send_leg_output(&out_tx, &call_id, lid, slot, rtp, "participant-audio");
}
LegRole::Isolated(state) => {
drain_prompt_stream(&out_tx, &call_id, lid, state);
// Check for DTMF digit from this leg.
let mut matched_digit: Option<char> = None;
for (src_lid, dtmf_pkt) in &dtmf_forward {
@@ -487,12 +765,14 @@ async fn mixer_loop(
if let Some(digit) = matched_digit {
// Interaction complete — digit matched.
completed_interactions
.push((lid.clone(), InteractionResult::Digit(digit)));
completed_interactions.push((lid.clone(), InteractionResult::Digit(digit)));
} else {
// Play prompt frame or silence.
// Play prompt frame, wait for live TTS, or move to timeout once the
// prompt stream has fully drained.
let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() {
frame
} else if !state.prompt_stream_finished {
vec![0.0f32; MIX_FRAME_SIZE]
} else {
state.prompt_done = true;
vec![0.0f32; MIX_FRAME_SIZE]
@@ -508,6 +788,7 @@ async fn mixer_loop(
.unwrap_or_default()
};
let mut prompt_rtp: Option<Vec<u8>> = None;
if let Ok(encoded) =
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
{
@@ -521,10 +802,9 @@ async fn mixer_loop(
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot
.rtp_ts
.wrapping_add(rtp_clock_increment(slot.codec_pt));
let _ = slot.outbound_tx.try_send(rtp);
slot.rtp_ts =
slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
prompt_rtp = Some(rtp);
}
}
@@ -537,6 +817,17 @@ async fn mixer_loop(
state.timeout_ticks_remaining -= 1;
}
}
if let Some(rtp) = prompt_rtp {
try_send_leg_output(
&out_tx,
&call_id,
lid,
slot,
rtp,
"isolated-prompt",
);
}
}
}
}
@@ -546,6 +837,7 @@ async fn mixer_loop(
for (lid, result) in completed_interactions {
if let Some(slot) = legs.get_mut(&lid) {
if let LegRole::Isolated(ref mut state) = slot.role {
cancel_prompt_producer(state);
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(result);
}
@@ -566,7 +858,7 @@ async fn mixer_loop(
})
.collect();
for tool in tool_legs.values() {
for (tool_leg_id, tool) in tool_legs.iter_mut() {
let batch = ToolAudioBatch {
sources: sources
.iter()
@@ -576,8 +868,7 @@ async fn mixer_loop(
})
.collect(),
};
// Non-blocking send — drop batch if tool can't keep up.
let _ = tool.audio_tx.try_send(batch);
try_send_tool_output(&out_tx, &call_id, tool_leg_id, tool, batch);
}
}
@@ -610,7 +901,7 @@ async fn mixer_loop(
rtp_out.extend_from_slice(&dtmf_pkt.payload);
target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1);
// Don't increment rtp_ts for DTMF — it shares timestamp context with audio.
let _ = target_slot.outbound_tx.try_send(rtp_out);
try_send_leg_output(&out_tx, &call_id, target_lid, target_slot, rtp_out, "dtmf");
}
}
}

View File

@@ -267,11 +267,7 @@ impl ProviderManager {
/// Try to handle a SIP response as a provider registration response.
/// Returns true if consumed.
pub async fn handle_response(
&self,
msg: &SipMessage,
socket: &UdpSocket,
) -> bool {
pub async fn handle_response(&self, msg: &SipMessage, socket: &UdpSocket) -> bool {
for ps_arc in &self.providers {
let mut ps = ps_arc.lock().await;
let was_registered = ps.is_registered;
@@ -322,7 +318,10 @@ impl ProviderManager {
}
/// Find a provider by its config ID (e.g. "easybell").
pub async fn find_by_provider_id(&self, provider_id: &str) -> Option<Arc<Mutex<ProviderState>>> {
pub async fn find_by_provider_id(
&self,
provider_id: &str,
) -> Option<Arc<Mutex<ProviderState>>> {
for ps_arc in &self.providers {
let ps = ps_arc.lock().await;
if ps.config.id == provider_id {

View File

@@ -25,8 +25,7 @@ impl Recorder {
) -> Result<Self, String> {
// Ensure parent directory exists.
if let Some(parent) = Path::new(file_path).parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("create dir: {e}"))?;
std::fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?;
}
let sample_rate = 8000u32; // Record at 8kHz (standard telephony)
@@ -57,10 +56,13 @@ impl Recorder {
/// Create a recorder that writes raw PCM at a given sample rate.
/// Used by tool legs that already have decoded PCM (no RTP processing needed).
pub fn new_pcm(file_path: &str, sample_rate: u32, max_duration_ms: Option<u64>) -> Result<Self, String> {
pub fn new_pcm(
file_path: &str,
sample_rate: u32,
max_duration_ms: Option<u64>,
) -> Result<Self, String> {
if let Some(parent) = Path::new(file_path).parent() {
std::fs::create_dir_all(parent)
.map_err(|e| format!("create dir: {e}"))?;
std::fs::create_dir_all(parent).map_err(|e| format!("create dir: {e}"))?;
}
let spec = hound::WavSpec {

View File

@@ -60,18 +60,17 @@ impl Registrar {
/// Try to handle a SIP REGISTER from a device.
/// Returns Some(response_bytes) if handled, None if not a known device.
pub fn handle_register(
&mut self,
msg: &SipMessage,
from_addr: SocketAddr,
) -> Option<Vec<u8>> {
pub fn handle_register(&mut self, msg: &SipMessage, from_addr: SocketAddr) -> Option<Vec<u8>> {
if msg.method() != Some("REGISTER") {
return None;
}
// Find the device by matching the source IP against expectedAddress.
let from_ip = from_addr.ip().to_string();
let device = self.devices.iter().find(|d| d.expected_address == from_ip)?;
let device = self
.devices
.iter()
.find(|d| d.expected_address == from_ip)?;
let from_header = msg.get_header("From").unwrap_or("");
let aor = SipMessage::extract_uri(from_header)
@@ -79,9 +78,7 @@ impl Registrar {
.unwrap_or_else(|| format!("sip:{}@{}", device.extension, from_ip));
let expires_header = msg.get_header("Expires");
let requested: u32 = expires_header
.and_then(|s| s.parse().ok())
.unwrap_or(3600);
let requested: u32 = expires_header.and_then(|s| s.parse().ok()).unwrap_or(3600);
let expires = requested.min(MAX_EXPIRES);
let entry = RegisteredDevice {
@@ -122,10 +119,7 @@ impl Registrar {
Some(ResponseOptions {
to_tag: Some(generate_tag()),
contact: Some(contact),
extra_headers: Some(vec![(
"Expires".to_string(),
expires.to_string(),
)]),
extra_headers: Some(vec![("Expires".to_string(), expires.to_string())]),
..Default::default()
}),
);
@@ -145,8 +139,8 @@ impl Registrar {
/// Find a registered device by its source IP address.
pub fn find_by_address(&self, addr: &SocketAddr) -> Option<&RegisteredDevice> {
let ip = addr.ip().to_string();
self.registered.values().find(|e| {
e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at
})
self.registered
.values()
.find(|e| e.contact_addr.ip().to_string() == ip && Instant::now() <= e.expires_at)
}
}

View File

@@ -82,10 +82,15 @@ pub fn build_rtp_header(pt: u8, seq: u16, timestamp: u32, ssrc: u32) -> [u8; 12]
/// Get the RTP clock increment per 20ms frame for a payload type.
pub fn rtp_clock_increment(pt: u8) -> u32 {
rtp_clock_rate(pt) / 50
}
/// Get the RTP clock rate for a payload type.
pub fn rtp_clock_rate(pt: u8) -> u32 {
match pt {
9 => 160, // G.722: 8000 Hz clock rate (despite 16kHz audio) × 0.02s
0 | 8 => 160, // PCMU/PCMA: 8000 × 0.02
111 => 960, // Opus: 48000 × 0.02
_ => 160,
9 => 8000, // G.722 uses an 8kHz RTP clock despite 16kHz audio.
0 | 8 => 8000, // PCMU/PCMA
111 => 48000, // Opus
_ => 8000,
}
}

View File

@@ -128,17 +128,24 @@ impl SipLeg {
max_forwards: Some(70),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: Some(vec![
("User-Agent".to_string(), "SipRouter/1.0".to_string()),
]),
extra_headers: Some(vec![(
"User-Agent".to_string(),
"SipRouter/1.0".to_string(),
)]),
},
);
self.dialog = Some(SipDialog::from_uac_invite(&invite, ip, self.config.lan_port));
self.dialog = Some(SipDialog::from_uac_invite(
&invite,
ip,
self.config.lan_port,
));
self.invite = Some(invite.clone());
self.state = LegState::Inviting;
let _ = socket.send_to(&invite.serialize(), self.config.sip_target).await;
let _ = socket
.send_to(&invite.serialize(), self.config.sip_target)
.await;
}
/// Handle an incoming SIP message routed to this leg.
@@ -443,10 +450,7 @@ pub enum SipLegAction {
/// Build an ACK for a non-2xx response (same transaction as the INVITE).
fn build_non_2xx_ack(original_invite: &SipMessage, response: &SipMessage) -> SipMessage {
let via = original_invite.get_header("Via").unwrap_or("").to_string();
let from = original_invite
.get_header("From")
.unwrap_or("")
.to_string();
let from = original_invite.get_header("From").unwrap_or("").to_string();
let to = response.get_header("To").unwrap_or("").to_string();
let call_id = original_invite.call_id().to_string();
let cseq_num: u32 = original_invite

View File

@@ -28,10 +28,8 @@ impl SipTransport {
}
/// Spawn the UDP receive loop. Calls the handler for every received packet.
pub fn spawn_receiver<F>(
&self,
handler: F,
) where
pub fn spawn_receiver<F>(&self, handler: F)
where
F: Fn(&[u8], SocketAddr) + Send + 'static,
{
let socket = self.socket.clone();

View File

@@ -51,7 +51,8 @@ pub fn spawn_recording_tool(
});
// Convert f32 [-1.0, 1.0] to i16 for WAV writing.
let pcm_i16: Vec<i16> = source.pcm_48k
let pcm_i16: Vec<i16> = source
.pcm_48k
.iter()
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect();

View File

@@ -9,12 +9,41 @@
//! Callers never need to check for cached files — that is entirely this module's
//! responsibility.
use crate::audio_player::pcm_to_mix_frames;
use kokoro_tts::{KokoroTts, Voice};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
pub const DEFAULT_MODEL_PATH: &str = ".nogit/tts/kokoro-v1.0.onnx";
pub const DEFAULT_VOICES_PATH: &str = ".nogit/tts/voices.bin";
const TTS_OUTPUT_RATE: u32 = 24000;
const MAX_CHUNK_CHARS: usize = 220;
const MIN_CHUNK_CHARS: usize = 80;
pub enum TtsStreamMessage {
Frames(Vec<Vec<f32>>),
Finished,
Failed(String),
}
pub struct TtsLivePrompt {
pub initial_frames: Vec<Vec<f32>>,
pub stream_rx: mpsc::Receiver<TtsStreamMessage>,
pub cancel_tx: watch::Sender<bool>,
}
#[derive(Clone)]
pub struct TtsPromptRequest {
pub model_path: String,
pub voices_path: String,
pub voice_name: String,
pub text: String,
}
/// Wraps the Kokoro TTS engine with lazy model loading.
pub struct TtsEngine {
tts: Option<KokoroTts>,
tts: Option<Arc<KokoroTts>>,
/// Path that was used to load the current model (for cache invalidation).
loaded_model_path: String,
loaded_voices_path: String,
@@ -29,6 +58,69 @@ impl TtsEngine {
}
}
async fn ensure_loaded(
&mut self,
model_path: &str,
voices_path: &str,
) -> Result<Arc<KokoroTts>, String> {
if !Path::new(model_path).exists() {
return Err(format!("model not found: {model_path}"));
}
if !Path::new(voices_path).exists() {
return Err(format!("voices not found: {voices_path}"));
}
if self.tts.is_none()
|| self.loaded_model_path != model_path
|| self.loaded_voices_path != voices_path
{
eprintln!("[tts] loading model: {model_path}");
let tts = Arc::new(
KokoroTts::new(model_path, voices_path)
.await
.map_err(|e| format!("model load failed: {e:?}"))?,
);
self.tts = Some(tts);
self.loaded_model_path = model_path.to_string();
self.loaded_voices_path = voices_path.to_string();
}
Ok(self.tts.as_ref().unwrap().clone())
}
pub async fn start_live_prompt(
&mut self,
request: TtsPromptRequest,
) -> Result<TtsLivePrompt, String> {
if request.text.trim().is_empty() {
return Err("empty text".into());
}
let tts = self
.ensure_loaded(&request.model_path, &request.voices_path)
.await?;
let voice = select_voice(&request.voice_name);
let chunks = chunk_text(&request.text);
if chunks.is_empty() {
return Err("empty text".into());
}
let initial_frames = synth_text_to_mix_frames(&tts, chunks[0].as_str(), voice).await?;
let remaining_chunks: Vec<String> = chunks.into_iter().skip(1).collect();
let (stream_tx, stream_rx) = mpsc::channel(8);
let (cancel_tx, cancel_rx) = watch::channel(false);
tokio::spawn(async move {
stream_live_prompt_chunks(tts, voice, remaining_chunks, stream_tx, cancel_rx).await;
});
Ok(TtsLivePrompt {
initial_frames,
stream_rx,
cancel_tx,
})
}
/// Generate a WAV file from text.
///
/// Params (from IPC JSON):
@@ -39,18 +131,33 @@ impl TtsEngine {
/// - `output`: output WAV file path
/// - `cacheable`: if true, skip synthesis when the output WAV already
/// matches the same text+voice (checked via a `.meta` sidecar file)
pub async fn generate(&mut self, params: &serde_json::Value) -> Result<serde_json::Value, String> {
let model_path = params.get("model").and_then(|v| v.as_str())
pub async fn generate(
&mut self,
params: &serde_json::Value,
) -> Result<serde_json::Value, String> {
let model_path = params
.get("model")
.and_then(|v| v.as_str())
.ok_or("missing 'model' param")?;
let voices_path = params.get("voices").and_then(|v| v.as_str())
let voices_path = params
.get("voices")
.and_then(|v| v.as_str())
.ok_or("missing 'voices' param")?;
let voice_name = params.get("voice").and_then(|v| v.as_str())
let voice_name = params
.get("voice")
.and_then(|v| v.as_str())
.unwrap_or("af_bella");
let text = params.get("text").and_then(|v| v.as_str())
let text = params
.get("text")
.and_then(|v| v.as_str())
.ok_or("missing 'text' param")?;
let output_path = params.get("output").and_then(|v| v.as_str())
let output_path = params
.get("output")
.and_then(|v| v.as_str())
.ok_or("missing 'output' param")?;
let cacheable = params.get("cacheable").and_then(|v| v.as_bool())
let cacheable = params
.get("cacheable")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if text.is_empty() {
@@ -63,41 +170,23 @@ impl TtsEngine {
return Ok(serde_json::json!({ "output": output_path }));
}
// Check that model/voices files exist.
if !Path::new(model_path).exists() {
return Err(format!("model not found: {model_path}"));
}
if !Path::new(voices_path).exists() {
return Err(format!("voices not found: {voices_path}"));
}
// Ensure parent directory exists.
if let Some(parent) = Path::new(output_path).parent() {
let _ = std::fs::create_dir_all(parent);
}
// Lazy-load or reload if paths changed.
if self.tts.is_none()
|| self.loaded_model_path != model_path
|| self.loaded_voices_path != voices_path
{
eprintln!("[tts] loading model: {model_path}");
let tts = KokoroTts::new(model_path, voices_path)
.await
.map_err(|e| format!("model load failed: {e:?}"))?;
self.tts = Some(tts);
self.loaded_model_path = model_path.to_string();
self.loaded_voices_path = voices_path.to_string();
}
let tts = self.tts.as_ref().unwrap();
let tts = self.ensure_loaded(model_path, voices_path).await?;
let voice = select_voice(voice_name);
eprintln!("[tts] synthesizing voice '{voice_name}': \"{text}\"");
let (samples, duration) = tts.synth(text, voice)
eprintln!("[tts] synthesizing WAV voice '{voice_name}' to {output_path}");
let (samples, duration) = tts
.synth(text, voice)
.await
.map_err(|e| format!("synthesis failed: {e:?}"))?;
eprintln!("[tts] synthesized {} samples in {duration:?}", samples.len());
eprintln!(
"[tts] synthesized {} samples in {duration:?}",
samples.len()
);
// Write 24kHz 16-bit mono WAV.
let spec = hound::WavSpec {
@@ -111,9 +200,13 @@ impl TtsEngine {
.map_err(|e| format!("WAV create failed: {e}"))?;
for &sample in &samples {
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
writer.write_sample(s16).map_err(|e| format!("WAV write: {e}"))?;
writer
.write_sample(s16)
.map_err(|e| format!("WAV write: {e}"))?;
}
writer.finalize().map_err(|e| format!("WAV finalize: {e}"))?;
writer
.finalize()
.map_err(|e| format!("WAV finalize: {e}"))?;
// Write sidecar for future cache checks.
if cacheable {
@@ -152,6 +245,106 @@ impl TtsEngine {
}
}
async fn synth_text_to_mix_frames(
tts: &Arc<KokoroTts>,
text: &str,
voice: Voice,
) -> Result<Vec<Vec<f32>>, String> {
let (samples, duration) = tts
.synth(text, voice)
.await
.map_err(|e| format!("synthesis failed: {e:?}"))?;
eprintln!(
"[tts] synthesized chunk ({} chars, {} samples) in {duration:?}",
text.chars().count(),
samples.len()
);
pcm_to_mix_frames(&samples, TTS_OUTPUT_RATE)
}
async fn stream_live_prompt_chunks(
tts: Arc<KokoroTts>,
voice: Voice,
chunks: Vec<String>,
stream_tx: mpsc::Sender<TtsStreamMessage>,
mut cancel_rx: watch::Receiver<bool>,
) {
for chunk in chunks {
if *cancel_rx.borrow() {
break;
}
match synth_text_to_mix_frames(&tts, &chunk, voice).await {
Ok(frames) => {
if *cancel_rx.borrow() {
break;
}
if stream_tx.send(TtsStreamMessage::Frames(frames)).await.is_err() {
return;
}
}
Err(error) => {
let _ = stream_tx.send(TtsStreamMessage::Failed(error)).await;
return;
}
}
if cancel_rx.has_changed().unwrap_or(false) && *cancel_rx.borrow_and_update() {
break;
}
}
let _ = stream_tx.send(TtsStreamMessage::Finished).await;
}
fn chunk_text(text: &str) -> Vec<String> {
let mut chunks = Vec::new();
let mut current = String::new();
for ch in text.chars() {
current.push(ch);
let len = current.chars().count();
let hard_split = len >= MAX_CHUNK_CHARS && (ch.is_whitespace() || is_soft_boundary(ch));
let natural_split = len >= MIN_CHUNK_CHARS && is_sentence_boundary(ch);
if natural_split || hard_split {
push_chunk(&mut chunks, &mut current);
}
}
push_chunk(&mut chunks, &mut current);
if chunks.len() >= 2 {
let last_len = chunks.last().unwrap().chars().count();
if last_len < (MIN_CHUNK_CHARS / 2) {
let tail = chunks.pop().unwrap();
if let Some(prev) = chunks.last_mut() {
prev.push(' ');
prev.push_str(tail.trim());
}
}
}
chunks
}
fn push_chunk(chunks: &mut Vec<String>, current: &mut String) {
let trimmed = current.trim();
if !trimmed.is_empty() {
chunks.push(trimmed.to_string());
}
current.clear();
}
fn is_sentence_boundary(ch: char) -> bool {
matches!(ch, '.' | '!' | '?' | '\n' | ';' | ':')
}
fn is_soft_boundary(ch: char) -> bool {
matches!(ch, ',' | ';' | ':' | ')' | ']' | '\n')
}
/// Map voice name string to Kokoro Voice enum variant.
fn select_voice(name: &str) -> Voice {
match name {

View File

@@ -128,8 +128,8 @@ async fn record_from_socket(
break; // Max duration reached.
}
}
Ok(Err(_)) => break, // Socket error (closed).
Err(_) => break, // Timeout (max duration + grace).
Ok(Err(_)) => break, // Socket error (closed).
Err(_) => break, // Timeout (max duration + grace).
}
}

View File

@@ -58,9 +58,7 @@ impl WebRtcEngine {
.register_default_codecs()
.map_err(|e| format!("register codecs: {e}"))?;
let api = APIBuilder::new()
.with_media_engine(media_engine)
.build();
let api = APIBuilder::new().with_media_engine(media_engine).build();
let config = RTCConfiguration {
ice_servers: vec![],
@@ -91,8 +89,7 @@ impl WebRtcEngine {
.map_err(|e| format!("add track: {e}"))?;
// Shared mixer channel sender (populated when linked to a call).
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> =
Arc::new(Mutex::new(None));
let mixer_tx: Arc<Mutex<Option<mpsc::Sender<RtpPacket>>>> = Arc::new(Mutex::new(None));
// ICE candidate handler.
let out_tx_ice = self.out_tx.clone();
@@ -256,7 +253,11 @@ impl WebRtcEngine {
pub async fn close_session(&mut self, session_id: &str) -> Result<(), String> {
if let Some(session) = self.sessions.remove(session_id) {
session.pc.close().await.map_err(|e| format!("close: {e}"))?;
session
.pc
.close()
.await
.map_err(|e| format!("close: {e}"))?;
}
Ok(())
}

View File

@@ -51,9 +51,7 @@ impl SipDialog {
.map(|s| s.to_string())
.unwrap_or_else(generate_tag),
remote_tag: None,
local_uri: SipMessage::extract_uri(from)
.unwrap_or("")
.to_string(),
local_uri: SipMessage::extract_uri(from).unwrap_or("").to_string(),
remote_uri: SipMessage::extract_uri(to).unwrap_or("").to_string(),
local_cseq,
remote_cseq: 0,
@@ -181,10 +179,7 @@ impl SipDialog {
format!("<{}>{remote_tag_str}", self.remote_uri),
),
("Call-ID".to_string(), self.call_id.clone()),
(
"CSeq".to_string(),
format!("{} {method}", self.local_cseq),
),
("CSeq".to_string(), format!("{} {method}", self.local_cseq)),
("Max-Forwards".to_string(), "70".to_string()),
];
@@ -243,10 +238,7 @@ impl SipDialog {
format!("<{}>{remote_tag_str}", self.remote_uri),
),
("Call-ID".to_string(), self.call_id.clone()),
(
"CSeq".to_string(),
format!("{} ACK", self.local_cseq),
),
("CSeq".to_string(), format!("{} ACK", self.local_cseq)),
("Max-Forwards".to_string(), "70".to_string()),
];
@@ -271,10 +263,7 @@ impl SipDialog {
("From".to_string(), from),
("To".to_string(), to),
("Call-ID".to_string(), self.call_id.clone()),
(
"CSeq".to_string(),
format!("{} CANCEL", self.local_cseq),
),
("CSeq".to_string(), format!("{} CANCEL", self.local_cseq)),
("Max-Forwards".to_string(), "70".to_string()),
("Content-Length".to_string(), "0".to_string()),
];
@@ -284,11 +273,7 @@ impl SipDialog {
.unwrap_or(&self.remote_target)
.to_string();
SipMessage::new(
format!("CANCEL {ruri} SIP/2.0"),
headers,
String::new(),
)
SipMessage::new(format!("CANCEL {ruri} SIP/2.0"), headers, String::new())
}
/// Transition the dialog to terminated state.

View File

@@ -27,7 +27,9 @@ pub fn generate_branch() -> String {
fn random_hex(bytes: usize) -> String {
let mut rng = rand::thread_rng();
(0..bytes).map(|_| format!("{:02x}", rng.gen::<u8>())).collect()
(0..bytes)
.map(|_| format!("{:02x}", rng.gen::<u8>()))
.collect()
}
// ---- Codec registry --------------------------------------------------------
@@ -142,7 +144,9 @@ pub fn parse_digest_challenge(header: &str) -> Option<DigestChallenge> {
return Some(after[1..1 + end].to_string());
}
// Unquoted value.
let end = after.find(|c: char| c == ',' || c.is_whitespace()).unwrap_or(after.len());
let end = after
.find(|c: char| c == ',' || c.is_whitespace())
.unwrap_or(after.len());
return Some(after[..end].to_string());
}
None
@@ -241,11 +245,7 @@ pub struct MwiResult {
pub extra_headers: Vec<(String, String)>,
}
pub fn build_mwi_body(
new_messages: u32,
old_messages: u32,
account_uri: &str,
) -> MwiResult {
pub fn build_mwi_body(new_messages: u32, old_messages: u32, account_uri: &str) -> MwiResult {
let waiting = if new_messages > 0 { "yes" } else { "no" };
let body = format!(
"Messages-Waiting: {waiting}\r\n\

View File

@@ -4,9 +4,9 @@
//! SDP handling, Digest authentication, and URI rewriting.
//! Ported from the TypeScript `ts/sip/` library.
pub mod message;
pub mod dialog;
pub mod helpers;
pub mod message;
pub mod rewrite;
/// Network endpoint (address + port + optional negotiated codec).

View File

@@ -14,7 +14,11 @@ pub struct SipMessage {
impl SipMessage {
pub fn new(start_line: String, headers: Vec<(String, String)>, body: String) -> Self {
Self { start_line, headers, body }
Self {
start_line,
headers,
body,
}
}
// ---- Parsing -----------------------------------------------------------
@@ -175,7 +179,8 @@ impl SipMessage {
/// Inserts a header at the top of the header list.
pub fn prepend_header(&mut self, name: &str, value: &str) -> &mut Self {
self.headers.insert(0, (name.to_string(), value.to_string()));
self.headers
.insert(0, (name.to_string(), value.to_string()));
self
}
@@ -233,10 +238,7 @@ impl SipMessage {
.to_display_name
.map(|d| format!("\"{d}\" "))
.unwrap_or_default();
let to_tag_str = opts
.to_tag
.map(|t| format!(";tag={t}"))
.unwrap_or_default();
let to_tag_str = opts.to_tag.map(|t| format!(";tag={t}")).unwrap_or_default();
let mut headers = vec![
(
@@ -364,7 +366,43 @@ impl SipMessage {
.find(|c: char| c == ';' || c == '>')
.unwrap_or(trimmed.len());
let result = &trimmed[..end];
if result.is_empty() { None } else { Some(result) }
if result.is_empty() {
None
} else {
Some(result)
}
}
}
/// Extract the user part from a SIP/TEL URI or header value.
pub fn extract_uri_user(uri_or_header_value: &str) -> Option<&str> {
let raw = Self::extract_uri(uri_or_header_value).unwrap_or(uri_or_header_value);
let raw = raw.trim();
if raw.is_empty() {
return None;
}
let user_part = if raw
.get(..5)
.is_some_and(|prefix| prefix.eq_ignore_ascii_case("sips:"))
{
&raw[5..]
} else if raw.get(..4).is_some_and(|prefix| {
prefix.eq_ignore_ascii_case("sip:") || prefix.eq_ignore_ascii_case("tel:")
}) {
&raw[4..]
} else {
raw
};
let end = user_part
.find(|c: char| matches!(c, '@' | ';' | '?' | '>'))
.unwrap_or(user_part.len());
let result = &user_part[..end];
if result.is_empty() {
None
} else {
Some(result)
}
}
}
@@ -506,6 +544,19 @@ mod tests {
SipMessage::extract_uri("\"Name\" <sip:user@host>;tag=abc"),
Some("sip:user@host")
);
assert_eq!(
SipMessage::extract_uri_user("\"Name\" <sip:+49 421 219694@host>;tag=abc"),
Some("+49 421 219694")
);
assert_eq!(
SipMessage::extract_uri_user("sip:0049421219694@voip.easybell.de"),
Some("0049421219694")
);
assert_eq!(
SipMessage::extract_uri_user("tel:+49421219694;phone-context=example.com"),
Some("+49421219694")
);
assert_eq!(SipMessage::extract_uri_user("SIP:user@host"), Some("user"));
}
#[test]
@@ -535,7 +586,10 @@ mod tests {
);
assert_eq!(invite.method(), Some("INVITE"));
assert_eq!(invite.call_id(), "test-123");
assert!(invite.get_header("Via").unwrap().contains("192.168.1.1:5070"));
assert!(invite
.get_header("Via")
.unwrap()
.contains("192.168.1.1:5070"));
let response = SipMessage::create_response(
200,

View File

@@ -92,7 +92,11 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option<Endpoint>
.collect();
let original = match (orig_addr, orig_port) {
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }),
(Some(a), Some(p)) => Some(Endpoint {
address: a,
port: p,
codec_pt: None,
}),
_ => None,
};

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.23.0',
version: '1.25.0',
description: 'undefined'
}

View File

@@ -48,6 +48,24 @@ export interface IDeviceConfig {
extension: string;
}
export type TIncomingNumberMode = 'single' | 'range' | 'regex';
export interface IIncomingNumberConfig {
id: string;
label: string;
providerId?: string;
mode: TIncomingNumberMode;
countryCode?: string;
areaCode?: string;
localNumber?: string;
rangeEnd?: string;
pattern?: string;
// Legacy persisted fields kept for migration compatibility.
number?: string;
rangeStart?: string;
}
// ---------------------------------------------------------------------------
// Match/Action routing model
// ---------------------------------------------------------------------------
@@ -62,8 +80,11 @@ export interface ISipRouteMatch {
direction: 'inbound' | 'outbound';
/**
* Match the dialed/called number (To/Request-URI for inbound DID, dialed digits for outbound).
* Supports: exact string, prefix with trailing '*' (e.g. "+4930*"), or regex ("/^\\+49/").
* Match the normalized called number.
*
* Inbound: matches the provider-delivered DID / Request-URI user part.
* Outbound: matches the normalized dialed digits.
* Supports: exact string, numeric range `start..end`, prefix with trailing '*' (e.g. "+4930*"), or regex ("/^\\+49/").
*/
numberPattern?: string;
@@ -89,13 +110,13 @@ export interface ISipRouteAction {
// --- Inbound actions (IVR / voicemail) ---
/** Route directly to a voicemail box (skip ringing devices). */
/** Voicemail fallback for matched inbound routes. */
voicemailBox?: string;
/** Route to an IVR menu by menu ID (skip ringing devices). */
ivrMenuId?: string;
/** Override no-answer timeout (seconds) before routing to voicemail. */
/** Reserved for future no-answer handling. */
noAnswerTimeout?: number;
// --- Outbound actions (provider selection) ---
@@ -231,6 +252,7 @@ export interface IAppConfig {
proxy: IProxyConfig;
providers: IProviderConfig[];
devices: IDeviceConfig[];
incomingNumbers?: IIncomingNumberConfig[];
routing: IRoutingConfig;
contacts: IContact[];
voiceboxes?: IVoiceboxConfig[];
@@ -285,6 +307,14 @@ export function loadConfig(): IAppConfig {
d.extension ??= '100';
}
cfg.incomingNumbers ??= [];
for (const incoming of cfg.incomingNumbers) {
if (!incoming.id) incoming.id = `incoming-${Date.now()}`;
incoming.label ??= incoming.id;
incoming.mode ??= incoming.pattern ? 'regex' : incoming.rangeStart || incoming.rangeEnd ? 'range' : 'single';
incoming.countryCode ??= incoming.mode === 'regex' ? undefined : '+49';
}
cfg.routing ??= { routes: [] };
cfg.routing.routes ??= [];

View File

@@ -266,6 +266,7 @@ async function handleRequest(
if (existing && ud.displayName !== undefined) existing.displayName = ud.displayName;
}
}
if (updates.incomingNumbers !== undefined) cfg.incomingNumbers = updates.incomingNumbers;
if (updates.routing) {
if (updates.routing.routes) {
cfg.routing.routes = updates.routing.routes;

View File

@@ -82,6 +82,19 @@ type TProxyCommands = {
};
result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string };
};
start_tts_interaction: {
params: {
call_id: string;
leg_id: string;
text: string;
voice?: string;
model?: string;
voices?: string;
expected_digits: string;
timeout_ms: number;
};
result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string };
};
add_tool_leg: {
params: {
call_id: string;
@@ -446,6 +459,40 @@ export async function startInteraction(
}
}
/**
* Start a live TTS interaction on a specific leg. The first chunk is rendered
* up front and the rest streams into the mixer while playback is already live.
*/
export async function startTtsInteraction(
callId: string,
legId: string,
text: string,
expectedDigits: string,
timeoutMs: number,
options?: {
voice?: string;
model?: string;
voices?: string;
},
): Promise<{ result: 'digit' | 'timeout' | 'cancelled'; digit?: string } | null> {
if (!bridge || !initialized) return null;
try {
return await sendProxyCommand('start_tts_interaction', {
call_id: callId,
leg_id: legId,
text,
expected_digits: expectedDigits,
timeout_ms: timeoutMs,
voice: options?.voice,
model: options?.model,
voices: options?.voices,
});
} catch (error: unknown) {
logFn?.(`[proxy-engine] start_tts_interaction error: ${errorMessage(error)}`);
return null;
}
}
/**
* Add a tool leg (recording or transcription) to a call.
* Tool legs receive per-source unmerged audio from all participants.

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.23.0',
version: '1.25.0',
description: 'undefined'
}

File diff suppressed because it is too large Load Diff