feat(proxy-engine): add on-demand TTS caching for voicemail and IVR prompts

This commit is contained in:
2026-04-12 20:45:08 +00:00
parent cfadd7a2b6
commit 59d8c2557c
17 changed files with 460 additions and 488 deletions

10
rust/Cargo.lock generated
View File

@@ -532,6 +532,15 @@ dependencies = [
"cc",
]
[[package]]
name = "cmudict-fast"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9f73004e928ed46c3e7fd7406d2b12c8674153295f08af084b49860276dc02"
dependencies = [
"thiserror",
]
[[package]]
name = "codec-lib"
version = "0.1.0"
@@ -1730,6 +1739,7 @@ dependencies = [
"bincode 2.0.1",
"cc",
"chinese-number",
"cmudict-fast",
"futures",
"jieba-rs",
"log",

View File

@@ -19,7 +19,7 @@ regex-lite = "0.1"
webrtc = "0.8"
rand = "0.8"
hound = "3.5"
kokoro-tts = { version = "0.3", default-features = false }
kokoro-tts = { version = "0.3", default-features = false, features = ["use-cmudict"] }
ort = { version = "=2.0.0-rc.11", default-features = false, features = [
"std", "download-binaries", "copy-dylibs", "ndarray",
"tls-native-vendored"

View File

@@ -23,6 +23,7 @@ pub enum CallState {
Ringing,
Connected,
Voicemail,
Ivr,
Terminated,
}
@@ -37,6 +38,7 @@ impl CallState {
Self::Ringing => "ringing",
Self::Connected => "connected",
Self::Voicemail => "voicemail",
Self::Ivr => "ivr",
Self::Terminated => "terminated",
}
}

View File

@@ -12,13 +12,16 @@ use crate::mixer::spawn_mixer;
use crate::registrar::Registrar;
use crate::rtp::RtpPortPool;
use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig};
use crate::tts::TtsEngine;
use sip_proto::helpers::{build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions};
use sip_proto::message::{ResponseOptions, SipMessage};
use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
/// Result of creating an inbound call — carries both the call id and
/// whether browsers should be notified (flows from the matched inbound
@@ -681,6 +684,7 @@ impl CallManager {
rtp_pool: &mut RtpPortPool,
socket: &UdpSocket,
public_ip: Option<&str>,
tts_engine: Arc<Mutex<TtsEngine>>,
) -> Option<InboundCallCreated> {
let call_id = self.next_call_id();
let lan_ip = &config.proxy.lan_ip;
@@ -710,10 +714,27 @@ impl CallManager {
// - `ring_browsers` is informational only — browsers see a toast but
// do not race the SIP device. First-to-answer-wins requires a
// multi-leg fork + per-leg CANCEL, which is not built yet.
// - `voicemail_box`, `ivr_menu_id`, `no_answer_timeout` are not honored.
let route = config.resolve_inbound_route(provider_id, &called_number, &caller_number);
let ring_browsers = route.ring_browsers;
// IVR routing: if the route targets an IVR menu, go there directly.
if let Some(ref ivr_menu_id) = route.ivr_menu_id {
if let Some(ivr) = &config.ivr {
if ivr.enabled {
if let Some(menu) = ivr.menus.iter().find(|m| m.id == *ivr_menu_id) {
let call_id = self
.route_to_ivr(
&call_id, invite, from_addr, &caller_number,
provider_id, provider_config, config, rtp_pool, socket,
public_ip, menu, &tts_engine,
)
.await?;
return Some(InboundCallCreated { call_id, ring_browsers });
}
}
}
}
// Pick the first registered device from the matched targets, or fall
// back to any-registered-device if the route has no resolved targets.
let device_addr = route
@@ -726,10 +747,17 @@ impl CallManager {
Some(addr) => addr,
None => {
// No device registered → voicemail.
// Resolve greeting WAV on-demand (may trigger TTS generation).
let greeting_wav = resolve_greeting_wav(
config,
route.voicemail_box.as_deref(),
&tts_engine,
).await;
let call_id = self
.route_to_voicemail(
&call_id, invite, from_addr, &caller_number,
provider_id, provider_config, config, rtp_pool, socket, public_ip,
greeting_wav,
)
.await?;
return Some(InboundCallCreated { call_id, ring_browsers });
@@ -1536,6 +1564,7 @@ impl CallManager {
rtp_pool: &mut RtpPortPool,
socket: &UdpSocket,
public_ip: Option<&str>,
greeting_wav: Option<String>,
) -> Option<String> {
let lan_ip = &config.proxy.lan_ip;
let pub_ip = public_ip.unwrap_or(lan_ip.as_str());
@@ -1630,8 +1659,6 @@ impl CallManager {
.as_millis();
let recording_dir = "nogit/voicemail/default".to_string();
let recording_path = format!("{recording_dir}/msg-{timestamp}.wav");
let greeting_wav = find_greeting_wav();
let out_tx = self.out_tx.clone();
let call_id_owned = call_id.to_string();
let caller_owned = caller_number.to_string();
@@ -1648,6 +1675,211 @@ impl CallManager {
Some(call_id.to_string())
}
// -----------------------------------------------------------------------
// IVR routing
// -----------------------------------------------------------------------
/// Answer an inbound provider call and hand it to an IVR menu.
///
/// Steps, in order:
/// 1. Allocate an RTP port; if the pool is exhausted, reply `503 Service
///    Unavailable` and return `None`.
/// 2. Answer the INVITE with `200 OK` carrying our SDP.
/// 3. Register a [`Call`] in [`CallState::Ivr`] with a single provider leg and
///    emit `leg_added` for it.
/// 4. Generate (or reuse from cache) the menu prompt via TTS.
/// 5. Spawn a task that plays the prompt through the mixer, waits for a DTMF
///    digit or a timeout, and emits `ivr_digit` / `ivr_timeout` / `ivr_error`.
///
/// The menu entries' `action` / `target` are not acted on here; only the
/// pressed digit is reported through the `ivr_digit` event.
///
/// Returns `Some(call_id)` once the call has been registered.
#[allow(clippy::too_many_arguments)] // call-setup context is passed as individual parameters
async fn route_to_ivr(
    &mut self,
    call_id: &str,
    invite: &SipMessage,
    from_addr: SocketAddr,
    caller_number: &str,
    provider_id: &str,
    provider_config: &ProviderConfig,
    config: &AppConfig,
    rtp_pool: &mut RtpPortPool,
    socket: &UdpSocket,
    public_ip: Option<&str>,
    menu: &crate::config::IvrMenuConfig,
    tts_engine: &Arc<Mutex<TtsEngine>>,
) -> Option<String> {
    let lan_ip = &config.proxy.lan_ip;

    // No free RTP port → reject the call instead of answering without media.
    let rtp_alloc = match rtp_pool.allocate().await {
        Some(a) => a,
        None => {
            let resp = SipMessage::create_response(503, "Service Unavailable", invite, None);
            let _ = socket.send_to(&resp.serialize(), from_addr).await;
            return None;
        }
    };

    // First codec the provider is configured with; payload type 9 is the
    // fallback when the provider lists none.
    let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
    // Advertise the public IP in SDP when known, otherwise the LAN IP.
    let pub_ip = public_ip.unwrap_or(lan_ip.as_str());

    let sdp = sip_proto::helpers::build_sdp(&sip_proto::helpers::SdpOptions {
        ip: pub_ip,
        port: rtp_alloc.port,
        payload_types: &provider_config.codecs,
        ..Default::default()
    });

    // The IVR answers immediately: 200 OK with our SDP.
    let response = SipMessage::create_response(
        200, "OK", invite,
        Some(sip_proto::message::ResponseOptions {
            to_tag: Some(sip_proto::helpers::generate_tag()),
            contact: Some(format!("<sip:{}:{}>", lan_ip, config.proxy.lan_port)),
            body: Some(sdp),
            content_type: Some("application/sdp".to_string()),
            ..Default::default()
        }),
    );
    let _ = socket.send_to(&response.serialize(), from_addr).await;

    // Where to send RTP: the endpoint from the INVITE's SDP when present and
    // parseable, otherwise the address the INVITE came from.
    let provider_media = if invite.has_sdp_body() {
        parse_sdp_endpoint(&invite.body)
            .and_then(|ep| format!("{}:{}", ep.address, ep.port).parse().ok())
    } else {
        Some(from_addr)
    };
    let provider_media = provider_media.unwrap_or(from_addr);

    // Create call with IVR state.
    let (mixer_cmd_tx, mixer_task) = spawn_mixer(call_id.to_string(), self.out_tx.clone());

    let mut call = Call::new(
        call_id.to_string(),
        CallDirection::Inbound,
        provider_id.to_string(),
        mixer_cmd_tx.clone(),
        mixer_task,
    );
    call.state = CallState::Ivr;
    call.caller_number = Some(caller_number.to_string());

    let provider_leg_id = format!("{call_id}-prov");
    call.legs.insert(
        provider_leg_id.clone(),
        LegInfo {
            id: provider_leg_id.clone(),
            kind: LegKind::SipProvider,
            state: LegState::Connected,
            codec_pt,
            sip_leg: None,
            sip_call_id: Some(invite.call_id().to_string()),
            webrtc_session_id: None,
            rtp_socket: Some(rtp_alloc.socket.clone()),
            rtp_port: rtp_alloc.port,
            public_ip: public_ip.map(|s| s.to_string()),
            remote_media: Some(provider_media),
            signaling_addr: Some(from_addr),
            metadata: HashMap::new(),
        },
    );

    // Index by SIP Call-ID so later SIP traffic for this dialog can be mapped
    // back to (call, leg).
    self.sip_index.insert(
        invite.call_id().to_string(),
        (call_id.to_string(), provider_leg_id.clone()),
    );

    self.calls.insert(call_id.to_string(), call);

    // Emit leg_added for the provider leg.
    if let Some(call) = self.calls.get(call_id) {
        for leg in call.legs.values() {
            emit_leg_added_event(&self.out_tx, call_id, leg);
        }
    }

    // Generate IVR prompt on-demand via TTS (cached).
    let voice = menu.prompt_voice.as_deref().unwrap_or("af_bella");
    let prompt_output = format!(".nogit/tts/ivr-menu-{}.wav", menu.id);
    let prompt_params = serde_json::json!({
        "model": ".nogit/tts/kokoro-v1.0.onnx",
        "voices": ".nogit/tts/voices.bin",
        "voice": voice,
        "text": &menu.prompt_text,
        "output": &prompt_output,
        "cacheable": true,
    });

    // The TTS lock is held for the whole `generate` call and released at the
    // end of this block. A failure is not fatal to call setup: the spawned
    // task below reports it as `ivr_error`.
    let prompt_wav = {
        let mut tts = tts_engine.lock().await;
        match tts.generate(&prompt_params).await {
            Ok(_) => Some(prompt_output),
            Err(e) => {
                eprintln!("[ivr] TTS generation failed: {e}");
                None
            }
        }
    };

    // Load prompt and run interaction via the mixer.
    let out_tx = self.out_tx.clone();
    let call_id_owned = call_id.to_string();
    // Only the first character of each entry's `digit` is used.
    let expected_digits: Vec<char> = menu
        .entries
        .iter()
        .filter_map(|e| e.digit.chars().next())
        .collect();
    // `timeout_sec` comes from config; saturate so an oversized value cannot
    // overflow the u32 (panic in debug builds, wrap-around in release).
    let timeout_ms = menu.timeout_sec.unwrap_or(5).saturating_mul(1000);

    tokio::spawn(async move {
        // Load prompt PCM frames if available.
        let prompt_frames = prompt_wav.as_ref().and_then(|wav| {
            crate::audio_player::load_prompt_pcm_frames(wav).ok()
        });

        if let Some(frames) = prompt_frames {
            let (result_tx, result_rx) = tokio::sync::oneshot::channel();
            // If the send fails, `result_tx` is dropped with the command and
            // the wait below resolves to `Cancelled`.
            let _ = mixer_cmd_tx
                .send(crate::mixer::MixerCommand::StartInteraction {
                    leg_id: provider_leg_id,
                    prompt_pcm_frames: frames,
                    expected_digits,
                    timeout_ms,
                    result_tx,
                })
                .await;

            // Wait for digit or timeout. The extra 30 s is a safety margin on
            // top of the timeout handed to the mixer, so this task cannot
            // wait forever if no result ever arrives.
            let safety = tokio::time::Duration::from_millis(u64::from(timeout_ms) + 30_000);
            let result = match tokio::time::timeout(safety, result_rx).await {
                Ok(Ok(r)) => r,
                Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled,
                Err(_) => crate::mixer::InteractionResult::Timeout,
            };

            match &result {
                crate::mixer::InteractionResult::Digit(d) => {
                    eprintln!("[ivr] caller pressed '{d}' on call {call_id_owned}");
                    emit_event(
                        &out_tx,
                        "ivr_digit",
                        serde_json::json!({
                            "call_id": call_id_owned,
                            "digit": d.to_string(),
                        }),
                    );
                }
                crate::mixer::InteractionResult::Timeout => {
                    eprintln!("[ivr] timeout on call {call_id_owned}");
                    emit_event(
                        &out_tx,
                        "ivr_timeout",
                        serde_json::json!({ "call_id": call_id_owned }),
                    );
                }
                crate::mixer::InteractionResult::Cancelled => {
                    eprintln!("[ivr] cancelled on call {call_id_owned}");
                }
            }
        } else {
            // NOTE(review): the log says "ending", but no teardown happens in
            // this function; presumably the consumer of `ivr_error` hangs up
            // the call. Confirm.
            eprintln!("[ivr] no prompt available for call {call_id_owned}, ending");
            emit_event(
                &out_tx,
                "ivr_error",
                serde_json::json!({
                    "call_id": call_id_owned,
                    "error": "no prompt available",
                }),
            );
        }
    });

    Some(call_id.to_string())
}
// -----------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------
@@ -1662,13 +1894,56 @@ impl CallManager {
}
}
fn find_greeting_wav() -> Option<String> {
let candidates = [
/// Resolve the greeting WAV for a voicemail box.
///
/// Priority:
/// 1. Pre-recorded WAV from voicebox config (`greetingWavPath`)
/// 2. On-demand TTS generation from greeting text (cached via `cacheable: true`)
/// 3. Legacy hardcoded paths (`.nogit/voicemail/default/greeting.wav`, etc.)
/// 4. None — voicemail session plays beep only
async fn resolve_greeting_wav(
config: &AppConfig,
voicebox_id: Option<&str>,
tts_engine: &Arc<Mutex<TtsEngine>>,
) -> Option<String> {
// 1. Look up voicebox config.
let vb = voicebox_id
.and_then(|id| config.voiceboxes.iter().find(|v| v.id == id && v.enabled));
if let Some(vb) = vb {
// 2. Pre-recorded WAV takes priority.
if let Some(ref wav) = vb.greeting_wav_path {
if Path::new(wav).exists() {
return Some(wav.clone());
}
}
// 3. TTS on-demand with caching.
let text = vb.greeting_text.as_deref().unwrap_or(
"The person you are trying to reach is not available. Please leave a message after the tone.",
);
let voice = vb.greeting_voice.as_deref().unwrap_or("af_bella");
let output = format!(".nogit/tts/voicemail-greeting-{}.wav", vb.id);
let params = serde_json::json!({
"model": ".nogit/tts/kokoro-v1.0.onnx",
"voices": ".nogit/tts/voices.bin",
"voice": voice,
"text": text,
"output": &output,
"cacheable": true,
});
let mut tts = tts_engine.lock().await;
if tts.generate(&params).await.is_ok() {
return Some(output);
}
}
// 4. Fallback: legacy hardcoded paths.
for path in &[
".nogit/voicemail/default/greeting.wav",
".nogit/voicemail/greeting.wav",
];
for path in &candidates {
if std::path::Path::new(path).exists() {
] {
if Path::new(path).exists() {
return Some(path.to_string());
}
}

View File

@@ -159,6 +159,10 @@ pub struct AppConfig {
pub providers: Vec<ProviderConfig>,
pub devices: Vec<DeviceConfig>,
pub routing: RoutingConfig,
#[serde(default)]
pub voiceboxes: Vec<VoiceboxConfig>,
#[serde(default)]
pub ivr: Option<IvrConfig>,
}
#[derive(Debug, Clone, Deserialize)]
@@ -166,6 +170,59 @@ pub struct RoutingConfig {
pub routes: Vec<Route>,
}
// ---------------------------------------------------------------------------
// Voicebox config
// ---------------------------------------------------------------------------
/// Settings for one voicemail box, deserialized from the app config.
///
/// Keys are camelCase on the wire (`greetingText`, `greetingVoice`,
/// `greetingWavPath`, `maxRecordingSec`).
#[allow(dead_code)] // not every field is read by the code that consumes this config yet
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct VoiceboxConfig {
    /// Identifier that inbound routes use to refer to this box.
    pub id: String,
    /// Whether the box is active. Falls back to `false` when the key is absent.
    #[serde(default)]
    pub enabled: bool,
    /// Greeting text to synthesize via TTS when no usable WAV is configured.
    pub greeting_text: Option<String>,
    /// TTS voice name for the synthesized greeting.
    pub greeting_voice: Option<String>,
    /// Path to a pre-recorded greeting WAV.
    pub greeting_wav_path: Option<String>,
    /// Upper bound on the recording length, in seconds.
    /// NOTE(review): presumably enforced by the voicemail recorder — confirm.
    pub max_recording_sec: Option<u32>,
}
// ---------------------------------------------------------------------------
// IVR config
// ---------------------------------------------------------------------------
/// Top-level IVR settings, deserialized from the app config.
///
/// Keys are camelCase on the wire (`entryMenuId`).
#[allow(dead_code)] // not every field is read by the code that consumes this config yet
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IvrConfig {
    /// Master switch for IVR routing. Required: there is no serde default.
    pub enabled: bool,
    /// All configured menus; a menu is looked up by its `id`.
    pub menus: Vec<IvrMenuConfig>,
    /// Id of the menu intended as the entry point.
    /// NOTE(review): presumably used when a route names no specific menu — confirm.
    pub entry_menu_id: String,
}
/// A single IVR menu: the prompt to speak and the digits it accepts.
///
/// Keys are camelCase on the wire (`promptText`, `promptVoice`, `timeoutSec`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct IvrMenuConfig {
    /// Menu identifier, matched against a route's IVR menu id.
    pub id: String,
    /// Text spoken to the caller as the menu prompt.
    pub prompt_text: String,
    /// TTS voice name for the prompt; optional.
    pub prompt_voice: Option<String>,
    /// Selectable options of this menu.
    pub entries: Vec<IvrMenuEntry>,
    /// How long to wait for a digit, in seconds; optional.
    pub timeout_sec: Option<u32>,
}
/// One selectable option of an IVR menu.
// `action` and `target` are parsed here; whether anything consumes them yet is
// not visible from this definition, hence the lint allowance below.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize)]
pub struct IvrMenuEntry {
/// DTMF digit that selects this entry, stored as a string.
pub digit: String,
/// Name of the action to perform when the entry is selected.
/// NOTE(review): the set of valid action names is not defined here — confirm against the config schema.
pub action: String,
/// Optional argument for `action`.
/// NOTE(review): presumably a device, menu or voicebox id depending on the action — confirm.
pub target: Option<String>,
}
// ---------------------------------------------------------------------------
// Pattern matching (ported from ts/config.ts)
// ---------------------------------------------------------------------------

View File

@@ -50,11 +50,12 @@ struct ProxyEngine {
registrar: Registrar,
call_mgr: CallManager,
rtp_pool: Option<RtpPortPool>,
tts_engine: Arc<Mutex<tts::TtsEngine>>,
out_tx: OutTx,
}
impl ProxyEngine {
fn new(out_tx: OutTx) -> Self {
fn new(out_tx: OutTx, tts_engine: Arc<Mutex<tts::TtsEngine>>) -> Self {
Self {
config: None,
transport: None,
@@ -62,6 +63,7 @@ impl ProxyEngine {
registrar: Registrar::new(out_tx.clone()),
call_mgr: CallManager::new(out_tx.clone()),
rtp_pool: None,
tts_engine,
out_tx,
}
}
@@ -88,15 +90,16 @@ async fn main() {
// Emit ready event.
emit_event(&out_tx, "ready", serde_json::json!({}));
// Shared engine state (SIP side).
let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone())));
// TTS engine — separate internal lock, lazy-loads model on first use.
let tts_engine = Arc::new(Mutex::new(tts::TtsEngine::new()));
// Shared engine state (SIP side). TTS engine is stored inside so the
// SIP packet handler path can reach it for on-demand voicemail/IVR generation.
let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone(), tts_engine)));
// WebRTC engine — separate lock to avoid deadlock with SIP handlers.
let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
// TTS engine — separate lock, lazy-loads model on first use.
let tts_engine = Arc::new(Mutex::new(tts::TtsEngine::new()));
// Read commands from stdin.
let stdin = tokio::io::stdin();
let reader = BufReader::new(stdin);
@@ -117,12 +120,11 @@ async fn main() {
let engine = engine.clone();
let webrtc = webrtc.clone();
let tts_engine = tts_engine.clone();
let out_tx = out_tx.clone();
// Handle commands — some are async, so we spawn.
tokio::spawn(async move {
handle_command(engine, webrtc, tts_engine, &out_tx, cmd).await;
handle_command(engine, webrtc, &out_tx, cmd).await;
});
}
}
@@ -130,7 +132,6 @@ async fn main() {
async fn handle_command(
engine: Arc<Mutex<ProxyEngine>>,
webrtc: Arc<Mutex<WebRtcEngine>>,
tts_engine: Arc<Mutex<tts::TtsEngine>>,
out_tx: &OutTx,
cmd: Command,
) {
@@ -155,8 +156,8 @@ async fn handle_command(
"add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await,
"remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await,
"set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await,
// TTS command — lock tts_engine only (no SIP/WebRTC contention).
"generate_tts" => handle_generate_tts(tts_engine, out_tx, &cmd).await,
// TTS command — gets tts_engine from inside ProxyEngine.
"generate_tts" => handle_generate_tts(engine, out_tx, &cmd).await,
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
}
}
@@ -325,8 +326,10 @@ async fn handle_sip_packet(
ref registrar,
ref mut call_mgr,
ref mut rtp_pool,
ref tts_engine,
..
} = *eng;
let tts_clone = tts_engine.clone();
let rtp_pool = rtp_pool.as_mut().unwrap();
let inbound = call_mgr
.create_inbound_call(
@@ -339,6 +342,7 @@ async fn handle_sip_packet(
rtp_pool,
socket,
public_ip.as_deref(),
tts_clone,
)
.await;
@@ -1231,10 +1235,11 @@ async fn handle_set_leg_metadata(
/// Handle `generate_tts` — synthesize text to a WAV file using Kokoro TTS.
async fn handle_generate_tts(
tts_engine: Arc<Mutex<tts::TtsEngine>>,
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
let tts_engine = engine.lock().await.tts_engine.clone();
let mut tts = tts_engine.lock().await;
match tts.generate(&cmd.params).await {
Ok(result) => respond_ok(out_tx, &cmd.id, result),

View File

@@ -1,8 +1,13 @@
//! Text-to-speech engine — synthesizes text to WAV files using Kokoro neural TTS.
//!
//! The model is loaded lazily on first use. If the model/voices files are not
//! present, the generate command returns an error and the TS side falls back
//! to espeak-ng.
//! present, the generate command returns an error and the caller skips the prompt.
//!
//! Caching is handled internally via a `.meta` sidecar file next to each WAV.
//! When `cacheable` is true, the engine checks whether the existing WAV was
//! generated from the same text+voice; if so it returns immediately (cache hit).
//! Callers never need to check for cached files — that is entirely this module's
//! responsibility.
use kokoro_tts::{KokoroTts, Voice};
use std::path::Path;
@@ -32,6 +37,8 @@ impl TtsEngine {
/// - `voice`: voice name (e.g. "af_bella")
/// - `text`: text to synthesize
/// - `output`: output WAV file path
/// - `cacheable`: if true, skip synthesis when the output WAV already
/// matches the same text+voice (checked via a `.meta` sidecar file)
pub async fn generate(&mut self, params: &serde_json::Value) -> Result<serde_json::Value, String> {
let model_path = params.get("model").and_then(|v| v.as_str())
.ok_or("missing 'model' param")?;
@@ -43,11 +50,19 @@ impl TtsEngine {
.ok_or("missing 'text' param")?;
let output_path = params.get("output").and_then(|v| v.as_str())
.ok_or("missing 'output' param")?;
let cacheable = params.get("cacheable").and_then(|v| v.as_bool())
.unwrap_or(false);
if text.is_empty() {
return Err("empty text".into());
}
// Cache check: if cacheable and the sidecar matches, return immediately.
if cacheable && self.is_cache_hit(output_path, text, voice_name) {
eprintln!("[tts] cache hit: {output_path}");
return Ok(serde_json::json!({ "output": output_path }));
}
// Check that model/voices files exist.
if !Path::new(model_path).exists() {
return Err(format!("model not found: {model_path}"));
@@ -56,6 +71,11 @@ impl TtsEngine {
return Err(format!("voices not found: {voices_path}"));
}
// Ensure parent directory exists.
if let Some(parent) = Path::new(output_path).parent() {
let _ = std::fs::create_dir_all(parent);
}
// Lazy-load or reload if paths changed.
if self.tts.is_none()
|| self.loaded_model_path != model_path
@@ -95,9 +115,41 @@ impl TtsEngine {
}
writer.finalize().map_err(|e| format!("WAV finalize: {e}"))?;
// Write sidecar for future cache checks.
if cacheable {
self.write_cache_meta(output_path, text, voice_name);
}
eprintln!("[tts] wrote {output_path}");
Ok(serde_json::json!({ "output": output_path }))
}
// -----------------------------------------------------------------------
// Cache helpers
// -----------------------------------------------------------------------
/// Report whether the WAV at `output_path` can be reused for this request.
///
/// This is a hit only when the WAV exists, its `<output_path>.meta` sidecar
/// exists and can be read, and the sidecar contents equal the cache key
/// built from `text` and `voice`. Any read error counts as a miss.
fn is_cache_hit(&self, output_path: &str, text: &str, voice: &str) -> bool {
    let sidecar = format!("{output_path}.meta");
    Path::new(output_path).exists()
        && Path::new(&sidecar).exists()
        && std::fs::read_to_string(&sidecar)
            .map(|stored| stored == Self::cache_key(text, voice))
            .unwrap_or(false)
}
/// Write the `<output_path>.meta` sidecar holding the cache key for `text` + `voice`.
///
/// Best-effort: a failed write is logged and otherwise ignored, because the
/// WAV itself has already been produced. Without the sidecar the next
/// cacheable request for this output is a cache miss and re-synthesizes.
fn write_cache_meta(&self, output_path: &str, text: &str, voice: &str) {
    let meta_path = format!("{output_path}.meta");
    if let Err(e) = std::fs::write(&meta_path, Self::cache_key(text, voice)) {
        // Surface the failure so "cache never hits" is diagnosable from logs.
        eprintln!("[tts] failed to write cache sidecar {meta_path}: {e}");
    }
}
/// Build the cache key stored in the sidecar: `text`, a NUL byte, then `voice`.
///
/// The NUL separator keeps the two parts distinguishable, so swapping text
/// and voice (or moving characters across the boundary) yields a different key.
fn cache_key(text: &str, voice: &str) -> String {
    let mut key = String::with_capacity(text.len() + voice.len() + 1);
    key.push_str(text);
    key.push('\0');
    key.push_str(voice);
    key
}
}
/// Map voice name string to Kokoro Voice enum variant.