4 Commits

Author SHA1 Message Date
0d82a626b5 v1.25.1
Some checks failed
Docker (tags) / release (push) Failing after 3s
2026-04-14 18:58:48 +00:00
30d056f376 fix(proxy-engine): respect explicit inbound route targets and store voicemail in the configured mailbox 2026-04-14 18:58:48 +00:00
89ae12318e v1.25.0
Some checks failed
Docker (tags) / release (push) Failing after 3s
2026-04-14 18:52:13 +00:00
feb3514de4 feat(proxy-engine): add live TTS streaming interactions and incoming number range support 2026-04-14 18:52:13 +00:00
17 changed files with 1569 additions and 372 deletions

View File

@@ -1,5 +1,21 @@
# Changelog
## 2026-04-14 - 1.25.1 - fix(proxy-engine)
respect explicit inbound route targets and store voicemail in the configured mailbox
- Prevent inbound routes with an explicit empty target list from ringing arbitrary registered devices by distinguishing omitted targets from empty targets.
- Route unrouted or no-target inbound calls to voicemail with a generated unrouted greeting instead of falling back to random devices.
- Pass voicemail box identifiers through proxy events and runtime handling so recordings are saved and indexed under the correct mailbox instead of always using default.
## 2026-04-14 - 1.25.0 - feat(proxy-engine)
add live TTS streaming interactions and incoming number range support
- add a new start_tts_interaction command and bridge API to begin IVR or leg interactions before full TTS rendering completes
- stream synthesized TTS chunks into the mixer with cancellation handling so prompts can stop cleanly on digit match, leg removal, or shutdown
- extract PCM-to-mixer frame conversion for reusable live prompt processing
- extend routing pattern matching to support numeric number ranges like start..end, including + prefixed values
- add incomingNumbers config typing and frontend config update support for single, range, and regex number modes
## 2026-04-14 - 1.24.0 - feat(routing)
require explicit inbound DID routes and normalize SIP identities for provider-based number matching

View File

@@ -1,6 +1,6 @@
{
"name": "siprouter",
"version": "1.24.0",
"version": "1.25.1",
"private": true,
"type": "module",
"scripts": {

View File

@@ -208,14 +208,23 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
return Ok(vec![]);
}
pcm_to_mix_frames(&samples, wav_rate)
}
/// Convert PCM samples at an arbitrary rate into 48kHz 20ms mixer frames.
pub fn pcm_to_mix_frames(samples: &[f32], sample_rate: u32) -> Result<Vec<Vec<f32>>, String> {
if samples.is_empty() {
return Ok(vec![]);
}
// Resample to MIX_RATE (48kHz) if needed.
let resampled = if wav_rate != MIX_RATE {
let resampled = if sample_rate != MIX_RATE {
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
transcoder
.resample_f32(&samples, wav_rate, MIX_RATE)
.resample_f32(samples, sample_rate, MIX_RATE)
.map_err(|e| format!("resample: {e}"))?
} else {
samples
samples.to_vec()
};
// Split into MIX_FRAME_SIZE (960) sample frames.

View File

@@ -863,13 +863,48 @@ impl CallManager {
}
}
// Pick the first registered device from the matched targets, or fall
// back to any-registered-device if the route has no resolved targets.
let device_addr = route
.device_ids
.iter()
.find_map(|id| registrar.get_device_contact(id))
.or_else(|| self.resolve_first_device(config, registrar));
// Explicit no-target inbound routes do not fall back to random devices.
// They either go to the configured voicemail box or play the unrouted
// greeting via the default voicemail flow.
if !route.ring_all_devices && route.device_ids.is_empty() && !route.ring_browsers {
let greeting_wav = if route.voicemail_box.is_some() {
resolve_greeting_wav(config, route.voicemail_box.as_deref(), &tts_engine).await
} else {
resolve_unrouted_greeting_wav(&tts_engine).await
};
let call_id = self
.route_to_voicemail(
&call_id,
invite,
from_addr,
&caller_number,
provider_id,
provider_config,
config,
rtp_pool,
socket,
public_ip,
route.voicemail_box.clone(),
greeting_wav,
)
.await?;
return Some(InboundCallCreated {
call_id,
ring_browsers,
});
}
// Device targeting is explicit: omitted targets ring any registered
// device, an empty target list rings nobody, and a populated list rings
// only those device IDs.
let device_addr = if route.ring_all_devices {
self.resolve_first_device(config, registrar)
} else {
route
.device_ids
.iter()
.find_map(|id| registrar.get_device_contact(id))
};
let device_addr = match device_addr {
Some(addr) => addr,
@@ -890,6 +925,7 @@ impl CallManager {
rtp_pool,
socket,
public_ip,
route.voicemail_box.clone(),
greeting_wav,
)
.await?;
@@ -1715,6 +1751,7 @@ impl CallManager {
rtp_pool: &mut RtpPortPool,
socket: &UdpSocket,
public_ip: Option<&str>,
voicebox_id: Option<String>,
greeting_wav: Option<String>,
) -> Option<String> {
let lan_ip = &config.proxy.lan_ip;
@@ -1810,17 +1847,22 @@ impl CallManager {
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis();
let recording_dir = "nogit/voicemail/default".to_string();
let recording_dir = format!(
".nogit/voicemail/{}",
voicebox_id.as_deref().unwrap_or("default")
);
let recording_path = format!("{recording_dir}/msg-{timestamp}.wav");
let out_tx = self.out_tx.clone();
let call_id_owned = call_id.to_string();
let caller_owned = caller_number.to_string();
let voicebox_id_owned = voicebox_id.clone();
let rtp_socket = rtp_alloc.socket;
tokio::spawn(async move {
crate::voicemail::run_voicemail_session(
rtp_socket,
provider_media,
codec_pt,
voicebox_id_owned,
greeting_wav,
recording_path,
120_000,
@@ -1942,24 +1984,23 @@ impl CallManager {
}
}
// Generate IVR prompt on-demand via TTS (cached).
// Generate the IVR prompt as a live chunked TTS stream so playback can
// start after the first chunk instead of waiting for a full WAV render.
let voice = menu.prompt_voice.as_deref().unwrap_or("af_bella");
let prompt_output = format!(".nogit/tts/ivr-menu-{}.wav", menu.id);
let prompt_params = serde_json::json!({
"model": ".nogit/tts/kokoro-v1.0.onnx",
"voices": ".nogit/tts/voices.bin",
"voice": voice,
"text": &menu.prompt_text,
"output": &prompt_output,
"cacheable": true,
});
let prompt_wav = {
let live_prompt = {
let mut tts = tts_engine.lock().await;
match tts.generate(&prompt_params).await {
Ok(_) => Some(prompt_output),
match tts
.start_live_prompt(crate::tts::TtsPromptRequest {
model_path: crate::tts::DEFAULT_MODEL_PATH.to_string(),
voices_path: crate::tts::DEFAULT_VOICES_PATH.to_string(),
voice_name: voice.to_string(),
text: menu.prompt_text.clone(),
})
.await
{
Ok(prompt) => Some(prompt),
Err(e) => {
eprintln!("[ivr] TTS generation failed: {e}");
eprintln!("[ivr] live TTS setup failed: {e}");
None
}
}
@@ -1976,17 +2017,14 @@ impl CallManager {
let timeout_ms = menu.timeout_sec.unwrap_or(5) * 1000;
tokio::spawn(async move {
// Load prompt PCM frames if available.
let prompt_frames = prompt_wav
.as_ref()
.and_then(|wav| crate::audio_player::load_prompt_pcm_frames(wav).ok());
if let Some(frames) = prompt_frames {
if let Some(prompt) = live_prompt {
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
let _ = mixer_cmd_tx
.send(crate::mixer::MixerCommand::StartInteraction {
leg_id: provider_leg_id.clone(),
prompt_pcm_frames: frames,
prompt_pcm_frames: prompt.initial_frames,
prompt_stream_rx: Some(prompt.stream_rx),
prompt_cancel_tx: Some(prompt.cancel_tx),
expected_digits: expected_digits.clone(),
timeout_ms,
result_tx,
@@ -2113,3 +2151,22 @@ async fn resolve_greeting_wav(
}
None
}
/// Render (and cache, via the engine's `cacheable` flag) the generic
/// "number not routed" voicemail greeting, returning the WAV path on success
/// or `None` when TTS generation fails.
async fn resolve_unrouted_greeting_wav(tts_engine: &Arc<Mutex<TtsEngine>>) -> Option<String> {
    let output = ".nogit/tts/unrouted-number.wav";
    let params = serde_json::json!({
        "model": crate::tts::DEFAULT_MODEL_PATH,
        "voices": crate::tts::DEFAULT_VOICES_PATH,
        "voice": "af_bella",
        "text": "This number is currently not being routed by siprouter.",
        "output": output,
        "cacheable": true,
    });
    // Hold the engine lock only for the duration of the generate call.
    let generated = tts_engine.lock().await.generate(&params).await;
    generated.ok().map(|_| output.to_string())
}

View File

@@ -273,8 +273,56 @@ pub fn normalize_routing_identity(value: &str) -> String {
digits
}
/// Parse one endpoint (or candidate value) of a numeric range pattern.
///
/// Trims whitespace, strips an optional leading `+`, and requires the rest to
/// be one or more ASCII digits. Returns `(had_plus_prefix, digit_str)` on
/// success, `None` otherwise.
fn parse_numeric_range_value(value: &str) -> Option<(bool, &str)> {
    let trimmed = value.trim();
    let (has_plus, digits) = match trimmed.strip_prefix('+') {
        Some(rest) => (true, rest),
        None => (false, trimmed),
    };
    // Reject empty digit runs (covers "" and a bare "+") and any non-digit.
    if digits.is_empty() || digits.bytes().any(|b| !b.is_ascii_digit()) {
        return None;
    }
    Some((has_plus, digits))
}
fn matches_numeric_range_pattern(pattern: &str, value: &str) -> bool {
let Some((start, end)) = pattern.split_once("..") else {
return false;
};
let Some((start_plus, start_digits)) = parse_numeric_range_value(start) else {
return false;
};
let Some((end_plus, end_digits)) = parse_numeric_range_value(end) else {
return false;
};
let Some((value_plus, value_digits)) = parse_numeric_range_value(value) else {
return false;
};
if start_plus != end_plus || value_plus != start_plus {
return false;
}
if start_digits.len() != end_digits.len() || value_digits.len() != start_digits.len() {
return false;
}
if start_digits > end_digits {
return false;
}
value_digits >= start_digits && value_digits <= end_digits
}
/// Test a value against a pattern string.
/// - None/empty: matches everything (wildcard)
/// - `start..end`: numeric range match
/// - Trailing '*': prefix match
/// - Starts with '/': regex match
/// - Otherwise: exact match
@@ -290,6 +338,10 @@ pub fn matches_pattern(pattern: Option<&str>, value: &str) -> bool {
return value.starts_with(&pattern[..pattern.len() - 1]);
}
if matches_numeric_range_pattern(pattern, value) {
return true;
}
// Regex match: "/^\\+49/" or "/pattern/i"
if pattern.starts_with('/') {
if let Some(last_slash) = pattern[1..].rfind('/') {
@@ -321,12 +373,14 @@ pub struct OutboundRouteResult {
/// Result of resolving an inbound route.
//
// `device_ids` and `ring_browsers` are consumed by create_inbound_call.
// `device_ids`, `ring_all_devices`, and `ring_browsers` are consumed by
// create_inbound_call.
// The remaining fields (voicemail_box, ivr_menu_id, no_answer_timeout)
// are resolved but not yet acted upon — see the multi-target TODO.
#[allow(dead_code)]
pub struct InboundRouteResult {
pub device_ids: Vec<String>,
pub ring_all_devices: bool,
pub ring_browsers: bool,
pub voicemail_box: Option<String>,
pub ivr_menu_id: Option<String>,
@@ -433,8 +487,10 @@ impl AppConfig {
continue;
}
let explicit_targets = route.action.targets.clone();
return Some(InboundRouteResult {
device_ids: route.action.targets.clone().unwrap_or_default(),
device_ids: explicit_targets.clone().unwrap_or_default(),
ring_all_devices: explicit_targets.is_none(),
ring_browsers: route.action.ring_browsers.unwrap_or(false),
voicemail_box: route.action.voicemail_box.clone(),
ivr_menu_id: route.action.ivr_menu_id.clone(),
@@ -579,4 +635,24 @@ mod tests {
assert_eq!(support.no_answer_timeout, Some(20));
assert!(!support.ring_browsers);
}
#[test]
// Numeric range patterns: inclusive bounds, same digit width, and the `+`
// prefix must match between pattern and value.
fn matches_pattern_supports_numeric_ranges() {
    // Value inside the range matches.
    assert!(matches_pattern(
        Some("042116767546..042116767548"),
        "042116767547"
    ));
    // Value just past the upper bound does not.
    assert!(!matches_pattern(
        Some("042116767546..042116767548"),
        "042116767549"
    ));
    // `+`-prefixed ranges match `+`-prefixed values.
    assert!(matches_pattern(
        Some("+4942116767546..+4942116767548"),
        "+4942116767547"
    ));
    // A non-`+` value must not match a `+` range, even if digits would fit.
    assert!(!matches_pattern(
        Some("+4942116767546..+4942116767548"),
        "042116767547"
    ));
}
}

View File

@@ -152,6 +152,7 @@ async fn handle_command(
"replace_leg" => handle_replace_leg(engine, out_tx, &cmd).await,
// Leg interaction and tool leg commands.
"start_interaction" => handle_start_interaction(engine, out_tx, &cmd).await,
"start_tts_interaction" => handle_start_tts_interaction(engine, out_tx, &cmd).await,
"add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await,
"remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await,
"set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await,
@@ -1138,6 +1139,79 @@ async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, c
// Leg interaction & tool leg commands
// ---------------------------------------------------------------------------
/// Shared driver for `start_interaction` / `start_tts_interaction`.
///
/// Forwards a `StartInteraction` command to the call's mixer, blocks on the
/// oneshot result (digit / timeout / cancelled), records the outcome in the
/// leg's metadata, and replies on the IPC channel. The engine lock is held
/// only while dispatching to the mixer and while writing metadata — never
/// while waiting for the interaction to complete.
async fn run_interaction_command(
    engine: Arc<Mutex<ProxyEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
    call_id: String,
    leg_id: String,
    prompt_pcm_frames: Vec<Vec<f32>>,
    prompt_stream_rx: Option<tokio::sync::mpsc::Receiver<tts::TtsStreamMessage>>,
    prompt_cancel_tx: Option<tokio::sync::watch::Sender<bool>>,
    expected_digits: Vec<char>,
    timeout_ms: u32,
) {
    let (result_tx, result_rx) = tokio::sync::oneshot::channel();
    // Scoped so the engine lock is released before we await the result.
    {
        let eng = engine.lock().await;
        let call = match eng.call_mgr.calls.get(&call_id) {
            Some(c) => c,
            None => {
                respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
                return;
            }
        };
        let _ = call
            .mixer_cmd_tx
            .send(crate::mixer::MixerCommand::StartInteraction {
                leg_id: leg_id.clone(),
                prompt_pcm_frames,
                prompt_stream_rx,
                prompt_cancel_tx,
                expected_digits: expected_digits.clone(),
                timeout_ms,
                result_tx,
            })
            .await;
    }
    // 30s safety margin on top of the interaction timeout so a wedged mixer
    // can never strand this task forever.
    let safety_timeout = tokio::time::Duration::from_millis(timeout_ms as u64 + 30000);
    let result = match tokio::time::timeout(safety_timeout, result_rx).await {
        Ok(Ok(r)) => r,
        // Sender dropped (mixer gone) — treat as a cancelled interaction.
        Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled,
        // Safety timeout elapsed without any answer from the mixer.
        Err(_) => crate::mixer::InteractionResult::Timeout,
    };
    let (result_str, digit_str) = match &result {
        crate::mixer::InteractionResult::Digit(d) => ("digit", Some(d.to_string())),
        crate::mixer::InteractionResult::Timeout => ("timeout", None),
        crate::mixer::InteractionResult::Cancelled => ("cancelled", None),
    };
    // Persist the outcome on the leg so later commands can inspect it.
    {
        let mut eng = engine.lock().await;
        if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
            if let Some(leg) = call.legs.get_mut(&leg_id) {
                leg.metadata.insert(
                    "last_interaction_result".to_string(),
                    serde_json::json!(result_str),
                );
                if let Some(ref d) = digit_str {
                    leg.metadata
                        .insert("last_interaction_digit".to_string(), serde_json::json!(d));
                }
            }
        }
    }
    let mut resp = serde_json::json!({ "result": result_str });
    if let Some(d) = digit_str {
        resp["digit"] = serde_json::json!(d);
    }
    respond_ok(out_tx, &cmd.id, resp);
}
/// Handle `start_interaction` — isolate a leg, play a prompt, collect DTMF.
/// This command blocks until the interaction completes (digit, timeout, or cancel).
async fn handle_start_interaction(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
@@ -1175,7 +1249,6 @@ async fn handle_start_interaction(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutT
.and_then(|v| v.as_u64())
.unwrap_or(15000) as u32;
// Load prompt audio from WAV file.
let prompt_frames = match crate::audio_player::load_prompt_pcm_frames(&prompt_wav) {
Ok(f) => f,
Err(e) => {
@@ -1184,67 +1257,113 @@ async fn handle_start_interaction(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutT
}
};
// Create oneshot channel for the result.
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
run_interaction_command(
engine,
out_tx,
cmd,
call_id,
leg_id,
prompt_frames,
None,
None,
expected_digits,
timeout_ms,
)
.await;
}
// Send StartInteraction to the mixer.
{
let eng = engine.lock().await;
let call = match eng.call_mgr.calls.get(&call_id) {
Some(c) => c,
None => {
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
/// Handle `start_tts_interaction` — isolate a leg, stream chunked TTS, and
/// start playback before the full prompt has rendered.
async fn handle_start_tts_interaction(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => {
respond_err(out_tx, &cmd.id, "missing leg_id");
return;
}
};
let text = match cmd.params.get("text").and_then(|v| v.as_str()) {
Some(s) if !s.trim().is_empty() => s.to_string(),
_ => {
respond_err(out_tx, &cmd.id, "missing text");
return;
}
};
let voice = cmd
.params
.get("voice")
.and_then(|v| v.as_str())
.unwrap_or("af_bella")
.to_string();
let model = cmd
.params
.get("model")
.and_then(|v| v.as_str())
.unwrap_or(tts::DEFAULT_MODEL_PATH)
.to_string();
let voices = cmd
.params
.get("voices")
.and_then(|v| v.as_str())
.unwrap_or(tts::DEFAULT_VOICES_PATH)
.to_string();
let expected_digits: Vec<char> = cmd
.params
.get("expected_digits")
.and_then(|v| v.as_str())
.unwrap_or("12")
.chars()
.collect();
let timeout_ms = cmd
.params
.get("timeout_ms")
.and_then(|v| v.as_u64())
.unwrap_or(15000) as u32;
let tts_engine = engine.lock().await.tts_engine.clone();
let live_prompt = {
let mut tts = tts_engine.lock().await;
match tts
.start_live_prompt(tts::TtsPromptRequest {
model_path: model,
voices_path: voices,
voice_name: voice,
text,
})
.await
{
Ok(prompt) => prompt,
Err(e) => {
respond_err(out_tx, &cmd.id, &e);
return;
}
};
let _ = call
.mixer_cmd_tx
.send(crate::mixer::MixerCommand::StartInteraction {
leg_id: leg_id.clone(),
prompt_pcm_frames: prompt_frames,
expected_digits: expected_digits.clone(),
timeout_ms,
result_tx,
})
.await;
} // engine lock released — we block on the oneshot, not the lock.
// Await the interaction result (blocks this task until complete).
let safety_timeout = tokio::time::Duration::from_millis(timeout_ms as u64 + 30000);
let result = match tokio::time::timeout(safety_timeout, result_rx).await {
Ok(Ok(r)) => r,
Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled, // oneshot dropped
Err(_) => crate::mixer::InteractionResult::Timeout, // safety timeout
};
// Store consent result in leg metadata.
let (result_str, digit_str) = match &result {
crate::mixer::InteractionResult::Digit(d) => ("digit", Some(d.to_string())),
crate::mixer::InteractionResult::Timeout => ("timeout", None),
crate::mixer::InteractionResult::Cancelled => ("cancelled", None),
};
{
let mut eng = engine.lock().await;
if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
if let Some(leg) = call.legs.get_mut(&leg_id) {
leg.metadata.insert(
"last_interaction_result".to_string(),
serde_json::json!(result_str),
);
if let Some(ref d) = digit_str {
leg.metadata
.insert("last_interaction_digit".to_string(), serde_json::json!(d));
}
}
}
}
};
let mut resp = serde_json::json!({ "result": result_str });
if let Some(d) = digit_str {
resp["digit"] = serde_json::json!(d);
}
respond_ok(out_tx, &cmd.id, resp);
run_interaction_command(
engine,
out_tx,
cmd,
call_id,
leg_id,
live_prompt.initial_frames,
Some(live_prompt.stream_rx),
Some(live_prompt.cancel_tx),
expected_digits,
timeout_ms,
)
.await;
}
/// Handle `add_tool_leg` — add a recording or transcription tool leg to a call.

View File

@@ -18,10 +18,11 @@
use crate::ipc::{emit_event, OutTx};
use crate::jitter_buffer::{JitterBuffer, JitterResult};
use crate::rtp::{build_rtp_header, rtp_clock_increment, rtp_clock_rate};
use crate::tts::TtsStreamMessage;
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
use nnnoiseless::DenoiseState;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
@@ -64,6 +65,12 @@ enum LegRole {
struct IsolationState {
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
prompt_frames: VecDeque<Vec<f32>>,
/// Live TTS frames arrive here while playback is already in progress.
prompt_stream_rx: Option<mpsc::Receiver<TtsStreamMessage>>,
/// Cancels the background TTS producer when the interaction ends early.
prompt_cancel_tx: Option<watch::Sender<bool>>,
/// Whether the live prompt stream has ended.
prompt_stream_finished: bool,
/// Digits that complete the interaction (e.g., ['1', '2']).
expected_digits: Vec<char>,
/// Ticks remaining before timeout (decremented each tick after prompt ends).
@@ -140,6 +147,10 @@ pub enum MixerCommand {
leg_id: String,
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
prompt_pcm_frames: Vec<Vec<f32>>,
/// Optional live prompt stream. Frames are appended as they are synthesized.
prompt_stream_rx: Option<mpsc::Receiver<TtsStreamMessage>>,
/// Optional cancellation handle for the live prompt stream.
prompt_cancel_tx: Option<watch::Sender<bool>>,
expected_digits: Vec<char>,
timeout_ms: u32,
result_tx: oneshot::Sender<InteractionResult>,
@@ -329,9 +340,11 @@ fn fill_leg_playout_buffer(slot: &mut MixerLegSlot) {
match slot.jitter.consume() {
JitterResult::Packet(pkt) => queue_inbound_packet(slot, pkt),
JitterResult::Missing => {
let conceal_ts = slot.estimated_packet_ts.max(rtp_clock_increment(slot.codec_pt));
let conceal_samples = rtp_ts_to_mix_samples(slot.codec_pt, conceal_ts)
.clamp(1, MAX_GAP_FILL_SAMPLES);
let conceal_ts = slot
.estimated_packet_ts
.max(rtp_clock_increment(slot.codec_pt));
let conceal_samples =
rtp_ts_to_mix_samples(slot.codec_pt, conceal_ts).clamp(1, MAX_GAP_FILL_SAMPLES);
append_packet_loss_concealment(slot, conceal_samples);
if let Some(expected_ts) = slot.expected_rtp_timestamp {
slot.expected_rtp_timestamp = Some(expected_ts.wrapping_add(conceal_ts));
@@ -418,6 +431,64 @@ fn try_send_tool_output(
);
}
/// Tell the background TTS producer (if one is attached) to stop synthesizing.
/// Taking the sender makes the cancellation idempotent.
fn cancel_prompt_producer(state: &mut IsolationState) {
    match state.prompt_cancel_tx.take() {
        Some(cancel_tx) => {
            let _ = cancel_tx.send(true);
        }
        None => {}
    }
}
/// Tear down an in-flight interaction: stop the TTS producer and, if a waiter
/// is still listening, report `Cancelled` on the result channel.
fn cancel_isolated_interaction(state: &mut IsolationState) {
    cancel_prompt_producer(state);
    let waiter = state.result_tx.take();
    if let Some(tx) = waiter {
        let _ = tx.send(InteractionResult::Cancelled);
    }
}
/// Pull any already-synthesized TTS frames off the live prompt stream without
/// blocking the mixer tick.
///
/// Loops on `try_recv` so several queued `Frames` batches are absorbed in one
/// call. The receiver is moved out of `state` and re-inserted only while the
/// stream should stay open; on `Finished`, `Failed`, or producer disconnect it
/// is deliberately dropped, which closes the channel so the producer task sees
/// the send failure and exits.
fn drain_prompt_stream(
    out_tx: &OutTx,
    call_id: &str,
    leg_id: &str,
    state: &mut IsolationState,
) {
    loop {
        // No receiver means no live stream (or it already terminated).
        let Some(mut stream_rx) = state.prompt_stream_rx.take() else {
            return;
        };
        match stream_rx.try_recv() {
            Ok(TtsStreamMessage::Frames(frames)) => {
                // Queue the new PCM frames for playback and keep polling.
                state.prompt_frames.extend(frames);
                state.prompt_stream_rx = Some(stream_rx);
            }
            Ok(TtsStreamMessage::Finished) => {
                // Producer completed normally; the playback loop can move to
                // the timeout phase once queued frames drain.
                state.prompt_stream_finished = true;
                return;
            }
            Ok(TtsStreamMessage::Failed(error)) => {
                // Surface the failure as a mixer_error event, but still mark
                // the prompt finished so the interaction can time out.
                emit_event(
                    out_tx,
                    "mixer_error",
                    serde_json::json!({
                        "call_id": call_id,
                        "leg_id": leg_id,
                        "error": format!("tts stream failed: {error}"),
                    }),
                );
                state.prompt_stream_finished = true;
                return;
            }
            Err(mpsc::error::TryRecvError::Empty) => {
                // Nothing ready yet — keep the receiver for the next tick.
                state.prompt_stream_rx = Some(stream_rx);
                return;
            }
            Err(mpsc::error::TryRecvError::Disconnected) => {
                // Producer dropped its sender without a Finished marker.
                state.prompt_stream_finished = true;
                return;
            }
        }
    }
}
/// Spawn the mixer task for a call. Returns the command sender and task handle.
pub fn spawn_mixer(call_id: String, out_tx: OutTx) -> (mpsc::Sender<MixerCommand>, JoinHandle<()>) {
let (cmd_tx, cmd_rx) = mpsc::channel::<MixerCommand>(32);
@@ -489,9 +560,7 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
// If the leg is isolated, send Cancelled before dropping.
if let Some(slot) = legs.get_mut(&leg_id) {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
cancel_isolated_interaction(state);
}
}
legs.remove(&leg_id);
@@ -501,9 +570,7 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
// Cancel all outstanding interactions before shutting down.
for slot in legs.values_mut() {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
cancel_isolated_interaction(state);
}
}
return;
@@ -511,6 +578,8 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
Ok(MixerCommand::StartInteraction {
leg_id,
prompt_pcm_frames,
prompt_stream_rx,
prompt_cancel_tx,
expected_digits,
timeout_ms,
result_tx,
@@ -518,13 +587,14 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
if let Some(slot) = legs.get_mut(&leg_id) {
// Cancel any existing interaction first.
if let LegRole::Isolated(ref mut old_state) = slot.role {
if let Some(tx) = old_state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
cancel_isolated_interaction(old_state);
}
let timeout_ticks = timeout_ms / 20;
slot.role = LegRole::Isolated(IsolationState {
prompt_frames: VecDeque::from(prompt_pcm_frames),
prompt_stream_rx,
prompt_cancel_tx,
prompt_stream_finished: false,
expected_digits,
timeout_ticks_remaining: timeout_ticks,
prompt_done: false,
@@ -532,6 +602,9 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
});
} else {
// Leg not found — immediately cancel.
if let Some(cancel_tx) = prompt_cancel_tx {
let _ = cancel_tx.send(true);
}
let _ = result_tx.send(InteractionResult::Cancelled);
}
}
@@ -667,6 +740,8 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
try_send_leg_output(&out_tx, &call_id, lid, slot, rtp, "participant-audio");
}
LegRole::Isolated(state) => {
drain_prompt_stream(&out_tx, &call_id, lid, state);
// Check for DTMF digit from this leg.
let mut matched_digit: Option<char> = None;
for (src_lid, dtmf_pkt) in &dtmf_forward {
@@ -692,9 +767,12 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
// Interaction complete — digit matched.
completed_interactions.push((lid.clone(), InteractionResult::Digit(digit)));
} else {
// Play prompt frame or silence.
// Play prompt frame, wait for live TTS, or move to timeout once the
// prompt stream has fully drained.
let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() {
frame
} else if !state.prompt_stream_finished {
vec![0.0f32; MIX_FRAME_SIZE]
} else {
state.prompt_done = true;
vec![0.0f32; MIX_FRAME_SIZE]
@@ -759,6 +837,7 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
for (lid, result) in completed_interactions {
if let Some(slot) = legs.get_mut(&lid) {
if let LegRole::Isolated(ref mut state) = slot.role {
cancel_prompt_producer(state);
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(result);
}
@@ -822,14 +901,7 @@ async fn mixer_loop(call_id: String, mut cmd_rx: mpsc::Receiver<MixerCommand>, o
rtp_out.extend_from_slice(&dtmf_pkt.payload);
target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1);
// Don't increment rtp_ts for DTMF — it shares timestamp context with audio.
try_send_leg_output(
&out_tx,
&call_id,
target_lid,
target_slot,
rtp_out,
"dtmf",
);
try_send_leg_output(&out_tx, &call_id, target_lid, target_slot, rtp_out, "dtmf");
}
}
}

View File

@@ -9,12 +9,41 @@
//! Callers never need to check for cached files — that is entirely this module's
//! responsibility.
use crate::audio_player::pcm_to_mix_frames;
use kokoro_tts::{KokoroTts, Voice};
use std::path::Path;
use std::sync::Arc;
use tokio::sync::{mpsc, watch};
pub const DEFAULT_MODEL_PATH: &str = ".nogit/tts/kokoro-v1.0.onnx";
pub const DEFAULT_VOICES_PATH: &str = ".nogit/tts/voices.bin";
const TTS_OUTPUT_RATE: u32 = 24000;
const MAX_CHUNK_CHARS: usize = 220;
const MIN_CHUNK_CHARS: usize = 80;
/// Messages sent from the background TTS producer task to the consumer
/// (the mixer's live prompt stream).
pub enum TtsStreamMessage {
    /// A batch of ready-to-play mixer frames for one synthesized chunk.
    Frames(Vec<Vec<f32>>),
    /// All chunks rendered; no further frames will arrive.
    Finished,
    /// Synthesis failed; the payload is the error description.
    Failed(String),
}
/// Handle for an in-progress live TTS prompt returned by `start_live_prompt`.
pub struct TtsLivePrompt {
    /// Frames for the first chunk, rendered synchronously before returning so
    /// playback can begin immediately.
    pub initial_frames: Vec<Vec<f32>>,
    /// Receives frames for the remaining chunks as they are synthesized.
    pub stream_rx: mpsc::Receiver<TtsStreamMessage>,
    /// Send `true` to stop the background producer early.
    pub cancel_tx: watch::Sender<bool>,
}
/// Parameters for starting a live TTS prompt.
#[derive(Clone)]
pub struct TtsPromptRequest {
    /// Filesystem path to the Kokoro ONNX model.
    pub model_path: String,
    /// Filesystem path to the voices data file.
    pub voices_path: String,
    /// Voice name, mapped to a Kokoro `Voice` via `select_voice`.
    pub voice_name: String,
    /// Text to synthesize.
    pub text: String,
}
/// Wraps the Kokoro TTS engine with lazy model loading.
pub struct TtsEngine {
tts: Option<KokoroTts>,
tts: Option<Arc<KokoroTts>>,
/// Path that was used to load the current model (for cache invalidation).
loaded_model_path: String,
loaded_voices_path: String,
@@ -29,6 +58,69 @@ impl TtsEngine {
}
}
/// Lazily load (or reload) the Kokoro model, returning a shared handle.
///
/// The cached engine is reused as long as the requested model/voices paths
/// match the ones it was loaded from; any path change triggers a reload.
///
/// # Errors
/// Returns an error string when either file is missing or the model fails to
/// load.
async fn ensure_loaded(
    &mut self,
    model_path: &str,
    voices_path: &str,
) -> Result<Arc<KokoroTts>, String> {
    if !Path::new(model_path).exists() {
        return Err(format!("model not found: {model_path}"));
    }
    if !Path::new(voices_path).exists() {
        return Err(format!("voices not found: {voices_path}"));
    }
    // (Re)load when nothing is cached or the requested paths differ from the
    // cached ones.
    if self.tts.is_none()
        || self.loaded_model_path != model_path
        || self.loaded_voices_path != voices_path
    {
        eprintln!("[tts] loading model: {model_path}");
        let tts = Arc::new(
            KokoroTts::new(model_path, voices_path)
                .await
                .map_err(|e| format!("model load failed: {e:?}"))?,
        );
        self.tts = Some(tts);
        self.loaded_model_path = model_path.to_string();
        self.loaded_voices_path = voices_path.to_string();
    }
    // Safe: the branch above guarantees `self.tts` is populated here.
    Ok(self.tts.as_ref().unwrap().clone())
}
/// Begin a live (chunked) TTS prompt.
///
/// The first text chunk is synthesized synchronously so the caller receives
/// playable `initial_frames` immediately; the remaining chunks are rendered by
/// a spawned background task that streams frames over `stream_rx`. Sending
/// `true` on `cancel_tx` stops that producer early.
///
/// # Errors
/// Returns an error string for empty/whitespace text, missing model or voices
/// files, or a first-chunk synthesis failure.
pub async fn start_live_prompt(
    &mut self,
    request: TtsPromptRequest,
) -> Result<TtsLivePrompt, String> {
    if request.text.trim().is_empty() {
        return Err("empty text".into());
    }
    let tts = self
        .ensure_loaded(&request.model_path, &request.voices_path)
        .await?;
    let voice = select_voice(&request.voice_name);
    let chunks = chunk_text(&request.text);
    if chunks.is_empty() {
        return Err("empty text".into());
    }
    // Render chunk 0 inline so playback can start right away.
    let initial_frames = synth_text_to_mix_frames(&tts, chunks[0].as_str(), voice).await?;
    let remaining_chunks: Vec<String> = chunks.into_iter().skip(1).collect();
    let (stream_tx, stream_rx) = mpsc::channel(8);
    let (cancel_tx, cancel_rx) = watch::channel(false);
    // The producer owns a clone of the Arc'd engine handle and runs until all
    // chunks are sent, cancellation fires, or the receiver is dropped.
    tokio::spawn(async move {
        stream_live_prompt_chunks(tts, voice, remaining_chunks, stream_tx, cancel_rx).await;
    });
    Ok(TtsLivePrompt {
        initial_frames,
        stream_rx,
        cancel_tx,
    })
}
/// Generate a WAV file from text.
///
/// Params (from IPC JSON):
@@ -78,37 +170,15 @@ impl TtsEngine {
return Ok(serde_json::json!({ "output": output_path }));
}
// Check that model/voices files exist.
if !Path::new(model_path).exists() {
return Err(format!("model not found: {model_path}"));
}
if !Path::new(voices_path).exists() {
return Err(format!("voices not found: {voices_path}"));
}
// Ensure parent directory exists.
if let Some(parent) = Path::new(output_path).parent() {
let _ = std::fs::create_dir_all(parent);
}
// Lazy-load or reload if paths changed.
if self.tts.is_none()
|| self.loaded_model_path != model_path
|| self.loaded_voices_path != voices_path
{
eprintln!("[tts] loading model: {model_path}");
let tts = KokoroTts::new(model_path, voices_path)
.await
.map_err(|e| format!("model load failed: {e:?}"))?;
self.tts = Some(tts);
self.loaded_model_path = model_path.to_string();
self.loaded_voices_path = voices_path.to_string();
}
let tts = self.tts.as_ref().unwrap();
let tts = self.ensure_loaded(model_path, voices_path).await?;
let voice = select_voice(voice_name);
eprintln!("[tts] synthesizing voice '{voice_name}': \"{text}\"");
eprintln!("[tts] synthesizing WAV voice '{voice_name}' to {output_path}");
let (samples, duration) = tts
.synth(text, voice)
.await
@@ -175,6 +245,106 @@ impl TtsEngine {
}
}
/// Synthesize `text` with Kokoro and convert the 24kHz PCM output into 48kHz
/// 20ms mixer frames. Errors from synthesis or conversion are returned as
/// strings.
async fn synth_text_to_mix_frames(
    tts: &Arc<KokoroTts>,
    text: &str,
    voice: Voice,
) -> Result<Vec<Vec<f32>>, String> {
    let synth_result = tts.synth(text, voice).await;
    let (samples, duration) = match synth_result {
        Ok(out) => out,
        Err(e) => return Err(format!("synthesis failed: {e:?}")),
    };
    let char_count = text.chars().count();
    let sample_count = samples.len();
    eprintln!(
        "[tts] synthesized chunk ({} chars, {} samples) in {duration:?}",
        char_count, sample_count
    );
    pcm_to_mix_frames(&samples, TTS_OUTPUT_RATE)
}
/// Background task: synthesize the remaining prompt chunks and push the
/// resulting mixer frames into `stream_tx`.
///
/// Cancellation is cooperative and checked at three points: before starting a
/// synthesis, again after it completes (synthesis can take a while), and once
/// more after a successful send. On a synthesis failure a `Failed` message is
/// sent and the task exits without a `Finished` marker; a dropped receiver
/// ends the task silently.
async fn stream_live_prompt_chunks(
    tts: Arc<KokoroTts>,
    voice: Voice,
    chunks: Vec<String>,
    stream_tx: mpsc::Sender<TtsStreamMessage>,
    mut cancel_rx: watch::Receiver<bool>,
) {
    for chunk in chunks {
        // Cancelled while idle — skip straight to the Finished marker.
        if *cancel_rx.borrow() {
            break;
        }
        match synth_text_to_mix_frames(&tts, &chunk, voice).await {
            Ok(frames) => {
                // Re-check after the (potentially long) synthesis so we do
                // not queue frames nobody will play.
                if *cancel_rx.borrow() {
                    break;
                }
                if stream_tx.send(TtsStreamMessage::Frames(frames)).await.is_err() {
                    // Receiver dropped — interaction ended; stop silently.
                    return;
                }
            }
            Err(error) => {
                let _ = stream_tx.send(TtsStreamMessage::Failed(error)).await;
                return;
            }
        }
        // Catch a cancellation that arrived during the send above.
        if cancel_rx.has_changed().unwrap_or(false) && *cancel_rx.borrow_and_update() {
            break;
        }
    }
    let _ = stream_tx.send(TtsStreamMessage::Finished).await;
}
/// Split prompt text into TTS-friendly chunks.
///
/// A chunk ends at a sentence boundary once it has at least `MIN_CHUNK_CHARS`
/// characters ("natural" split), or at any whitespace/soft boundary once it
/// reaches `MAX_CHUNK_CHARS` ("hard" split). A tiny trailing chunk (under half
/// the minimum) is merged into the previous chunk so the last synthesized
/// piece is not a fragment.
///
/// Fix: the original recomputed `current.chars().count()` on every pushed
/// character, making chunking O(n^2) in text length; the length is now
/// tracked incrementally.
fn chunk_text(text: &str) -> Vec<String> {
    let mut chunks = Vec::new();
    let mut current = String::new();
    // Char count of `current`, maintained incrementally (O(1) per char).
    let mut current_len = 0usize;
    for ch in text.chars() {
        current.push(ch);
        current_len += 1;
        let hard_split =
            current_len >= MAX_CHUNK_CHARS && (ch.is_whitespace() || is_soft_boundary(ch));
        let natural_split = current_len >= MIN_CHUNK_CHARS && is_sentence_boundary(ch);
        if natural_split || hard_split {
            push_chunk(&mut chunks, &mut current);
            // push_chunk clears `current`, so the tracked length resets too.
            current_len = 0;
        }
    }
    push_chunk(&mut chunks, &mut current);
    // Merge a very short tail into the previous chunk.
    if chunks.len() >= 2 {
        let last_len = chunks.last().unwrap().chars().count();
        if last_len < (MIN_CHUNK_CHARS / 2) {
            let tail = chunks.pop().unwrap();
            if let Some(prev) = chunks.last_mut() {
                prev.push(' ');
                prev.push_str(tail.trim());
            }
        }
    }
    chunks
}
/// Append the trimmed contents of `current` to `chunks` (skipping
/// whitespace-only text) and reset `current` for the next chunk.
fn push_chunk(chunks: &mut Vec<String>, current: &mut String) {
    let chunk = current.trim().to_string();
    current.clear();
    if !chunk.is_empty() {
        chunks.push(chunk);
    }
}
/// True for characters that end a sentence-like unit; preferred chunk split
/// points once the minimum chunk length has been reached.
fn is_sentence_boundary(ch: char) -> bool {
    ['.', '!', '?', '\n', ';', ':'].contains(&ch)
}
/// True for "soft" pause characters that are acceptable split points once a
/// chunk has grown past the hard maximum length.
fn is_soft_boundary(ch: char) -> bool {
    [',', ';', ':', ')', ']', '\n'].contains(&ch)
}
/// Map voice name string to Kokoro Voice enum variant.
fn select_voice(name: &str) -> Voice {
match name {

View File

@@ -19,6 +19,7 @@ pub async fn run_voicemail_session(
rtp_socket: Arc<UdpSocket>,
provider_media: SocketAddr,
codec_pt: u8,
voicebox_id: Option<String>,
greeting_wav: Option<String>,
recording_path: String,
max_recording_ms: u64,
@@ -33,6 +34,7 @@ pub async fn run_voicemail_session(
"voicemail_started",
serde_json::json!({
"call_id": call_id,
"voicebox_id": voicebox_id,
"caller_number": caller_number,
}),
);
@@ -102,6 +104,7 @@ pub async fn run_voicemail_session(
"recording_done",
serde_json::json!({
"call_id": call_id,
"voicebox_id": voicebox_id,
"file_path": result.file_path,
"duration_ms": result.duration_ms,
"caller_number": caller_number,

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.24.0',
version: '1.25.1',
description: 'undefined'
}

View File

@@ -48,6 +48,24 @@ export interface IDeviceConfig {
extension: string;
}
/** How an incoming DID entry matches caller-dialed numbers. */
export type TIncomingNumberMode = 'single' | 'range' | 'regex';

/**
 * Configuration for one incoming (DID) number entry: a single number, a
 * numeric range, or a regex pattern, optionally tied to a provider.
 */
export interface IIncomingNumberConfig {
  /** Stable unique identifier for this entry. */
  id: string;
  /** Human-readable label shown in the UI. */
  label: string;
  /** Provider this number belongs to, when restricted to one provider. */
  providerId?: string;
  /** Matching mode; determines which of the fields below are used. */
  mode: TIncomingNumberMode;
  /** Country prefix, e.g. "+49"; unused in 'regex' mode. */
  countryCode?: string;
  /** Area/city code portion of the number — presumably unused in 'regex' mode; verify against consumers. */
  areaCode?: string;
  /** Subscriber-local part; in 'range' mode this is the range start. */
  localNumber?: string;
  /** Inclusive upper bound of the local number range ('range' mode). */
  rangeEnd?: string;
  /** Regex source string used when mode is 'regex'. */
  pattern?: string;
  // Legacy persisted fields kept for migration compatibility.
  number?: string;
  rangeStart?: string;
}
// ---------------------------------------------------------------------------
// Match/Action routing model
// ---------------------------------------------------------------------------
@@ -66,7 +84,7 @@ export interface ISipRouteMatch {
*
* Inbound: matches the provider-delivered DID / Request-URI user part.
* Outbound: matches the normalized dialed digits.
* Supports: exact string, prefix with trailing '*' (e.g. "+4930*"), or regex ("/^\\+49/").
* Supports: exact string, numeric range `start..end`, prefix with trailing '*' (e.g. "+4930*"), or regex ("/^\\+49/").
*/
numberPattern?: string;
@@ -234,6 +252,7 @@ export interface IAppConfig {
proxy: IProxyConfig;
providers: IProviderConfig[];
devices: IDeviceConfig[];
incomingNumbers?: IIncomingNumberConfig[];
routing: IRoutingConfig;
contacts: IContact[];
voiceboxes?: IVoiceboxConfig[];
@@ -288,6 +307,14 @@ export function loadConfig(): IAppConfig {
d.extension ??= '100';
}
cfg.incomingNumbers ??= [];
for (const incoming of cfg.incomingNumbers) {
if (!incoming.id) incoming.id = `incoming-${Date.now()}`;
incoming.label ??= incoming.id;
incoming.mode ??= incoming.pattern ? 'regex' : incoming.rangeStart || incoming.rangeEnd ? 'range' : 'single';
incoming.countryCode ??= incoming.mode === 'regex' ? undefined : '+49';
}
cfg.routing ??= { routes: [] };
cfg.routing.routes ??= [];

View File

@@ -266,6 +266,7 @@ async function handleRequest(
if (existing && ud.displayName !== undefined) existing.displayName = ud.displayName;
}
}
if (updates.incomingNumbers !== undefined) cfg.incomingNumbers = updates.incomingNumbers;
if (updates.routing) {
if (updates.routing.routes) {
cfg.routing.routes = updates.routing.routes;

View File

@@ -82,6 +82,19 @@ type TProxyCommands = {
};
result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string };
};
start_tts_interaction: {
params: {
call_id: string;
leg_id: string;
text: string;
voice?: string;
model?: string;
voices?: string;
expected_digits: string;
timeout_ms: number;
};
result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string };
};
add_tool_leg: {
params: {
call_id: string;
@@ -446,6 +459,40 @@ export async function startInteraction(
}
}
/**
 * Start a live TTS interaction on a specific leg. The first chunk is rendered
 * up front and the rest streams into the mixer while playback is already live.
 */
export async function startTtsInteraction(
  callId: string,
  legId: string,
  text: string,
  expectedDigits: string,
  timeoutMs: number,
  options?: {
    voice?: string;
    model?: string;
    voices?: string;
  },
): Promise<{ result: 'digit' | 'timeout' | 'cancelled'; digit?: string } | null> {
  // No-op when the proxy bridge is absent or not yet initialized.
  if (!bridge || !initialized) return null;
  const { voice, model, voices } = options ?? {};
  try {
    const interactionResult = await sendProxyCommand('start_tts_interaction', {
      call_id: callId,
      leg_id: legId,
      text,
      expected_digits: expectedDigits,
      timeout_ms: timeoutMs,
      voice,
      model,
      voices,
    });
    return interactionResult;
  } catch (error: unknown) {
    logFn?.(`[proxy-engine] start_tts_interaction error: ${errorMessage(error)}`);
    return null;
  }
}
/**
* Add a tool leg (recording or transcription) to a call.
* Tool legs receive per-source unmerged audio from all participants.

View File

@@ -168,12 +168,13 @@ export function registerProxyEventHandlers(options: IRegisterProxyEventHandlersO
});
onProxyEvent('voicemail_started', (data) => {
log(`[voicemail] started for call ${data.call_id} caller=${data.caller_number}`);
log(`[voicemail] started for call ${data.call_id} box=${data.voicebox_id || 'default'} caller=${data.caller_number}`);
});
onProxyEvent('recording_done', (data) => {
log(`[voicemail] recording done: ${data.file_path} (${data.duration_ms}ms) caller=${data.caller_number}`);
voiceboxManager.addMessage('default', {
const boxId = data.voicebox_id || 'default';
log(`[voicemail] recording done: ${data.file_path} (${data.duration_ms}ms) box=${boxId} caller=${data.caller_number}`);
voiceboxManager.addMessage(boxId, {
callerNumber: data.caller_number || 'Unknown',
callerName: null,
fileName: data.file_path,

View File

@@ -108,10 +108,13 @@ export interface IWebRtcAudioRxEvent {
/** Proxy-engine event emitted when a voicemail session starts for a call. */
export interface IVoicemailStartedEvent {
  /** SIP call identifier the voicemail session belongs to. */
  call_id: string;
  /** Target mailbox id; consumers fall back to 'default' when absent. */
  voicebox_id?: string;
  /** Caller's number, when known. */
  caller_number?: string;
}
export interface IRecordingDoneEvent {
call_id?: string;
voicebox_id?: string;
file_path: string;
duration_ms: number;
caller_number?: string;

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.24.0',
version: '1.25.1',
description: 'undefined'
}

File diff suppressed because it is too large Load Diff