feat(mixer): enhance mixer functionality with interaction and tool legs

- Updated mixer to handle participant and isolated leg roles, allowing for IVR and consent interactions.
- Introduced commands for starting and canceling interactions, managing tool legs for recording and transcription.
- Implemented per-source audio handling for tool legs, enabling separate audio processing.
- Enhanced DTMF handling to forward events between participant legs only.
- Added support for PCM recording directly from tool legs, with WAV file generation.
- Updated TypeScript definitions and functions to support new interaction and tool leg features.
This commit is contained in:
2026-04-10 14:54:21 +00:00
parent 6a130db7c7
commit 7d59361352
13 changed files with 1448 additions and 94 deletions

View File

@@ -1,4 +1,5 @@
//! Audio player — reads a WAV file and streams it as RTP packets.
//! Also provides prompt preparation for the leg interaction system.
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, TranscodeState};
@@ -8,6 +9,11 @@ use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::time::{self, Duration};
/// Mixing sample rate used by the mixer (must stay in sync with mixer::MIX_RATE).
const MIX_RATE: u32 = 16000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320;
/// Play a WAV file as RTP to a destination.
/// Returns when playback is complete.
pub async fn play_wav_file(
@@ -171,3 +177,64 @@ pub async fn play_beep(
Ok((seq, ts))
}
/// Load a WAV file and split it into 20ms PCM frames at 16kHz.
/// Used by the leg interaction system to prepare prompt audio for the mixer.
///
/// Accepts 16-bit integer or 32-bit float WAV data at any sample rate; the
/// audio is resampled to `MIX_RATE` (16kHz) when needed, and the final short
/// frame is zero-padded so every returned frame is exactly `MIX_FRAME_SIZE`
/// (320) samples — one 20ms mixer tick.
///
/// # Errors
/// Returns a descriptive `String` when the file is missing, cannot be opened
/// or decoded, has an unsupported format, or resampling fails.
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
    let path = Path::new(wav_path);
    if !path.exists() {
        return Err(format!("WAV file not found: {wav_path}"));
    }
    let mut reader =
        hound::WavReader::open(path).map_err(|e| format!("open WAV {wav_path}: {e}"))?;
    let spec = reader.spec();
    let wav_rate = spec.sample_rate;
    // Decode all samples to i16. Propagate the first decode error instead of
    // silently dropping bad samples (the previous `filter_map(|s| s.ok())`
    // could produce a truncated or garbled prompt on a corrupt file).
    let samples: Vec<i16> = if spec.bits_per_sample == 16 {
        reader
            .samples::<i16>()
            .collect::<Result<Vec<_>, _>>()
            .map_err(|e| format!("decode WAV {wav_path}: {e}"))?
    } else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
        reader
            .samples::<f32>()
            .map(|s| {
                s.map(|v| (v * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
                    .map_err(|e| format!("decode WAV {wav_path}: {e}"))
            })
            .collect::<Result<Vec<_>, _>>()?
    } else {
        return Err(format!(
            "unsupported WAV format: {}bit {:?}",
            spec.bits_per_sample, spec.sample_format
        ));
    };
    if samples.is_empty() {
        return Ok(vec![]);
    }
    // Resample to MIX_RATE (16kHz) if needed.
    let resampled = if wav_rate != MIX_RATE {
        let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
        transcoder
            .resample(&samples, wav_rate, MIX_RATE)
            .map_err(|e| format!("resample: {e}"))?
    } else {
        samples
    };
    // Split into MIX_FRAME_SIZE (320) sample frames, zero-padding the last one.
    let frame_count = (resampled.len() + MIX_FRAME_SIZE - 1) / MIX_FRAME_SIZE;
    let mut frames = Vec::with_capacity(frame_count);
    for chunk in resampled.chunks(MIX_FRAME_SIZE) {
        let mut frame = chunk.to_vec();
        frame.resize(MIX_FRAME_SIZE, 0); // no-op for full frames
        frames.push(frame);
    }
    Ok(frames)
}

View File

@@ -5,6 +5,7 @@
use crate::mixer::{MixerCommand, RtpPacket};
use crate::sip_leg::SipLeg;
use sip_proto::message::SipMessage;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -59,6 +60,7 @@ pub enum LegKind {
SipDevice,
WebRtc,
Media, // voicemail playback, IVR, recording
Tool, // observer leg for recording, transcription, etc.
}
impl LegKind {
@@ -68,6 +70,7 @@ impl LegKind {
Self::SipDevice => "sip-device",
Self::WebRtc => "webrtc",
Self::Media => "media",
Self::Tool => "tool",
}
}
}
@@ -113,6 +116,10 @@ pub struct LegInfo {
pub remote_media: Option<SocketAddr>,
/// SIP signaling address (provider or device).
pub signaling_addr: Option<SocketAddr>,
/// Flexible key-value metadata (consent state, tool config, etc.).
/// Persisted into call history on call end.
pub metadata: HashMap<String, serde_json::Value>,
}
/// A multiparty call with N legs and a central mixer.
@@ -127,6 +134,10 @@ pub struct Call {
pub callee_number: Option<String>,
pub provider_id: String,
/// Original INVITE from the device (for device-originated outbound calls).
/// Used to construct proper 180/200/error responses back to the device.
pub device_invite: Option<SipMessage>,
/// All legs in this call, keyed by leg ID.
pub legs: HashMap<LegId, LegInfo>,
@@ -153,6 +164,7 @@ impl Call {
caller_number: None,
callee_number: None,
provider_id,
device_invite: None,
legs: HashMap::new(),
mixer_cmd_tx,
mixer_task: Some(mixer_task),
@@ -207,6 +219,13 @@ impl Call {
.values()
.filter(|l| l.state != LegState::Terminated)
.map(|l| {
let metadata: serde_json::Value = if l.metadata.is_empty() {
serde_json::json!({})
} else {
serde_json::Value::Object(
l.metadata.iter().map(|(k, v)| (k.clone(), v.clone())).collect(),
)
};
serde_json::json!({
"id": l.id,
"type": l.kind.as_str(),
@@ -214,6 +233,7 @@ impl Call {
"codec": sip_proto::helpers::codec_name(l.codec_pt),
"rtpPort": l.rtp_port,
"remoteMedia": l.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
"metadata": metadata,
})
})
.collect();

View File

@@ -12,8 +12,8 @@ use crate::mixer::spawn_mixer;
use crate::registrar::Registrar;
use crate::rtp::RtpPortPool;
use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig};
use sip_proto::helpers::{generate_call_id, parse_sdp_endpoint};
use sip_proto::message::SipMessage;
use sip_proto::helpers::{build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions};
use sip_proto::message::{ResponseOptions, SipMessage};
use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri};
use std::collections::HashMap;
use std::net::SocketAddr;
@@ -167,6 +167,16 @@ impl CallManager {
if let Some(leg) = call.legs.get_mut(leg_id) {
leg.state = LegState::Ringing;
}
// Forward 180 Ringing to device if this is a device-originated call.
if let Some(device_invite) = &call.device_invite {
let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice);
if let Some(dev) = device_leg {
if let Some(dev_addr) = dev.signaling_addr {
let ringing = SipMessage::create_response(180, "Ringing", device_invite, None);
let _ = socket.send_to(&ringing.serialize(), dev_addr).await;
}
}
}
}
emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id }));
emit_event(&self.out_tx, "leg_state_changed",
@@ -187,7 +197,7 @@ impl CallManager {
remote
};
// Wire the leg to the mixer if remote media is known.
// Wire the provider leg to the mixer if remote media is known.
if let (Some(remote_addr), Some(rtp_socket)) = (remote, rtp_socket_clone) {
let channels = create_leg_channels();
spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx);
@@ -198,6 +208,66 @@ impl CallManager {
}
}
// For device-originated calls: send 200 OK to device and wire device leg.
if let Some(call) = self.calls.get(call_id) {
if let Some(device_invite) = call.device_invite.clone() {
let device_leg_info: Option<(SocketAddr, u16, Arc<UdpSocket>, Option<SocketAddr>, String)> =
call.legs.values().find(|l| l.kind == LegKind::SipDevice).and_then(|dev| {
Some((
dev.signaling_addr?,
dev.rtp_port,
dev.rtp_socket.clone()?,
dev.remote_media,
dev.id.clone(),
))
});
if let Some((dev_addr, dev_rtp_port, dev_rtp_socket, dev_remote, dev_leg_id)) = device_leg_info {
// Build SDP pointing device to our device_rtp port.
// Use LAN IP for the device (it's on the local network).
let call_ref = self.calls.get(call_id).unwrap();
let prov_leg = call_ref.legs.values().find(|l| l.kind == LegKind::SipProvider);
let lan_ip_str = prov_leg
.and_then(|l| l.sip_leg.as_ref())
.map(|sl| sl.config.lan_ip.clone())
.unwrap_or_else(|| "0.0.0.0".to_string());
let sdp = build_sdp(&SdpOptions {
ip: &lan_ip_str,
port: dev_rtp_port,
..Default::default()
});
let ok = SipMessage::create_response(200, "OK", &device_invite, Some(ResponseOptions {
to_tag: Some(generate_tag()),
contact: Some(format!("<sip:{}:{}>", lan_ip_str, 5060)),
body: Some(sdp),
content_type: Some("application/sdp".to_string()),
extra_headers: None,
}));
let _ = socket.send_to(&ok.serialize(), dev_addr).await;
// Update device leg state.
if let Some(call) = self.calls.get_mut(call_id) {
if let Some(dev_leg) = call.legs.get_mut(&dev_leg_id) {
dev_leg.state = LegState::Connected;
}
}
// Wire device leg to mixer.
if let Some(dev_remote_addr) = dev_remote {
let dev_channels = create_leg_channels();
spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx);
spawn_sip_outbound(dev_rtp_socket, dev_remote_addr, dev_channels.outbound_rx);
if let Some(call) = self.calls.get(call_id) {
call.add_leg_to_mixer(&dev_leg_id, sip_pt, dev_channels.inbound_rx, dev_channels.outbound_tx)
.await;
}
}
}
}
}
emit_event(&self.out_tx, "call_answered", serde_json::json!({
"call_id": call_id,
"provider_media_addr": remote.map(|a| a.ip().to_string()),
@@ -209,6 +279,34 @@ impl CallManager {
}
SipLegAction::Terminated(reason) => {
let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0);
// Notify device if this is a device-originated outbound call.
if let Some(call) = self.calls.get(call_id) {
if let Some(device_invite) = &call.device_invite {
let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice);
if let Some(dev) = device_leg {
if let Some(dev_addr) = dev.signaling_addr {
// Map reason to SIP response code.
let code: u16 = if reason.starts_with("rejected_") {
reason.strip_prefix("rejected_")
.and_then(|s| s.parse().ok())
.unwrap_or(503)
} else if reason == "bye" {
// Provider sent BYE — send BYE to device too.
// (200 OK already connected; just let terminate_call handle it)
0
} else {
503
};
if code > 0 && dev.state != LegState::Connected {
let resp = SipMessage::create_response(code, "Service Unavailable", device_invite, None);
let _ = socket.send_to(&resp.serialize(), dev_addr).await;
}
}
}
}
}
if let Some(call) = self.calls.get_mut(call_id) {
if let Some(leg) = call.legs.get_mut(leg_id) {
leg.state = LegState::Terminated;
@@ -277,13 +375,48 @@ impl CallManager {
// Get this leg's RTP port (for SDP rewriting — tell the other side to send RTP here).
let this_rtp_port = call.legs.get(this_leg_id).map(|l| l.rtp_port).unwrap_or(0);
// Check if the other leg is a B2BUA leg (has SipLeg for proper dialog mgmt).
let other_has_sip_leg = call.legs.get(&other_leg_id)
.map(|l| l.sip_leg.is_some())
.unwrap_or(false);
if msg.is_request() {
let method = msg.method().unwrap_or("");
// ACK: In hybrid B2BUA mode, the device's ACK for our 200 OK
// is absorbed silently (provider's 200 was already ACKed by SipLeg).
if method == "ACK" {
if other_has_sip_leg {
return true; // Absorb — provider ACK handled by SipLeg.
}
// Pure passthrough: forward ACK normally.
let _ = socket.send_to(&msg.serialize(), forward_to).await;
return true;
}
// INVITE retransmit: the call already exists, re-send 100 Trying.
if method == "INVITE" {
let trying = SipMessage::create_response(100, "Trying", msg, None);
let _ = socket.send_to(&trying.serialize(), from_addr).await;
return true;
}
if method == "BYE" {
let ok = SipMessage::create_response(200, "OK", msg, None);
let _ = socket.send_to(&ok.serialize(), from_addr).await;
let _ = socket.send_to(&msg.serialize(), forward_to).await;
// If other leg has SipLeg, use build_hangup for proper dialog teardown.
if other_has_sip_leg {
if let Some(other) = call.legs.get_mut(&other_leg_id) {
if let Some(sip_leg) = &mut other.sip_leg {
if let Some(hangup_buf) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
}
}
}
} else {
let _ = socket.send_to(&msg.serialize(), forward_to).await;
}
let duration = call.duration_secs();
emit_event(
@@ -302,7 +435,19 @@ impl CallManager {
if method == "CANCEL" {
let ok = SipMessage::create_response(200, "OK", msg, None);
let _ = socket.send_to(&ok.serialize(), from_addr).await;
let _ = socket.send_to(&msg.serialize(), forward_to).await;
// If other leg has SipLeg, use build_hangup (produces CANCEL for early dialog).
if other_has_sip_leg {
if let Some(other) = call.legs.get_mut(&other_leg_id) {
if let Some(sip_leg) = &mut other.sip_leg {
if let Some(hangup_buf) = sip_leg.build_hangup() {
let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await;
}
}
}
} else {
let _ = socket.send_to(&msg.serialize(), forward_to).await;
}
let duration = call.duration_secs();
emit_event(
@@ -559,6 +704,7 @@ impl CallManager {
rtp_port: provider_rtp.port,
remote_media: provider_media,
signaling_addr: Some(from_addr),
metadata: HashMap::new(),
},
);
@@ -578,6 +724,7 @@ impl CallManager {
rtp_port: device_rtp.port,
remote_media: None, // Learned from device's 200 OK.
signaling_addr: Some(device_addr),
metadata: HashMap::new(),
},
);
@@ -686,6 +833,7 @@ impl CallManager {
rtp_port: rtp_alloc.port,
remote_media: None,
signaling_addr: Some(provider_dest),
metadata: HashMap::new(),
},
);
@@ -697,8 +845,12 @@ impl CallManager {
Some(call_id)
}
/// Create an outbound passthrough call (device → provider).
pub async fn create_outbound_passthrough(
/// Create a device-originated outbound call (device → provider) using hybrid B2BUA.
///
/// The device side is a simple passthrough leg (no SipLeg needed).
/// The provider side uses a full SipLeg for proper dialog management,
/// 407 auth, correct From URI, and public IP in SDP.
pub async fn create_device_outbound_call(
&mut self,
invite: &SipMessage,
from_addr: SocketAddr,
@@ -707,19 +859,28 @@ impl CallManager {
rtp_pool: &mut RtpPortPool,
socket: &UdpSocket,
public_ip: Option<&str>,
registered_aor: &str,
) -> Option<String> {
let call_id = self.next_call_id();
let lan_ip = &config.proxy.lan_ip;
let lan_port = config.proxy.lan_port;
let pub_ip = public_ip.unwrap_or(lan_ip.as_str());
let sip_call_id = invite.call_id().to_string();
let callee = invite.request_uri().unwrap_or("").to_string();
let device_sip_call_id = invite.call_id().to_string();
let dialed_number = invite
.request_uri()
.and_then(|uri| SipMessage::extract_uri(uri))
.unwrap_or(invite.request_uri().unwrap_or(""))
.to_string();
let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() {
Some(a) => a,
None => return None,
};
// Send 100 Trying to device immediately to stop retransmissions.
let trying = SipMessage::create_response(100, "Trying", invite, None);
let _ = socket.send_to(&trying.serialize(), from_addr).await;
// Allocate RTP ports for both legs.
let device_rtp = match rtp_pool.allocate().await {
Some(a) => a,
@@ -741,9 +902,10 @@ impl CallManager {
mixer_cmd_tx,
mixer_task,
);
call.callee_number = Some(callee);
call.callee_number = Some(dialed_number.clone());
call.device_invite = Some(invite.clone());
// Device leg.
// --- Device leg (passthrough, no SipLeg) ---
let device_leg_id = format!("{call_id}-dev");
let mut device_media: Option<SocketAddr> = None;
if invite.has_sdp_body() {
@@ -759,20 +921,45 @@ impl CallManager {
LegInfo {
id: device_leg_id.clone(),
kind: LegKind::SipDevice,
state: LegState::Connected,
state: LegState::Inviting, // Not connected yet — waiting for provider answer
codec_pt,
sip_leg: None,
sip_call_id: Some(sip_call_id.clone()),
sip_call_id: Some(device_sip_call_id.clone()),
webrtc_session_id: None,
rtp_socket: Some(device_rtp.socket.clone()),
rtp_port: device_rtp.port,
remote_media: device_media,
signaling_addr: Some(from_addr),
metadata: HashMap::new(),
},
);
// Provider leg.
// Register device's SIP Call-ID → device leg.
self.sip_index
.insert(device_sip_call_id, (call_id.clone(), device_leg_id));
// --- Provider leg (B2BUA with SipLeg) ---
let provider_leg_id = format!("{call_id}-prov");
let provider_sip_call_id = generate_call_id(None);
let leg_config = SipLegConfig {
lan_ip: lan_ip.clone(),
lan_port,
public_ip: public_ip.map(|s| s.to_string()),
sip_target: provider_dest,
username: Some(provider_config.username.clone()),
password: Some(provider_config.password.clone()),
registered_aor: Some(registered_aor.to_string()),
codecs: provider_config.codecs.clone(),
rtp_port: provider_rtp.port,
};
let mut sip_leg = SipLeg::new(provider_leg_id.clone(), leg_config);
// Build proper To URI and send INVITE.
let to_uri = format!("sip:{}@{}", dialed_number, provider_config.domain);
sip_leg.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket).await;
call.legs.insert(
provider_leg_id.clone(),
LegInfo {
@@ -780,38 +967,20 @@ impl CallManager {
kind: LegKind::SipProvider,
state: LegState::Inviting,
codec_pt,
sip_leg: None,
sip_call_id: Some(sip_call_id.clone()),
sip_leg: Some(sip_leg),
sip_call_id: Some(provider_sip_call_id.clone()),
webrtc_session_id: None,
rtp_socket: Some(provider_rtp.socket.clone()),
rtp_port: provider_rtp.port,
remote_media: None,
signaling_addr: Some(provider_dest),
metadata: HashMap::new(),
},
);
// Register provider's SIP Call-ID → provider leg.
self.sip_index
.insert(sip_call_id.clone(), (call_id.clone(), device_leg_id));
// Forward INVITE to provider with SDP rewriting.
let mut fwd_invite = invite.clone();
fwd_invite.prepend_header("Record-Route", &format!("<sip:{lan_ip}:{lan_port};lr>"));
if let Some(contact) = fwd_invite.get_header("Contact").map(|s| s.to_string()) {
let new_contact = rewrite_sip_uri(&contact, pub_ip, lan_port);
if new_contact != contact {
fwd_invite.set_header("Contact", &new_contact);
}
}
// Tell provider to send RTP to our provider_rtp port.
if fwd_invite.has_sdp_body() {
let (new_body, _) = rewrite_sdp(&fwd_invite.body, pub_ip, provider_rtp.port);
fwd_invite.body = new_body;
fwd_invite.update_content_length();
}
let _ = socket.send_to(&fwd_invite.serialize(), provider_dest).await;
.insert(provider_sip_call_id, (call_id.clone(), provider_leg_id));
self.calls.insert(call_id.clone(), call);
Some(call_id)
@@ -872,6 +1041,7 @@ impl CallManager {
rtp_port: rtp_alloc.port,
remote_media: None,
signaling_addr: Some(provider_dest),
metadata: HashMap::new(),
};
self.sip_index
@@ -1099,6 +1269,7 @@ impl CallManager {
rtp_port: rtp_alloc.port,
remote_media: Some(provider_media),
signaling_addr: Some(from_addr),
metadata: HashMap::new(),
},
);

View File

@@ -50,11 +50,13 @@ pub fn spawn_sip_inbound(
continue; // Too small for RTP header.
}
let pt = buf[1] & 0x7F;
let marker = (buf[1] & 0x80) != 0;
let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
let payload = buf[12..n].to_vec();
if payload.is_empty() {
continue;
}
if inbound_tx.send(RtpPacket { payload, payload_type: pt }).await.is_err() {
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, timestamp }).await.is_err() {
break; // Channel closed — leg removed.
}
}

View File

@@ -20,6 +20,7 @@ mod registrar;
mod rtp;
mod sip_leg;
mod sip_transport;
mod tool_leg;
mod voicemail;
mod webrtc_engine;
@@ -141,6 +142,11 @@ async fn handle_command(
"webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await,
// webrtc_link needs both: engine (for mixer channels) and webrtc (for session).
"webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await,
// Leg interaction and tool leg commands.
"start_interaction" => handle_start_interaction(engine, out_tx, &cmd).await,
"add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await,
"remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await,
"set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await,
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
}
}
@@ -373,11 +379,14 @@ async fn handle_sip_packet(
);
if let Some(route) = route_result {
let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await {
// Look up provider state by config ID (not by device address).
let (public_ip, registered_aor) = if let Some(ps_arc) =
eng.provider_mgr.find_by_provider_id(&route.provider.id).await
{
let ps = ps_arc.lock().await;
ps.public_ip.clone()
(ps.public_ip.clone(), ps.registered_aor.clone())
} else {
None
(None, format!("sip:{}@{}", route.provider.username, route.provider.domain))
};
let ProxyEngine {
@@ -387,7 +396,7 @@ async fn handle_sip_packet(
} = *eng;
let rtp_pool = rtp_pool.as_mut().unwrap();
let call_id = call_mgr
.create_outbound_passthrough(
.create_device_outbound_call(
&msg,
from_addr,
&route.provider,
@@ -395,6 +404,7 @@ async fn handle_sip_packet(
rtp_pool,
socket,
public_ip.as_deref(),
&registered_aor,
)
.await;
@@ -645,6 +655,7 @@ async fn handle_webrtc_link(
rtp_port: 0,
remote_media: None,
signaling_addr: None,
metadata: std::collections::HashMap::new(),
},
);
}
@@ -773,3 +784,319 @@ async fn handle_webrtc_close(webrtc: Arc<Mutex<WebRtcEngine>>, out_tx: &OutTx, c
Err(e) => respond_err(out_tx, &cmd.id, &e),
}
}
// ---------------------------------------------------------------------------
// Leg interaction & tool leg commands
// ---------------------------------------------------------------------------
/// Handle `start_interaction` — isolate a leg, play a prompt, collect DTMF.
/// This command blocks until the interaction completes (digit, timeout, or cancel).
async fn handle_start_interaction(
    engine: Arc<Mutex<ProxyEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    // --- Required parameters -------------------------------------------------
    let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
        Some(v) => v.to_owned(),
        None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
    };
    let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) {
        Some(v) => v.to_owned(),
        None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
    };
    let prompt_wav = match cmd.params.get("prompt_wav").and_then(|v| v.as_str()) {
        Some(v) => v.to_owned(),
        None => { respond_err(out_tx, &cmd.id, "missing prompt_wav"); return; }
    };
    // --- Optional parameters (with defaults) ---------------------------------
    let digits: Vec<char> = cmd
        .params
        .get("expected_digits")
        .and_then(|v| v.as_str())
        .unwrap_or("12")
        .chars()
        .collect();
    let timeout_ms = cmd
        .params
        .get("timeout_ms")
        .and_then(|v| v.as_u64())
        .unwrap_or(15000) as u32;
    // Decode the prompt WAV into 20ms 16kHz frames for the mixer.
    let frames = match crate::audio_player::load_prompt_pcm_frames(&prompt_wav) {
        Ok(frames) => frames,
        Err(e) => {
            respond_err(out_tx, &cmd.id, &format!("prompt load failed: {e}"));
            return;
        }
    };
    // The mixer reports the outcome on this oneshot channel.
    let (result_tx, result_rx) = tokio::sync::oneshot::channel();
    // Hand the interaction to the mixer. The engine lock is scoped tightly:
    // we must NOT hold it while awaiting the result below.
    {
        let eng = engine.lock().await;
        let call = match eng.call_mgr.calls.get(&call_id) {
            Some(c) => c,
            None => {
                respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
                return;
            }
        };
        let _ = call
            .mixer_cmd_tx
            .send(crate::mixer::MixerCommand::StartInteraction {
                leg_id: leg_id.clone(),
                prompt_pcm_frames: frames,
                expected_digits: digits,
                timeout_ms,
                result_tx,
            })
            .await;
    }
    // Block on the oneshot with a safety margin beyond the interaction's own
    // timeout, so a stuck mixer can never wedge this handler forever.
    let safety = tokio::time::Duration::from_millis(timeout_ms as u64 + 30000);
    let outcome = match tokio::time::timeout(safety, result_rx).await {
        Ok(Ok(r)) => r,
        Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled, // sender dropped
        Err(_) => crate::mixer::InteractionResult::Timeout,       // safety timeout fired
    };
    // Persist the outcome into the leg's metadata (persisted to call history).
    let (result_str, digit_str) = match &outcome {
        crate::mixer::InteractionResult::Digit(d) => ("digit", Some(d.to_string())),
        crate::mixer::InteractionResult::Timeout => ("timeout", None),
        crate::mixer::InteractionResult::Cancelled => ("cancelled", None),
    };
    {
        let mut eng = engine.lock().await;
        let leg = eng
            .call_mgr
            .calls
            .get_mut(&call_id)
            .and_then(|call| call.legs.get_mut(&leg_id));
        if let Some(leg) = leg {
            leg.metadata.insert(
                "last_interaction_result".to_string(),
                serde_json::json!(result_str),
            );
            if let Some(d) = &digit_str {
                leg.metadata.insert(
                    "last_interaction_digit".to_string(),
                    serde_json::json!(d),
                );
            }
        }
    }
    // Reply to the command with the outcome.
    let mut reply = serde_json::json!({ "result": result_str });
    if let Some(d) = digit_str {
        reply["digit"] = serde_json::json!(d);
    }
    respond_ok(out_tx, &cmd.id, reply);
}
/// Handle `add_tool_leg` — add a recording or transcription tool leg to a call.
///
/// Spawns the tool's background task, wires its audio channel into the mixer,
/// and registers an observer `LegInfo` (no RTP socket, no SIP dialog) in the
/// call's leg map. Responds with the generated `tool_leg_id`.
async fn handle_add_tool_leg(
    engine: Arc<Mutex<ProxyEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
    };
    let tool_type_str = match cmd.params.get("tool_type").and_then(|v| v.as_str()) {
        Some(s) => s.to_string(),
        None => { respond_err(out_tx, &cmd.id, "missing tool_type"); return; }
    };
    let tool_type = match tool_type_str.as_str() {
        "recording" => crate::mixer::ToolType::Recording,
        "transcription" => crate::mixer::ToolType::Transcription,
        other => {
            respond_err(out_tx, &cmd.id, &format!("unknown tool_type: {other}"));
            return;
        }
    };
    let tool_leg_id = format!("{call_id}-tool-{}", rand::random::<u32>());
    {
        let mut eng = engine.lock().await;
        let call = match eng.call_mgr.calls.get_mut(&call_id) {
            Some(c) => c,
            None => {
                respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
                return;
            }
        };
        // Spawn the background task only after the call is known to exist.
        // Spawning first (as before) left an orphan task — e.g. an empty
        // recording file — whenever the call_id was invalid.
        let (audio_tx, _task_handle) = match tool_type {
            crate::mixer::ToolType::Recording => {
                let base_dir = cmd
                    .params
                    .get("config")
                    .and_then(|c| c.get("base_dir"))
                    .and_then(|v| v.as_str())
                    .unwrap_or(".nogit/recordings")
                    .to_string();
                crate::tool_leg::spawn_recording_tool(
                    tool_leg_id.clone(),
                    call_id.clone(),
                    base_dir,
                    out_tx.clone(),
                )
            }
            crate::mixer::ToolType::Transcription => crate::tool_leg::spawn_transcription_tool(
                tool_leg_id.clone(),
                call_id.clone(),
                out_tx.clone(),
            ),
        };
        // Hand the audio sink to the mixer (it delivers per-source batches).
        let _ = call
            .mixer_cmd_tx
            .send(crate::mixer::MixerCommand::AddToolLeg {
                leg_id: tool_leg_id.clone(),
                tool_type,
                audio_tx,
            })
            .await;
        // Register the tool leg in the call's leg map as a pure observer.
        let mut metadata = std::collections::HashMap::new();
        metadata.insert("tool_type".to_string(), serde_json::json!(tool_type_str));
        call.legs.insert(
            tool_leg_id.clone(),
            crate::call::LegInfo {
                id: tool_leg_id.clone(),
                kind: crate::call::LegKind::Tool,
                state: crate::call::LegState::Connected,
                codec_pt: 0,
                sip_leg: None,
                sip_call_id: None,
                webrtc_session_id: None,
                rtp_socket: None,
                rtp_port: 0,
                remote_media: None,
                signaling_addr: None,
                metadata,
            },
        );
    }
    emit_event(
        out_tx,
        "leg_added",
        serde_json::json!({
            "call_id": call_id,
            "leg_id": tool_leg_id,
            "kind": "tool",
            "tool_type": tool_type_str,
            "state": "connected",
        }),
    );
    respond_ok(
        out_tx,
        &cmd.id,
        serde_json::json!({ "tool_leg_id": tool_leg_id }),
    );
}
/// Handle `remove_tool_leg` — detach a tool leg from a call.
///
/// Removing it from the mixer drops the audio channel, which the tool's
/// background task observes and uses to finalize its output.
async fn handle_remove_tool_leg(
    engine: Arc<Mutex<ProxyEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    // Required parameters (call_id validated first, matching other handlers).
    let (call_id, tool_leg_id) = match (
        cmd.params.get("call_id").and_then(|v| v.as_str()),
        cmd.params.get("tool_leg_id").and_then(|v| v.as_str()),
    ) {
        (None, _) => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
        (_, None) => { respond_err(out_tx, &cmd.id, "missing tool_leg_id"); return; }
        (Some(c), Some(t)) => (c.to_string(), t.to_string()),
    };
    let mut eng = engine.lock().await;
    let call = match eng.call_mgr.calls.get_mut(&call_id) {
        Some(call) => call,
        None => {
            respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
            return;
        }
    };
    // Tell the mixer to drop the tool leg (drops audio_tx → task finalizes).
    let _ = call
        .mixer_cmd_tx
        .send(crate::mixer::MixerCommand::RemoveToolLeg {
            leg_id: tool_leg_id.clone(),
        })
        .await;
    // Drop the observer entry from the call's leg map.
    call.legs.remove(&tool_leg_id);
    emit_event(
        out_tx,
        "leg_removed",
        serde_json::json!({
            "call_id": call_id,
            "leg_id": tool_leg_id,
        }),
    );
    respond_ok(out_tx, &cmd.id, serde_json::json!({}));
}
/// Handle `set_leg_metadata` — set a metadata key on a leg.
///
/// The value is stored verbatim as JSON in the leg's metadata map, which is
/// persisted into call history on call end.
async fn handle_set_leg_metadata(
    engine: Arc<Mutex<ProxyEngine>>,
    out_tx: &OutTx,
    cmd: &Command,
) {
    // Small helper for the required string parameters.
    let fetch = |name: &str| cmd.params.get(name).and_then(|v| v.as_str()).map(str::to_owned);
    let call_id = match fetch("call_id") {
        Some(v) => v,
        None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; }
    };
    let leg_id = match fetch("leg_id") {
        Some(v) => v,
        None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; }
    };
    let key = match fetch("key") {
        Some(v) => v,
        None => { respond_err(out_tx, &cmd.id, "missing key"); return; }
    };
    // The value may be any JSON type, so it is taken as-is.
    let value = match cmd.params.get("value").cloned() {
        Some(v) => v,
        None => { respond_err(out_tx, &cmd.id, "missing value"); return; }
    };
    let mut eng = engine.lock().await;
    let call = match eng.call_mgr.calls.get_mut(&call_id) {
        Some(c) => c,
        None => {
            respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
            return;
        }
    };
    match call.legs.get_mut(&leg_id) {
        Some(leg) => {
            leg.metadata.insert(key, value);
            respond_ok(out_tx, &cmd.id, serde_json::json!({}));
        }
        None => respond_err(out_tx, &cmd.id, &format!("leg {leg_id} not found")),
    }
}

View File

@@ -5,14 +5,17 @@
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
//! 2. Compute total mix (sum of all legs' PCM as i32)
//! 3. For each leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 2. Compute total mix (sum of all **participant** legs' PCM as i32)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
//! 5. For each tool leg: send per-source unmerged audio batch
//! 6. Forward DTMF between participant legs only
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, TranscodeState};
use std::collections::HashMap;
use tokio::sync::mpsc;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
@@ -25,11 +28,84 @@ const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
/// One received RTP packet, handed from a leg's inbound task to the mixer.
pub struct RtpPacket {
/// Raw RTP payload bytes (header stripped by the inbound task).
pub payload: Vec<u8>,
/// RTP payload type (low 7 bits of the second header byte).
pub payload_type: u8,
/// RTP marker bit (first packet of a DTMF event, etc.).
pub marker: bool,
/// RTP timestamp from the original packet header.
pub timestamp: u32,
}
// ---------------------------------------------------------------------------
// Leg roles
// ---------------------------------------------------------------------------
/// What role a leg currently plays in the mixer.
enum LegRole {
/// Normal participant: contributes to mix, receives mix-minus.
Participant,
/// Temporarily isolated for IVR/consent interaction.
/// Carries the prompt-playback and DTMF-collection state.
Isolated(IsolationState),
}
/// Per-leg state while isolated for an interaction: queued prompt audio,
/// the digits that complete it, and the timeout countdown.
struct IsolationState {
/// PCM frames at MIX_RATE (320 samples each) queued for playback.
prompt_frames: VecDeque<Vec<i16>>,
/// Digits that complete the interaction (e.g., ['1', '2']).
expected_digits: Vec<char>,
/// Ticks remaining before timeout (decremented each tick after prompt ends).
timeout_ticks_remaining: u32,
/// Whether we've finished playing the prompt.
prompt_done: bool,
/// Channel to send the result back to the command handler.
/// Wrapped in `Option` so the sender can be moved out and consumed exactly
/// once (`oneshot::Sender::send` takes ownership).
result_tx: Option<oneshot::Sender<InteractionResult>>,
}
/// Result of a leg interaction (consent prompt, IVR, etc.).
///
/// Derives added so callers can log (`Debug`), compare (`PartialEq`/`Eq`),
/// and freely copy this small value type — a public enum without `Debug`
/// is awkward to use in tests and tracing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InteractionResult {
    /// The participant pressed one of the expected digits.
    Digit(char),
    /// No digit was received within the timeout.
    Timeout,
    /// The leg was removed or the call tore down before completion.
    Cancelled,
}
// ---------------------------------------------------------------------------
// Tool legs
// ---------------------------------------------------------------------------
/// Type of tool leg.
#[derive(Debug, Clone, Copy)]
pub enum ToolType {
/// Records call audio (per-source PCM written out as WAV).
Recording,
/// Feeds call audio to a transcription task.
Transcription,
}
/// Per-source audio delivered to a tool leg each mixer tick.
/// Sources are kept unmerged so a tool can process each participant's
/// audio separately (e.g. per-speaker recording or transcription).
pub struct ToolAudioBatch {
/// One entry per participant leg that produced audio this tick.
pub sources: Vec<ToolAudioSource>,
}
/// One participant's 20ms audio frame, as delivered to tool legs.
///
/// `Debug`/`Clone` derived so batches can be logged and fanned out to
/// multiple consumers; both fields are plain owned stdlib types.
#[derive(Debug, Clone)]
pub struct ToolAudioSource {
    /// ID of the participant leg this audio came from.
    pub leg_id: String,
    /// PCM at 16kHz, MIX_FRAME_SIZE (320) samples.
    pub pcm_16k: Vec<i16>,
}
/// Internal storage for a tool leg inside the mixer.
struct ToolLegSlot {
// Currently never read (hence the allow); kept for future use/diagnostics.
#[allow(dead_code)]
tool_type: ToolType,
/// Channel to the tool's background task. Dropping this slot closes the
/// channel, which the task uses as its signal to finalize.
audio_tx: mpsc::Sender<ToolAudioBatch>,
}
// ---------------------------------------------------------------------------
// Commands
// ---------------------------------------------------------------------------
/// Commands sent to the mixer task via a control channel.
pub enum MixerCommand {
/// Add a new leg to the mix.
/// Add a new participant leg to the mix.
AddLeg {
leg_id: String,
codec_pt: u8,
@@ -40,8 +116,35 @@ pub enum MixerCommand {
RemoveLeg { leg_id: String },
/// Shut down the mixer.
Shutdown,
/// Isolate a leg and start an interaction (consent prompt, IVR).
/// The leg is removed from the mix and hears the prompt instead.
/// DTMF from the leg is checked against expected_digits.
StartInteraction {
leg_id: String,
/// PCM frames at MIX_RATE (16kHz), each 320 samples.
prompt_pcm_frames: Vec<Vec<i16>>,
expected_digits: Vec<char>,
timeout_ms: u32,
result_tx: oneshot::Sender<InteractionResult>,
},
/// Cancel an in-progress interaction (e.g., leg being removed).
CancelInteraction { leg_id: String },
/// Add a tool leg that receives per-source unmerged audio.
AddToolLeg {
leg_id: String,
tool_type: ToolType,
audio_tx: mpsc::Sender<ToolAudioBatch>,
},
/// Remove a tool leg (drops the channel, background task finalizes).
RemoveToolLeg { leg_id: String },
}
// ---------------------------------------------------------------------------
// Mixer internals
// ---------------------------------------------------------------------------
/// Internal per-leg state inside the mixer.
struct MixerLegSlot {
codec_pt: u8,
@@ -56,6 +159,8 @@ struct MixerLegSlot {
rtp_seq: u16,
rtp_ts: u32,
rtp_ssrc: u32,
/// Current role of this leg in the mixer.
role: LegRole,
}
/// Spawn the mixer task for a call. Returns the command sender and task handle.
@@ -79,13 +184,14 @@ async fn mixer_loop(
out_tx: OutTx,
) {
let mut legs: HashMap<String, MixerLegSlot> = HashMap::new();
let mut tool_legs: HashMap<String, ToolLegSlot> = HashMap::new();
let mut interval = time::interval(Duration::from_millis(20));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
interval.tick().await;
// 1. Process control commands (non-blocking).
// ── 1. Process control commands (non-blocking). ─────────────
loop {
match cmd_rx.try_recv() {
Ok(MixerCommand::AddLeg {
@@ -121,38 +227,115 @@ async fn mixer_loop(
rtp_seq: 0,
rtp_ts: 0,
rtp_ssrc: rand::random(),
role: LegRole::Participant,
},
);
}
Ok(MixerCommand::RemoveLeg { leg_id }) => {
// If the leg is isolated, send Cancelled before dropping.
if let Some(slot) = legs.get_mut(&leg_id) {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
}
legs.remove(&leg_id);
// Channels drop → I/O tasks exit cleanly.
}
Ok(MixerCommand::Shutdown) => return,
Ok(MixerCommand::Shutdown) => {
// Cancel all outstanding interactions before shutting down.
for slot in legs.values_mut() {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
}
return;
}
Ok(MixerCommand::StartInteraction {
leg_id,
prompt_pcm_frames,
expected_digits,
timeout_ms,
result_tx,
}) => {
if let Some(slot) = legs.get_mut(&leg_id) {
// Cancel any existing interaction first.
if let LegRole::Isolated(ref mut old_state) = slot.role {
if let Some(tx) = old_state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
let timeout_ticks = timeout_ms / 20;
slot.role = LegRole::Isolated(IsolationState {
prompt_frames: VecDeque::from(prompt_pcm_frames),
expected_digits,
timeout_ticks_remaining: timeout_ticks,
prompt_done: false,
result_tx: Some(result_tx),
});
} else {
// Leg not found — immediately cancel.
let _ = result_tx.send(InteractionResult::Cancelled);
}
}
Ok(MixerCommand::CancelInteraction { leg_id }) => {
if let Some(slot) = legs.get_mut(&leg_id) {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(InteractionResult::Cancelled);
}
}
slot.role = LegRole::Participant;
}
}
Ok(MixerCommand::AddToolLeg {
leg_id,
tool_type,
audio_tx,
}) => {
tool_legs.insert(leg_id, ToolLegSlot { tool_type, audio_tx });
}
Ok(MixerCommand::RemoveToolLeg { leg_id }) => {
tool_legs.remove(&leg_id);
// Dropping the ToolLegSlot drops audio_tx → background task sees channel close.
}
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => return,
}
}
if legs.is_empty() {
if legs.is_empty() && tool_legs.is_empty() {
continue;
}
// 2. Drain inbound packets, decode to 16kHz PCM.
// ── 2. Drain inbound packets, decode to 16kHz PCM. ─────────
// DTMF (PT 101) packets are collected separately.
let leg_ids: Vec<String> = legs.keys().cloned().collect();
let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new();
for lid in &leg_ids {
let slot = legs.get_mut(lid).unwrap();
// Drain channel, keep only the latest packet (simple jitter handling).
let mut latest: Option<RtpPacket> = None;
// Drain channel — collect DTMF packets separately, keep latest audio.
let mut latest_audio: Option<RtpPacket> = None;
loop {
match slot.inbound_rx.try_recv() {
Ok(pkt) => latest = Some(pkt),
Ok(pkt) => {
if pkt.payload_type == 101 {
// DTMF telephone-event: collect for processing.
dtmf_forward.push((lid.clone(), pkt));
} else {
latest_audio = Some(pkt);
}
}
Err(_) => break,
}
}
if let Some(pkt) = latest {
if let Some(pkt) = latest_audio {
slot.silent_ticks = 0;
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
@@ -174,6 +357,9 @@ async fn mixer_loop(
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
}
}
} else if dtmf_forward.iter().any(|(src, _)| src == lid) {
// Got DTMF but no audio — don't bump silent_ticks (DTMF counts as activity).
slot.silent_ticks = 0;
} else {
slot.silent_ticks += 1;
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
@@ -183,50 +369,210 @@ async fn mixer_loop(
}
}
// 3. Compute total mix (sum of all legs as i32 to avoid overflow).
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
for slot in legs.values() {
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
total_mix[i] += s as i32;
if matches!(slot.role, LegRole::Participant) {
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
total_mix[i] += s as i32;
}
}
}
// 4. For each leg: mix-minus, resample, encode, send.
for slot in legs.values_mut() {
// Mix-minus: total minus this leg's own contribution.
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as i32).clamp(-32768, 32767) as i16;
mix_minus.push(sample);
// ── 4. Per-leg output. ──────────────────────────────────────
// Collect interaction completions to apply after the loop
// (can't mutate role while iterating mutably for encode).
let mut completed_interactions: Vec<(String, InteractionResult)> = Vec::new();
for (lid, slot) in legs.iter_mut() {
match &mut slot.role {
LegRole::Participant => {
// Mix-minus: total minus this leg's own contribution.
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample = (total_mix[i] - slot.last_pcm_frame[i] as i32)
.clamp(-32768, 32767) as i16;
mix_minus.push(sample);
}
// Resample from 16kHz to the leg's codec native rate.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
mix_minus
} else {
slot.transcoder
.resample(&mix_minus, MIX_RATE, target_rate)
.unwrap_or_default()
};
// Encode to the leg's codec.
let encoded =
match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
// Build RTP packet with header.
let header =
build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc);
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
// Non-blocking send — drop frame if channel is full.
let _ = slot.outbound_tx.try_send(rtp);
}
LegRole::Isolated(state) => {
// Check for DTMF digit from this leg.
let mut matched_digit: Option<char> = None;
for (src_lid, dtmf_pkt) in &dtmf_forward {
if src_lid == lid && dtmf_pkt.payload.len() >= 4 {
let event_id = dtmf_pkt.payload[0];
let end_bit = (dtmf_pkt.payload[1] & 0x80) != 0;
if end_bit {
const EVENT_CHARS: &[char] = &[
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '*', '#',
'A', 'B', 'C', 'D',
];
if let Some(&ch) = EVENT_CHARS.get(event_id as usize) {
if state.expected_digits.contains(&ch) {
matched_digit = Some(ch);
break;
}
}
}
}
}
if let Some(digit) = matched_digit {
// Interaction complete — digit matched.
completed_interactions
.push((lid.clone(), InteractionResult::Digit(digit)));
} else {
// Play prompt frame or silence.
let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() {
frame
} else {
state.prompt_done = true;
vec![0i16; MIX_FRAME_SIZE]
};
// Encode prompt frame to the leg's codec (reuses existing encode path).
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
pcm_frame
} else {
slot.transcoder
.resample(&pcm_frame, MIX_RATE, target_rate)
.unwrap_or_default()
};
if let Ok(encoded) =
slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt)
{
if !encoded.is_empty() {
let header = build_rtp_header(
slot.codec_pt,
slot.rtp_seq,
slot.rtp_ts,
slot.rtp_ssrc,
);
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot
.rtp_ts
.wrapping_add(rtp_clock_increment(slot.codec_pt));
let _ = slot.outbound_tx.try_send(rtp);
}
}
// Check timeout (only after prompt finishes).
if state.prompt_done {
if state.timeout_ticks_remaining == 0 {
completed_interactions
.push((lid.clone(), InteractionResult::Timeout));
} else {
state.timeout_ticks_remaining -= 1;
}
}
}
}
}
}
// Resample from 16kHz to the leg's codec native rate.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
mix_minus
} else {
slot.transcoder
.resample(&mix_minus, MIX_RATE, target_rate)
.unwrap_or_default()
};
// Apply completed interactions — revert legs to Participant.
for (lid, result) in completed_interactions {
if let Some(slot) = legs.get_mut(&lid) {
if let LegRole::Isolated(ref mut state) = slot.role {
if let Some(tx) = state.result_tx.take() {
let _ = tx.send(result);
}
}
slot.role = LegRole::Participant;
}
}
// Encode to the leg's codec.
let encoded = match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
// ── 5. Distribute per-source audio to tool legs. ────────────
if !tool_legs.is_empty() {
// Collect participant PCM frames (computed in step 2).
let sources: Vec<ToolAudioSource> = legs
.iter()
.filter(|(_, s)| matches!(s.role, LegRole::Participant))
.map(|(lid, s)| ToolAudioSource {
leg_id: lid.clone(),
pcm_16k: s.last_pcm_frame.clone(),
})
.collect();
// Build RTP packet with header.
let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc);
let mut rtp = header.to_vec();
rtp.extend_from_slice(&encoded);
for tool in tool_legs.values() {
let batch = ToolAudioBatch {
sources: sources
.iter()
.map(|s| ToolAudioSource {
leg_id: s.leg_id.clone(),
pcm_16k: s.pcm_16k.clone(),
})
.collect(),
};
// Non-blocking send — drop batch if tool can't keep up.
let _ = tool.audio_tx.try_send(batch);
}
}
slot.rtp_seq = slot.rtp_seq.wrapping_add(1);
slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt));
// Non-blocking send — drop frame if channel is full.
let _ = slot.outbound_tx.try_send(rtp);
// ── 6. Forward DTMF packets between participant legs only. ──
for (source_lid, dtmf_pkt) in &dtmf_forward {
// Skip if the source is an isolated leg (its DTMF was handled in step 4).
if let Some(src_slot) = legs.get(source_lid) {
if matches!(src_slot.role, LegRole::Isolated(_)) {
continue;
}
}
for (target_lid, target_slot) in legs.iter_mut() {
if target_lid == source_lid {
continue; // Don't echo DTMF back to sender.
}
// Don't forward to isolated legs.
if matches!(target_slot.role, LegRole::Isolated(_)) {
continue;
}
let mut header = build_rtp_header(
101,
target_slot.rtp_seq,
target_slot.rtp_ts,
target_slot.rtp_ssrc,
);
if dtmf_pkt.marker {
header[1] |= 0x80; // Set marker bit.
}
let mut rtp_out = header.to_vec();
rtp_out.extend_from_slice(&dtmf_pkt.payload);
target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1);
// Don't increment rtp_ts for DTMF — it shares timestamp context with audio.
let _ = target_slot.outbound_tx.try_send(rtp_out);
}
}
}
}

View File

@@ -321,6 +321,17 @@ impl ProviderManager {
None
}
/// Find a provider by its config ID (e.g. "easybell").
///
/// Linear scan over the configured providers; each candidate's state is
/// locked only long enough to compare its config ID.
pub async fn find_by_provider_id(&self, provider_id: &str) -> Option<Arc<Mutex<ProviderState>>> {
    for candidate in &self.providers {
        let is_match = candidate.lock().await.config.id == provider_id;
        if is_match {
            return Some(candidate.clone());
        }
    }
    None
}
/// Check if a provider is currently registered.
pub async fn is_registered(&self, provider_id: &str) -> bool {
for ps_arc in &self.providers {

View File

@@ -55,6 +55,56 @@ impl Recorder {
})
}
/// Create a recorder that writes raw PCM at a given sample rate.
/// Used by tool legs that already have decoded PCM (no RTP processing needed).
///
/// Creates the parent directory if needed, opens a 16-bit mono WAV writer,
/// and converts the optional duration cap into a sample-count cap.
pub fn new_pcm(file_path: &str, sample_rate: u32, max_duration_ms: Option<u64>) -> Result<Self, String> {
    // Make sure the target directory exists before creating the WAV file.
    if let Some(dir) = Path::new(file_path).parent() {
        std::fs::create_dir_all(dir).map_err(|e| format!("create dir: {e}"))?;
    }
    // 16-bit signed mono at the caller-supplied rate.
    let spec = hound::WavSpec {
        channels: 1,
        sample_rate,
        bits_per_sample: 16,
        sample_format: hound::SampleFormat::Int,
    };
    let writer = hound::WavWriter::create(file_path, spec)
        .map_err(|e| format!("create WAV {file_path}: {e}"))?;
    let transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
    // Duration cap (ms) → sample cap at this rate, if any.
    let max_samples = max_duration_ms.map(|ms| (sample_rate as u64 * ms) / 1000);
    Ok(Self {
        writer,
        transcoder,
        // source_pt is unused for PCM recording; set to 0.
        source_pt: 0,
        total_samples: 0,
        sample_rate,
        max_samples,
        file_path: file_path.to_string(),
    })
}
/// Write raw PCM samples directly (no RTP decoding).
///
/// Returns `true` if recording should continue, `false` once a write fails
/// or the configured max duration is reached. Calls made after the cap has
/// been hit are no-ops that return `false` — previously each such call
/// would still append one more sample before noticing the cap.
pub fn write_pcm(&mut self, samples: &[i16]) -> bool {
    // Refuse further writes once the cap has been reached (guards against
    // overshoot if the caller keeps calling after a `false` return).
    if self.max_samples.map_or(false, |max| self.total_samples >= max) {
        return false;
    }
    for &sample in samples {
        if self.writer.write_sample(sample).is_err() {
            return false;
        }
        self.total_samples += 1;
        // Stop exactly at the cap, mid-slice if necessary.
        if self.max_samples.map_or(false, |max| self.total_samples >= max) {
            return false;
        }
    }
    true
}
/// Process an incoming RTP packet (full packet with header).
/// Returns true if recording should continue, false if max duration reached.
pub fn process_rtp(&mut self, data: &[u8]) -> bool {

View File

@@ -0,0 +1,138 @@
//! Tool leg consumers — background tasks that process per-source unmerged audio.
//!
//! Tool legs are observer legs that receive individual audio streams from each
//! participant in a call. The mixer pipes `ToolAudioBatch` every 20ms containing
//! each participant's decoded PCM@16kHz tagged with source leg ID.
//!
//! Consumers:
//! - **Recording**: writes per-source WAV files for speaker-separated recording.
//! - **Transcription**: stub for future Whisper integration (accumulates audio in Rust).
use crate::ipc::{emit_event, OutTx};
use crate::mixer::ToolAudioBatch;
use crate::recorder::Recorder;
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
// ---------------------------------------------------------------------------
// Recording consumer
// ---------------------------------------------------------------------------
/// Spawn a recording tool leg that writes per-source WAV files.
///
/// Returns the channel sender (for the mixer to send batches) and the task handle.
/// When the channel is closed (tool leg removed), all WAV files are finalized
/// and a `tool_recording_done` event is emitted.
///
/// Error behavior: a source whose recorder cannot be created (I/O error) is
/// skipped instead of panicking, and a source that reaches its max duration
/// stops being written without dropping the remaining sources of the batch.
pub fn spawn_recording_tool(
    tool_leg_id: String,
    call_id: String,
    base_dir: String,
    out_tx: OutTx,
) -> (mpsc::Sender<ToolAudioBatch>, JoinHandle<()>) {
    let (tx, mut rx) = mpsc::channel::<ToolAudioBatch>(64);
    let handle = tokio::spawn(async move {
        // One recorder per source leg, created lazily on first non-silent frame.
        let mut recorders: HashMap<String, Recorder> = HashMap::new();
        while let Some(batch) = rx.recv().await {
            for source in &batch.sources {
                // Skip silence-only frames (all zeros = no audio activity).
                let has_audio = source.pcm_16k.iter().any(|&s| s != 0);
                if !has_audio && !recorders.contains_key(&source.leg_id) {
                    continue; // Don't create a file for silence-only sources.
                }
                if !recorders.contains_key(&source.leg_id) {
                    let path = format!("{}/{}-{}.wav", base_dir, call_id, source.leg_id);
                    match Recorder::new_pcm(&path, 16000, None) {
                        Ok(rec) => {
                            recorders.insert(source.leg_id.clone(), rec);
                        }
                        // I/O failure (e.g. unwritable base_dir): skip this
                        // source; don't panic and kill the whole tool task.
                        Err(_) => continue,
                    }
                }
                let recorder = recorders
                    .get_mut(&source.leg_id)
                    .expect("recorder inserted above");
                if !recorder.write_pcm(&source.pcm_16k) {
                    // Max duration reached for this source — stop writing it,
                    // but keep processing the other sources in the batch
                    // (the previous `break` silently dropped them).
                    continue;
                }
            }
        }
        // Channel closed — finalize all recordings.
        let mut files = Vec::new();
        for (leg_id, rec) in recorders {
            let result = rec.stop();
            files.push(serde_json::json!({
                "source_leg_id": leg_id,
                "file_path": result.file_path,
                "duration_ms": result.duration_ms,
            }));
        }
        emit_event(
            &out_tx,
            "tool_recording_done",
            serde_json::json!({
                "call_id": call_id,
                "tool_leg_id": tool_leg_id,
                "files": files,
            }),
        );
    });
    (tx, handle)
}
// ---------------------------------------------------------------------------
// Transcription consumer (stub — real plumbing, stub consumer)
// ---------------------------------------------------------------------------
/// Spawn a transcription tool leg.
///
/// The plumbing is fully real: it receives per-source unmerged PCM@16kHz from
/// the mixer every 20ms. The consumer is a stub that accumulates audio and
/// reports metadata on close. Future: will stream to a Whisper HTTP endpoint.
pub fn spawn_transcription_tool(
    tool_leg_id: String,
    call_id: String,
    out_tx: OutTx,
) -> (mpsc::Sender<ToolAudioBatch>, JoinHandle<()>) {
    let (tx, mut rx) = mpsc::channel::<ToolAudioBatch>(64);
    let handle = tokio::spawn(async move {
        // Running per-source sample counts, used for duration reporting.
        let mut sample_counts: HashMap<String, u64> = HashMap::new();
        while let Some(batch) = rx.recv().await {
            for src in &batch.sources {
                let count = sample_counts.entry(src.leg_id.clone()).or_insert(0);
                *count += src.pcm_16k.len() as u64;
                // TODO: Future — accumulate chunks and stream to Whisper endpoint.
                // For now, the audio is received and counted but not processed.
            }
        }
        // Channel closed — report per-source metadata.
        let mut sources: Vec<serde_json::Value> = Vec::with_capacity(sample_counts.len());
        for (leg_id, samples) in &sample_counts {
            sources.push(serde_json::json!({
                "source_leg_id": leg_id,
                "duration_ms": (samples * 1000) / 16000,
            }));
        }
        emit_event(
            &out_tx,
            "tool_transcription_done",
            serde_json::json!({
                "call_id": call_id,
                "tool_leg_id": tool_leg_id,
                "sources": sources,
            }),
        );
    });
    (tx, handle)
}

View File

@@ -290,6 +290,8 @@ async fn browser_to_mixer_loop(
.send(RtpPacket {
payload: payload.to_vec(),
payload_type: PT_OPUS,
marker: false,
timestamp: 0,
})
.await;
}

View File

@@ -41,6 +41,32 @@ type TProxyCommands = {
params: { call_id: string };
result: { file_path: string; duration_ms: number };
};
start_interaction: {
params: {
call_id: string;
leg_id: string;
prompt_wav: string;
expected_digits: string;
timeout_ms: number;
};
result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string };
};
add_tool_leg: {
params: {
call_id: string;
tool_type: 'recording' | 'transcription';
config?: Record<string, unknown>;
};
result: { tool_leg_id: string };
};
remove_tool_leg: {
params: { call_id: string; tool_leg_id: string };
result: Record<string, never>;
};
set_leg_metadata: {
params: { call_id: string; leg_id: string; key: string; value: unknown };
result: Record<string, never>;
};
};
// ---------------------------------------------------------------------------
@@ -280,11 +306,107 @@ export async function webrtcClose(sessionId: string): Promise<void> {
} catch { /* ignore */ }
}
// ---------------------------------------------------------------------------
// Leg interaction & tool leg commands
// ---------------------------------------------------------------------------
/**
 * Start an interaction on a specific leg — isolate it, play a prompt, collect DTMF.
 * Blocks until the interaction completes (digit pressed, timeout, or cancelled).
 * Returns null when the bridge is unavailable or the command fails.
 */
export async function startInteraction(
  callId: string,
  legId: string,
  promptWav: string,
  expectedDigits: string,
  timeoutMs: number,
): Promise<{ result: 'digit' | 'timeout' | 'cancelled'; digit?: string } | null> {
  if (!bridge || !initialized) return null;
  const params = {
    call_id: callId,
    leg_id: legId,
    prompt_wav: promptWav,
    expected_digits: expectedDigits,
    timeout_ms: timeoutMs,
  };
  try {
    return (await bridge.sendCommand('start_interaction', params as any)) as any;
  } catch (e: any) {
    logFn?.(`[proxy-engine] start_interaction error: ${e?.message || e}`);
    return null;
  }
}
/**
 * Add a tool leg (recording or transcription) to a call.
 * Tool legs receive per-source unmerged audio from all participants.
 * Returns the new tool leg's ID, or null on failure.
 */
export async function addToolLeg(
  callId: string,
  toolType: 'recording' | 'transcription',
  config?: Record<string, unknown>,
): Promise<string | null> {
  if (!bridge || !initialized) return null;
  try {
    const response = await bridge.sendCommand('add_tool_leg', {
      call_id: callId,
      tool_type: toolType,
      config,
    } as any);
    const toolLegId = (response as any)?.tool_leg_id;
    return toolLegId || null;
  } catch (e: any) {
    logFn?.(`[proxy-engine] add_tool_leg error: ${e?.message || e}`);
    return null;
  }
}
/**
 * Remove a tool leg from a call. Triggers finalization (WAV files, metadata).
 * Returns true on success, false when the bridge is unavailable or the
 * command fails.
 */
export async function removeToolLeg(callId: string, toolLegId: string): Promise<boolean> {
  if (!bridge || !initialized) return false;
  const params = { call_id: callId, tool_leg_id: toolLegId };
  try {
    await bridge.sendCommand('remove_tool_leg', params as any);
  } catch (e: any) {
    logFn?.(`[proxy-engine] remove_tool_leg error: ${e?.message || e}`);
    return false;
  }
  return true;
}
/**
 * Set a metadata key-value pair on a leg.
 * Returns true on success, false when the bridge is unavailable or the
 * command fails.
 */
export async function setLegMetadata(
  callId: string,
  legId: string,
  key: string,
  value: unknown,
): Promise<boolean> {
  if (!bridge || !initialized) return false;
  const params = { call_id: callId, leg_id: legId, key, value };
  try {
    await bridge.sendCommand('set_leg_metadata', params as any);
  } catch (e: any) {
    logFn?.(`[proxy-engine] set_leg_metadata error: ${e?.message || e}`);
    return false;
  }
  return true;
}
/**
* Subscribe to an event from the proxy engine.
* Event names: incoming_call, outbound_device_call, call_ringing,
* call_answered, call_ended, provider_registered, device_registered,
* dtmf_digit, recording_done, sip_unhandled
* dtmf_digit, recording_done, tool_recording_done, tool_transcription_done,
* leg_added, leg_removed, sip_unhandled
*/
export function onProxyEvent(event: string, handler: (data: any) => void): void {
if (!bridge) throw new Error('proxy engine not initialized');

View File

@@ -96,6 +96,16 @@ interface IDeviceStatus {
isBrowser: boolean;
}
/** One leg of an active call as tracked in the dashboard's shadow state. */
interface IActiveLeg {
  /** Unique leg ID (matches the proxy engine's leg_id). */
  id: string;
  /** Kind of endpoint this leg represents. */
  type: 'sip-device' | 'sip-provider' | 'webrtc' | 'tool';
  /** Current leg state string as reported by leg events. */
  state: string;
  /** Codec name if known (filled from call events); null otherwise. */
  codec: string | null;
  /** Local RTP port if known; null otherwise. */
  rtpPort: number | null;
  /** Remote media "addr:port" string if known; null otherwise. */
  remoteMedia: string | null;
  /** Arbitrary key-value metadata attached to the leg. */
  metadata: Record<string, unknown>;
}
interface IActiveCall {
id: string;
direction: string;
@@ -104,6 +114,13 @@ interface IActiveCall {
providerUsed: string | null;
state: string;
startedAt: number;
legs: Map<string, IActiveLeg>;
}
/** Snapshot of a leg preserved in call history after the call ends. */
interface IHistoryLeg {
  /** Leg ID. */
  id: string;
  /** Leg kind string (e.g. 'sip-provider', 'sip-device', 'webrtc', 'tool'). */
  type: string;
  /** Metadata captured from the leg at call end. */
  metadata: Record<string, unknown>;
}
interface ICallHistoryEntry {
@@ -113,6 +130,7 @@ interface ICallHistoryEntry {
calleeNumber: string | null;
startedAt: number;
duration: number;
legs: IHistoryLeg[];
}
const providerStatuses = new Map<string, IProviderStatus>();
@@ -187,7 +205,18 @@ function getStatus() {
calls: [...activeCalls.values()].map((c) => ({
...c,
duration: Math.floor((Date.now() - c.startedAt) / 1000),
legs: [],
legs: [...c.legs.values()].map((l) => ({
id: l.id,
type: l.type,
state: l.state,
codec: l.codec,
rtpPort: l.rtpPort,
remoteMedia: l.remoteMedia,
metadata: l.metadata || {},
pktSent: 0,
pktReceived: 0,
transcoding: false,
})),
})),
callHistory,
contacts: appConfig.contacts || [],
@@ -242,6 +271,7 @@ async function startProxyEngine(): Promise<void> {
providerUsed: data.provider_id,
state: 'ringing',
startedAt: Date.now(),
legs: new Map(),
});
// Notify browsers of incoming call.
@@ -266,6 +296,7 @@ async function startProxyEngine(): Promise<void> {
providerUsed: null,
state: 'setting-up',
startedAt: Date.now(),
legs: new Map(),
});
});
@@ -279,6 +310,7 @@ async function startProxyEngine(): Promise<void> {
providerUsed: data.provider_id,
state: 'setting-up',
startedAt: Date.now(),
legs: new Map(),
});
// Notify all browser devices — they can connect via WebRTC to listen/talk.
@@ -303,6 +335,20 @@ async function startProxyEngine(): Promise<void> {
if (call) {
call.state = 'connected';
log(`[call] ${data.call_id} connected`);
// Enrich provider leg with media info from the answered event.
if (data.provider_media_addr && data.provider_media_port) {
for (const leg of call.legs.values()) {
if (leg.type === 'sip-provider') {
leg.remoteMedia = `${data.provider_media_addr}:${data.provider_media_port}`;
if (data.sip_pt !== undefined) {
const codecNames: Record<number, string> = { 0: 'PCMU', 8: 'PCMA', 9: 'G.722', 111: 'Opus' };
leg.codec = codecNames[data.sip_pt] || `PT${data.sip_pt}`;
}
break;
}
}
}
}
// Try to link WebRTC session to this call for audio bridging.
@@ -331,6 +377,15 @@ async function startProxyEngine(): Promise<void> {
const call = activeCalls.get(data.call_id);
if (call) {
log(`[call] ${data.call_id} ended: ${data.reason} (${data.duration}s)`);
// Snapshot legs with metadata for history.
const historyLegs: IHistoryLeg[] = [];
for (const [, leg] of call.legs) {
historyLegs.push({
id: leg.id,
type: leg.type,
metadata: leg.metadata || {},
});
}
// Move to history.
callHistory.unshift({
id: call.id,
@@ -339,6 +394,7 @@ async function startProxyEngine(): Promise<void> {
calleeNumber: call.calleeNumber,
startedAt: call.startedAt,
duration: data.duration,
legs: historyLegs,
});
if (callHistory.length > MAX_HISTORY) callHistory.pop();
activeCalls.delete(data.call_id);
@@ -361,17 +417,50 @@ async function startProxyEngine(): Promise<void> {
log(`[sip] unhandled ${data.method_or_status} Call-ID=${data.call_id?.slice(0, 20)} from=${data.from_addr}:${data.from_port}`);
});
// Leg events (multiparty).
// Leg events (multiparty) — update shadow state so the dashboard shows legs.
// A leg joined a call — mirror it into the dashboard's shadow state so the
// UI can list it.
onProxyEvent('leg_added', (data: any) => {
  log(`[leg] added: call=${data.call_id} leg=${data.leg_id} kind=${data.kind} state=${data.state}`);
  const call = activeCalls.get(data.call_id);
  if (call) {
    call.legs.set(data.leg_id, {
      id: data.leg_id,
      type: data.kind,
      state: data.state,
      // Media details are unknown at add time; filled in later from events.
      codec: null,
      rtpPort: null,
      remoteMedia: null,
      metadata: data.metadata || {},
    });
  }
});
// A leg left a call — drop it from the shadow state (no-op if the call is
// unknown).
onProxyEvent('leg_removed', (data: any) => {
  log(`[leg] removed: call=${data.call_id} leg=${data.leg_id}`);
  activeCalls.get(data.call_id)?.legs.delete(data.leg_id);
});
// A leg changed state — update (or lazily create) the shadow-state entry.
onProxyEvent('leg_state_changed', (data: any) => {
  // Fix: the template previously ran leg_id and state together with no
  // separator (`leg=${data.leg_id}${data.state}`), producing unreadable logs.
  log(`[leg] state: call=${data.call_id} leg=${data.leg_id} state=${data.state}`);
  const call = activeCalls.get(data.call_id);
  if (!call) return;
  const leg = call.legs.get(data.leg_id);
  if (leg) {
    leg.state = data.state;
    if (data.metadata) leg.metadata = data.metadata;
  } else {
    // Initial legs (provider/device) don't emit leg_added — create on first state change.
    const legId: string = data.leg_id;
    // Infer the leg kind from the ID naming convention used by the engine.
    const type = legId.includes('-prov') ? 'sip-provider' : legId.includes('-dev') ? 'sip-device' : 'webrtc';
    call.legs.set(data.leg_id, {
      id: data.leg_id,
      type,
      state: data.state,
      codec: null,
      rtpPort: null,
      remoteMedia: null,
      metadata: data.metadata || {},
    });
  }
});
// WebRTC events from Rust — forward ICE candidates to browser via WebSocket.
@@ -484,6 +573,7 @@ initWebUi(
providerUsed: providerId || null,
state: 'setting-up',
startedAt: Date.now(),
legs: new Map(),
});
} else {
log(`[dashboard] call failed for ${number}`);

View File

@@ -20,7 +20,7 @@ export interface IDeviceStatus {
export interface ILegStatus {
id: string;
type: 'sip-device' | 'sip-provider' | 'webrtc';
type: 'sip-device' | 'sip-provider' | 'webrtc' | 'tool';
state: string;
remoteMedia: { address: string; port: number } | null;
rtpPort: number | null;
@@ -28,6 +28,7 @@ export interface ILegStatus {
pktReceived: number;
codec: string | null;
transcoding: boolean;
metadata?: Record<string, unknown>;
}
export interface ICallStatus {
@@ -42,6 +43,12 @@ export interface ICallStatus {
legs: ILegStatus[];
}
/** Snapshot of a call leg preserved in history entries. */
export interface IHistoryLeg {
  /** Leg ID. */
  id: string;
  /** Leg kind string. */
  type: string;
  /** Metadata captured when the call ended. */
  metadata: Record<string, unknown>;
}
export interface ICallHistoryEntry {
id: string;
direction: 'inbound' | 'outbound' | 'internal';
@@ -50,6 +57,7 @@ export interface ICallHistoryEntry {
providerUsed: string | null;
startedAt: number;
duration: number;
legs?: IHistoryLeg[];
}
export interface IContact {