feat(proxy-engine): add live TTS streaming interactions and incoming number range support

This commit is contained in:
2026-04-14 18:52:13 +00:00
parent adfc4726fd
commit feb3514de4
13 changed files with 1476 additions and 358 deletions

View File

@@ -152,6 +152,7 @@ async fn handle_command(
"replace_leg" => handle_replace_leg(engine, out_tx, &cmd).await,
// Leg interaction and tool leg commands.
"start_interaction" => handle_start_interaction(engine, out_tx, &cmd).await,
"start_tts_interaction" => handle_start_tts_interaction(engine, out_tx, &cmd).await,
"add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await,
"remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await,
"set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await,
@@ -1138,6 +1139,79 @@ async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, c
// Leg interaction & tool leg commands
// ---------------------------------------------------------------------------
async fn run_interaction_command(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
call_id: String,
leg_id: String,
prompt_pcm_frames: Vec<Vec<f32>>,
prompt_stream_rx: Option<tokio::sync::mpsc::Receiver<tts::TtsStreamMessage>>,
prompt_cancel_tx: Option<tokio::sync::watch::Sender<bool>>,
expected_digits: Vec<char>,
timeout_ms: u32,
) {
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
{
let eng = engine.lock().await;
let call = match eng.call_mgr.calls.get(&call_id) {
Some(c) => c,
None => {
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
return;
}
};
let _ = call
.mixer_cmd_tx
.send(crate::mixer::MixerCommand::StartInteraction {
leg_id: leg_id.clone(),
prompt_pcm_frames,
prompt_stream_rx,
prompt_cancel_tx,
expected_digits: expected_digits.clone(),
timeout_ms,
result_tx,
})
.await;
}
let safety_timeout = tokio::time::Duration::from_millis(timeout_ms as u64 + 30000);
let result = match tokio::time::timeout(safety_timeout, result_rx).await {
Ok(Ok(r)) => r,
Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled,
Err(_) => crate::mixer::InteractionResult::Timeout,
};
let (result_str, digit_str) = match &result {
crate::mixer::InteractionResult::Digit(d) => ("digit", Some(d.to_string())),
crate::mixer::InteractionResult::Timeout => ("timeout", None),
crate::mixer::InteractionResult::Cancelled => ("cancelled", None),
};
{
let mut eng = engine.lock().await;
if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
if let Some(leg) = call.legs.get_mut(&leg_id) {
leg.metadata.insert(
"last_interaction_result".to_string(),
serde_json::json!(result_str),
);
if let Some(ref d) = digit_str {
leg.metadata
.insert("last_interaction_digit".to_string(), serde_json::json!(d));
}
}
}
}
let mut resp = serde_json::json!({ "result": result_str });
if let Some(d) = digit_str {
resp["digit"] = serde_json::json!(d);
}
respond_ok(out_tx, &cmd.id, resp);
}
/// Handle `start_interaction` — isolate a leg, play a prompt, collect DTMF.
/// This command blocks until the interaction completes (digit, timeout, or cancel).
async fn handle_start_interaction(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
@@ -1175,7 +1249,6 @@ async fn handle_start_interaction(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutT
.and_then(|v| v.as_u64())
.unwrap_or(15000) as u32;
// Load prompt audio from WAV file.
let prompt_frames = match crate::audio_player::load_prompt_pcm_frames(&prompt_wav) {
Ok(f) => f,
Err(e) => {
@@ -1184,67 +1257,113 @@ async fn handle_start_interaction(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutT
}
};
// Create oneshot channel for the result.
let (result_tx, result_rx) = tokio::sync::oneshot::channel();
run_interaction_command(
engine,
out_tx,
cmd,
call_id,
leg_id,
prompt_frames,
None,
None,
expected_digits,
timeout_ms,
)
.await;
}
// Send StartInteraction to the mixer.
{
let eng = engine.lock().await;
let call = match eng.call_mgr.calls.get(&call_id) {
Some(c) => c,
None => {
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
/// Handle `start_tts_interaction` — isolate a leg, stream chunked TTS, and
/// start playback before the full prompt has rendered.
async fn handle_start_tts_interaction(
engine: Arc<Mutex<ProxyEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => {
respond_err(out_tx, &cmd.id, "missing call_id");
return;
}
};
let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => {
respond_err(out_tx, &cmd.id, "missing leg_id");
return;
}
};
let text = match cmd.params.get("text").and_then(|v| v.as_str()) {
Some(s) if !s.trim().is_empty() => s.to_string(),
_ => {
respond_err(out_tx, &cmd.id, "missing text");
return;
}
};
let voice = cmd
.params
.get("voice")
.and_then(|v| v.as_str())
.unwrap_or("af_bella")
.to_string();
let model = cmd
.params
.get("model")
.and_then(|v| v.as_str())
.unwrap_or(tts::DEFAULT_MODEL_PATH)
.to_string();
let voices = cmd
.params
.get("voices")
.and_then(|v| v.as_str())
.unwrap_or(tts::DEFAULT_VOICES_PATH)
.to_string();
let expected_digits: Vec<char> = cmd
.params
.get("expected_digits")
.and_then(|v| v.as_str())
.unwrap_or("12")
.chars()
.collect();
let timeout_ms = cmd
.params
.get("timeout_ms")
.and_then(|v| v.as_u64())
.unwrap_or(15000) as u32;
let tts_engine = engine.lock().await.tts_engine.clone();
let live_prompt = {
let mut tts = tts_engine.lock().await;
match tts
.start_live_prompt(tts::TtsPromptRequest {
model_path: model,
voices_path: voices,
voice_name: voice,
text,
})
.await
{
Ok(prompt) => prompt,
Err(e) => {
respond_err(out_tx, &cmd.id, &e);
return;
}
};
let _ = call
.mixer_cmd_tx
.send(crate::mixer::MixerCommand::StartInteraction {
leg_id: leg_id.clone(),
prompt_pcm_frames: prompt_frames,
expected_digits: expected_digits.clone(),
timeout_ms,
result_tx,
})
.await;
} // engine lock released — we block on the oneshot, not the lock.
// Await the interaction result (blocks this task until complete).
let safety_timeout = tokio::time::Duration::from_millis(timeout_ms as u64 + 30000);
let result = match tokio::time::timeout(safety_timeout, result_rx).await {
Ok(Ok(r)) => r,
Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled, // oneshot dropped
Err(_) => crate::mixer::InteractionResult::Timeout, // safety timeout
};
// Store consent result in leg metadata.
let (result_str, digit_str) = match &result {
crate::mixer::InteractionResult::Digit(d) => ("digit", Some(d.to_string())),
crate::mixer::InteractionResult::Timeout => ("timeout", None),
crate::mixer::InteractionResult::Cancelled => ("cancelled", None),
};
{
let mut eng = engine.lock().await;
if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) {
if let Some(leg) = call.legs.get_mut(&leg_id) {
leg.metadata.insert(
"last_interaction_result".to_string(),
serde_json::json!(result_str),
);
if let Some(ref d) = digit_str {
leg.metadata
.insert("last_interaction_digit".to_string(), serde_json::json!(d));
}
}
}
}
};
let mut resp = serde_json::json!({ "result": result_str });
if let Some(d) = digit_str {
resp["digit"] = serde_json::json!(d);
}
respond_ok(out_tx, &cmd.id, resp);
run_interaction_command(
engine,
out_tx,
cmd,
call_id,
leg_id,
live_prompt.initial_frames,
Some(live_prompt.stream_rx),
Some(live_prompt.cancel_tx),
expected_digits,
timeout_ms,
)
.await;
}
/// Handle `add_tool_leg` — add a recording or transcription tool leg to a call.