diff --git a/rust/crates/proxy-engine/src/audio_player.rs b/rust/crates/proxy-engine/src/audio_player.rs index 4c5332a..d4a79ff 100644 --- a/rust/crates/proxy-engine/src/audio_player.rs +++ b/rust/crates/proxy-engine/src/audio_player.rs @@ -1,4 +1,5 @@ //! Audio player — reads a WAV file and streams it as RTP packets. +//! Also provides prompt preparation for the leg interaction system. use crate::rtp::{build_rtp_header, rtp_clock_increment}; use codec_lib::{codec_sample_rate, TranscodeState}; @@ -8,6 +9,11 @@ use std::sync::Arc; use tokio::net::UdpSocket; use tokio::time::{self, Duration}; +/// Mixing sample rate used by the mixer (must stay in sync with mixer::MIX_RATE). +const MIX_RATE: u32 = 16000; +/// Samples per 20ms frame at the mixing rate. +const MIX_FRAME_SIZE: usize = 320; + /// Play a WAV file as RTP to a destination. /// Returns when playback is complete. pub async fn play_wav_file( @@ -171,3 +177,64 @@ pub async fn play_beep( Ok((seq, ts)) } + +/// Load a WAV file and split it into 20ms PCM frames at 16kHz. +/// Used by the leg interaction system to prepare prompt audio for the mixer. +pub fn load_prompt_pcm_frames(wav_path: &str) -> Result>, String> { + let path = Path::new(wav_path); + if !path.exists() { + return Err(format!("WAV file not found: {wav_path}")); + } + + let mut reader = + hound::WavReader::open(path).map_err(|e| format!("open WAV {wav_path}: {e}"))?; + let spec = reader.spec(); + let wav_rate = spec.sample_rate; + + // Read all samples as i16. + let samples: Vec = if spec.bits_per_sample == 16 { + reader + .samples::() + .filter_map(|s| s.ok()) + .collect() + } else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float { + reader + .samples::() + .filter_map(|s| s.ok()) + .map(|s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16) + .collect() + } else { + return Err(format!( + "unsupported WAV format: {}bit {:?}", + spec.bits_per_sample, spec.sample_format + )); + }; + + if samples.is_empty() { + return Ok(vec![]); + } + + // Resample to MIX_RATE (16kHz) if needed. + let resampled = if wav_rate != MIX_RATE { + let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?; + transcoder + .resample(&samples, wav_rate, MIX_RATE) + .map_err(|e| format!("resample: {e}"))? + } else { + samples + }; + + // Split into MIX_FRAME_SIZE (320) sample frames. + let mut frames = Vec::new(); + let mut offset = 0; + while offset < resampled.len() { + let end = (offset + MIX_FRAME_SIZE).min(resampled.len()); + let mut frame = resampled[offset..end].to_vec(); + // Pad short final frame with silence. + frame.resize(MIX_FRAME_SIZE, 0); + frames.push(frame); + offset += MIX_FRAME_SIZE; + } + + Ok(frames) +} diff --git a/rust/crates/proxy-engine/src/call.rs b/rust/crates/proxy-engine/src/call.rs index ba38c0c..415ad1a 100644 --- a/rust/crates/proxy-engine/src/call.rs +++ b/rust/crates/proxy-engine/src/call.rs @@ -5,6 +5,7 @@ use crate::mixer::{MixerCommand, RtpPacket}; use crate::sip_leg::SipLeg; +use sip_proto::message::SipMessage; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; @@ -59,6 +60,7 @@ pub enum LegKind { SipDevice, WebRtc, Media, // voicemail playback, IVR, recording + Tool, // observer leg for recording, transcription, etc. } impl LegKind { @@ -68,6 +70,7 @@ impl LegKind { Self::SipDevice => "sip-device", Self::WebRtc => "webrtc", Self::Media => "media", + Self::Tool => "tool", } } } @@ -113,6 +116,10 @@ pub struct LegInfo { pub remote_media: Option, /// SIP signaling address (provider or device). pub signaling_addr: Option, + + /// Flexible key-value metadata (consent state, tool config, etc.). + /// Persisted into call history on call end. + pub metadata: HashMap, } /// A multiparty call with N legs and a central mixer. @@ -127,6 +134,10 @@ pub struct Call { pub callee_number: Option, pub provider_id: String, + /// Original INVITE from the device (for device-originated outbound calls). + /// Used to construct proper 180/200/error responses back to the device. + pub device_invite: Option, + /// All legs in this call, keyed by leg ID. pub legs: HashMap, @@ -153,6 +164,7 @@ impl Call { caller_number: None, callee_number: None, provider_id, + device_invite: None, legs: HashMap::new(), mixer_cmd_tx, mixer_task: Some(mixer_task), @@ -207,6 +219,13 @@ impl Call { .values() .filter(|l| l.state != LegState::Terminated) .map(|l| { + let metadata: serde_json::Value = if l.metadata.is_empty() { + serde_json::json!({}) + } else { + serde_json::Value::Object( + l.metadata.iter().map(|(k, v)| (k.clone(), v.clone())).collect(), + ) + }; serde_json::json!({ "id": l.id, "type": l.kind.as_str(), @@ -214,6 +233,7 @@ impl Call { "codec": sip_proto::helpers::codec_name(l.codec_pt), "rtpPort": l.rtp_port, "remoteMedia": l.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())), + "metadata": metadata, }) }) .collect(); diff --git a/rust/crates/proxy-engine/src/call_manager.rs b/rust/crates/proxy-engine/src/call_manager.rs index a93b5b6..00a0c3c 100644 --- a/rust/crates/proxy-engine/src/call_manager.rs +++ b/rust/crates/proxy-engine/src/call_manager.rs @@ -12,8 +12,8 @@ use crate::mixer::spawn_mixer; use crate::registrar::Registrar; use crate::rtp::RtpPortPool; use crate::sip_leg::{SipLeg, SipLegAction, SipLegConfig}; -use sip_proto::helpers::{generate_call_id, parse_sdp_endpoint}; -use sip_proto::message::SipMessage; +use sip_proto::helpers::{build_sdp, generate_call_id, generate_tag, parse_sdp_endpoint, SdpOptions}; +use sip_proto::message::{ResponseOptions, SipMessage}; use sip_proto::rewrite::{rewrite_sdp, rewrite_sip_uri}; use std::collections::HashMap; use std::net::SocketAddr; @@ -167,6 +167,16 @@ impl CallManager { if let Some(leg) = call.legs.get_mut(leg_id) { leg.state = LegState::Ringing; } + // Forward 180 Ringing to device if this is a device-originated call. + if let Some(device_invite) = &call.device_invite { + let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice); + if let Some(dev) = device_leg { + if let Some(dev_addr) = dev.signaling_addr { + let ringing = SipMessage::create_response(180, "Ringing", device_invite, None); + let _ = socket.send_to(&ringing.serialize(), dev_addr).await; + } + } + } } emit_event(&self.out_tx, "call_ringing", serde_json::json!({ "call_id": call_id })); emit_event(&self.out_tx, "leg_state_changed", @@ -187,7 +197,7 @@ impl CallManager { remote }; - // Wire the leg to the mixer if remote media is known. + // Wire the provider leg to the mixer if remote media is known. if let (Some(remote_addr), Some(rtp_socket)) = (remote, rtp_socket_clone) { let channels = create_leg_channels(); spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx); @@ -198,6 +208,66 @@ impl CallManager { } } + // For device-originated calls: send 200 OK to device and wire device leg. + if let Some(call) = self.calls.get(call_id) { + if let Some(device_invite) = call.device_invite.clone() { + let device_leg_info: Option<(SocketAddr, u16, Arc, Option, String)> = + call.legs.values().find(|l| l.kind == LegKind::SipDevice).and_then(|dev| { + Some(( + dev.signaling_addr?, + dev.rtp_port, + dev.rtp_socket.clone()?, + dev.remote_media, + dev.id.clone(), + )) + }); + + if let Some((dev_addr, dev_rtp_port, dev_rtp_socket, dev_remote, dev_leg_id)) = device_leg_info { + // Build SDP pointing device to our device_rtp port. + // Use LAN IP for the device (it's on the local network). + let call_ref = self.calls.get(call_id).unwrap(); + let prov_leg = call_ref.legs.values().find(|l| l.kind == LegKind::SipProvider); + let lan_ip_str = prov_leg + .and_then(|l| l.sip_leg.as_ref()) + .map(|sl| sl.config.lan_ip.clone()) + .unwrap_or_else(|| "0.0.0.0".to_string()); + + let sdp = build_sdp(&SdpOptions { + ip: &lan_ip_str, + port: dev_rtp_port, + ..Default::default() + }); + + let ok = SipMessage::create_response(200, "OK", &device_invite, Some(ResponseOptions { + to_tag: Some(generate_tag()), + contact: Some(format!("", lan_ip_str, 5060)), + body: Some(sdp), + content_type: Some("application/sdp".to_string()), + extra_headers: None, + })); + let _ = socket.send_to(&ok.serialize(), dev_addr).await; + + // Update device leg state. + if let Some(call) = self.calls.get_mut(call_id) { + if let Some(dev_leg) = call.legs.get_mut(&dev_leg_id) { + dev_leg.state = LegState::Connected; + } + } + + // Wire device leg to mixer. + if let Some(dev_remote_addr) = dev_remote { + let dev_channels = create_leg_channels(); + spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx); + spawn_sip_outbound(dev_rtp_socket, dev_remote_addr, dev_channels.outbound_rx); + if let Some(call) = self.calls.get(call_id) { + call.add_leg_to_mixer(&dev_leg_id, sip_pt, dev_channels.inbound_rx, dev_channels.outbound_tx) + .await; + } + } + } + } + } + emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id, "provider_media_addr": remote.map(|a| a.ip().to_string()), @@ -209,6 +279,34 @@ impl CallManager { } SipLegAction::Terminated(reason) => { let duration = self.calls.get(call_id).map(|c| c.duration_secs()).unwrap_or(0); + + // Notify device if this is a device-originated outbound call. + if let Some(call) = self.calls.get(call_id) { + if let Some(device_invite) = &call.device_invite { + let device_leg = call.legs.values().find(|l| l.kind == LegKind::SipDevice); + if let Some(dev) = device_leg { + if let Some(dev_addr) = dev.signaling_addr { + // Map reason to SIP response code. + let code: u16 = if reason.starts_with("rejected_") { + reason.strip_prefix("rejected_") + .and_then(|s| s.parse().ok()) + .unwrap_or(503) + } else if reason == "bye" { + // Provider sent BYE — send BYE to device too. + // (200 OK already connected; just let terminate_call handle it) + 0 + } else { + 503 + }; + if code > 0 && dev.state != LegState::Connected { + let resp = SipMessage::create_response(code, "Service Unavailable", device_invite, None); + let _ = socket.send_to(&resp.serialize(), dev_addr).await; + } + } + } + } + } + if let Some(call) = self.calls.get_mut(call_id) { if let Some(leg) = call.legs.get_mut(leg_id) { leg.state = LegState::Terminated; @@ -277,13 +375,48 @@ impl CallManager { // Get this leg's RTP port (for SDP rewriting — tell the other side to send RTP here). let this_rtp_port = call.legs.get(this_leg_id).map(|l| l.rtp_port).unwrap_or(0); + // Check if the other leg is a B2BUA leg (has SipLeg for proper dialog mgmt). + let other_has_sip_leg = call.legs.get(&other_leg_id) + .map(|l| l.sip_leg.is_some()) + .unwrap_or(false); + if msg.is_request() { let method = msg.method().unwrap_or(""); + // ACK: In hybrid B2BUA mode, the device's ACK for our 200 OK + // is absorbed silently (provider's 200 was already ACKed by SipLeg). + if method == "ACK" { + if other_has_sip_leg { + return true; // Absorb — provider ACK handled by SipLeg. + } + // Pure passthrough: forward ACK normally. + let _ = socket.send_to(&msg.serialize(), forward_to).await; + return true; + } + + // INVITE retransmit: the call already exists, re-send 100 Trying. + if method == "INVITE" { + let trying = SipMessage::create_response(100, "Trying", msg, None); + let _ = socket.send_to(&trying.serialize(), from_addr).await; + return true; + } + if method == "BYE" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; - let _ = socket.send_to(&msg.serialize(), forward_to).await; + + // If other leg has SipLeg, use build_hangup for proper dialog teardown. + if other_has_sip_leg { + if let Some(other) = call.legs.get_mut(&other_leg_id) { + if let Some(sip_leg) = &mut other.sip_leg { + if let Some(hangup_buf) = sip_leg.build_hangup() { + let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await; + } + } + } + } else { + let _ = socket.send_to(&msg.serialize(), forward_to).await; + } let duration = call.duration_secs(); emit_event( @@ -302,7 +435,19 @@ impl CallManager { if method == "CANCEL" { let ok = SipMessage::create_response(200, "OK", msg, None); let _ = socket.send_to(&ok.serialize(), from_addr).await; - let _ = socket.send_to(&msg.serialize(), forward_to).await; + + // If other leg has SipLeg, use build_hangup (produces CANCEL for early dialog). + if other_has_sip_leg { + if let Some(other) = call.legs.get_mut(&other_leg_id) { + if let Some(sip_leg) = &mut other.sip_leg { + if let Some(hangup_buf) = sip_leg.build_hangup() { + let _ = socket.send_to(&hangup_buf, sip_leg.config.sip_target).await; + } + } + } + } else { + let _ = socket.send_to(&msg.serialize(), forward_to).await; + } let duration = call.duration_secs(); emit_event( @@ -559,6 +704,7 @@ impl CallManager { rtp_port: provider_rtp.port, remote_media: provider_media, signaling_addr: Some(from_addr), + metadata: HashMap::new(), }, ); @@ -578,6 +724,7 @@ impl CallManager { rtp_port: device_rtp.port, remote_media: None, // Learned from device's 200 OK. signaling_addr: Some(device_addr), + metadata: HashMap::new(), }, ); @@ -686,6 +833,7 @@ impl CallManager { rtp_port: rtp_alloc.port, remote_media: None, signaling_addr: Some(provider_dest), + metadata: HashMap::new(), }, ); @@ -697,8 +845,12 @@ impl CallManager { Some(call_id) } - /// Create an outbound passthrough call (device → provider). - pub async fn create_outbound_passthrough( + /// Create a device-originated outbound call (device → provider) using hybrid B2BUA. + /// + /// The device side is a simple passthrough leg (no SipLeg needed). + /// The provider side uses a full SipLeg for proper dialog management, + /// 407 auth, correct From URI, and public IP in SDP. + pub async fn create_device_outbound_call( &mut self, invite: &SipMessage, from_addr: SocketAddr, @@ -707,19 +859,28 @@ impl CallManager { rtp_pool: &mut RtpPortPool, socket: &UdpSocket, public_ip: Option<&str>, + registered_aor: &str, ) -> Option { let call_id = self.next_call_id(); let lan_ip = &config.proxy.lan_ip; let lan_port = config.proxy.lan_port; - let pub_ip = public_ip.unwrap_or(lan_ip.as_str()); - let sip_call_id = invite.call_id().to_string(); - let callee = invite.request_uri().unwrap_or("").to_string(); + let device_sip_call_id = invite.call_id().to_string(); + + let dialed_number = invite + .request_uri() + .and_then(|uri| SipMessage::extract_uri(uri)) + .unwrap_or(invite.request_uri().unwrap_or("")) + .to_string(); let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() { Some(a) => a, None => return None, }; + // Send 100 Trying to device immediately to stop retransmissions. + let trying = SipMessage::create_response(100, "Trying", invite, None); + let _ = socket.send_to(&trying.serialize(), from_addr).await; + // Allocate RTP ports for both legs. let device_rtp = match rtp_pool.allocate().await { Some(a) => a, @@ -741,9 +902,10 @@ impl CallManager { mixer_cmd_tx, mixer_task, ); - call.callee_number = Some(callee); + call.callee_number = Some(dialed_number.clone()); + call.device_invite = Some(invite.clone()); - // Device leg. + // --- Device leg (passthrough, no SipLeg) --- let device_leg_id = format!("{call_id}-dev"); let mut device_media: Option = None; if invite.has_sdp_body() { @@ -759,20 +921,45 @@ impl CallManager { LegInfo { id: device_leg_id.clone(), kind: LegKind::SipDevice, - state: LegState::Connected, + state: LegState::Inviting, // Not connected yet — waiting for provider answer codec_pt, sip_leg: None, - sip_call_id: Some(sip_call_id.clone()), + sip_call_id: Some(device_sip_call_id.clone()), webrtc_session_id: None, rtp_socket: Some(device_rtp.socket.clone()), rtp_port: device_rtp.port, remote_media: device_media, signaling_addr: Some(from_addr), + metadata: HashMap::new(), }, ); - // Provider leg. + // Register device's SIP Call-ID → device leg. + self.sip_index + .insert(device_sip_call_id, (call_id.clone(), device_leg_id)); + + // --- Provider leg (B2BUA with SipLeg) --- let provider_leg_id = format!("{call_id}-prov"); + let provider_sip_call_id = generate_call_id(None); + + let leg_config = SipLegConfig { + lan_ip: lan_ip.clone(), + lan_port, + public_ip: public_ip.map(|s| s.to_string()), + sip_target: provider_dest, + username: Some(provider_config.username.clone()), + password: Some(provider_config.password.clone()), + registered_aor: Some(registered_aor.to_string()), + codecs: provider_config.codecs.clone(), + rtp_port: provider_rtp.port, + }; + + let mut sip_leg = SipLeg::new(provider_leg_id.clone(), leg_config); + + // Build proper To URI and send INVITE. + let to_uri = format!("sip:{}@{}", dialed_number, provider_config.domain); + sip_leg.send_invite(registered_aor, &to_uri, &provider_sip_call_id, socket).await; + call.legs.insert( provider_leg_id.clone(), LegInfo { @@ -780,38 +967,20 @@ impl CallManager { kind: LegKind::SipProvider, state: LegState::Inviting, codec_pt, - sip_leg: None, - sip_call_id: Some(sip_call_id.clone()), + sip_leg: Some(sip_leg), + sip_call_id: Some(provider_sip_call_id.clone()), webrtc_session_id: None, rtp_socket: Some(provider_rtp.socket.clone()), rtp_port: provider_rtp.port, remote_media: None, signaling_addr: Some(provider_dest), + metadata: HashMap::new(), }, ); + // Register provider's SIP Call-ID → provider leg. self.sip_index - .insert(sip_call_id.clone(), (call_id.clone(), device_leg_id)); - - // Forward INVITE to provider with SDP rewriting. - let mut fwd_invite = invite.clone(); - fwd_invite.prepend_header("Record-Route", &format!("")); - - if let Some(contact) = fwd_invite.get_header("Contact").map(|s| s.to_string()) { - let new_contact = rewrite_sip_uri(&contact, pub_ip, lan_port); - if new_contact != contact { - fwd_invite.set_header("Contact", &new_contact); - } - } - - // Tell provider to send RTP to our provider_rtp port. - if fwd_invite.has_sdp_body() { - let (new_body, _) = rewrite_sdp(&fwd_invite.body, pub_ip, provider_rtp.port); - fwd_invite.body = new_body; - fwd_invite.update_content_length(); - } - - let _ = socket.send_to(&fwd_invite.serialize(), provider_dest).await; + .insert(provider_sip_call_id, (call_id.clone(), provider_leg_id)); self.calls.insert(call_id.clone(), call); Some(call_id) @@ -872,6 +1041,7 @@ impl CallManager { rtp_port: rtp_alloc.port, remote_media: None, signaling_addr: Some(provider_dest), + metadata: HashMap::new(), }; self.sip_index @@ -1099,6 +1269,7 @@ impl CallManager { rtp_port: rtp_alloc.port, remote_media: Some(provider_media), signaling_addr: Some(from_addr), + metadata: HashMap::new(), }, ); diff --git a/rust/crates/proxy-engine/src/leg_io.rs b/rust/crates/proxy-engine/src/leg_io.rs index 1b4ac9a..ce2e6ff 100644 --- a/rust/crates/proxy-engine/src/leg_io.rs +++ b/rust/crates/proxy-engine/src/leg_io.rs @@ -50,11 +50,13 @@ pub fn spawn_sip_inbound( continue; // Too small for RTP header. } let pt = buf[1] & 0x7F; + let marker = (buf[1] & 0x80) != 0; + let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); let payload = buf[12..n].to_vec(); if payload.is_empty() { continue; } - if inbound_tx.send(RtpPacket { payload, payload_type: pt }).await.is_err() { + if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, timestamp }).await.is_err() { break; // Channel closed — leg removed. } } diff --git a/rust/crates/proxy-engine/src/main.rs b/rust/crates/proxy-engine/src/main.rs index cee40c8..bf01113 100644 --- a/rust/crates/proxy-engine/src/main.rs +++ b/rust/crates/proxy-engine/src/main.rs @@ -20,6 +20,7 @@ mod registrar; mod rtp; mod sip_leg; mod sip_transport; +mod tool_leg; mod voicemail; mod webrtc_engine; @@ -141,6 +142,11 @@ async fn handle_command( "webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await, // webrtc_link needs both: engine (for mixer channels) and webrtc (for session). "webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await, + // Leg interaction and tool leg commands. + "start_interaction" => handle_start_interaction(engine, out_tx, &cmd).await, + "add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await, + "remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await, + "set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await, _ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)), } } @@ -373,11 +379,14 @@ async fn handle_sip_packet( ); if let Some(route) = route_result { - let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await { + // Look up provider state by config ID (not by device address). + let (public_ip, registered_aor) = if let Some(ps_arc) = + eng.provider_mgr.find_by_provider_id(&route.provider.id).await + { let ps = ps_arc.lock().await; - ps.public_ip.clone() + (ps.public_ip.clone(), ps.registered_aor.clone()) } else { - None + (None, format!("sip:{}@{}", route.provider.username, route.provider.domain)) }; let ProxyEngine { @@ -387,7 +396,7 @@ async fn handle_sip_packet( } = *eng; let rtp_pool = rtp_pool.as_mut().unwrap(); let call_id = call_mgr - .create_outbound_passthrough( + .create_device_outbound_call( &msg, from_addr, &route.provider, @@ -395,6 +404,7 @@ async fn handle_sip_packet( rtp_pool, socket, public_ip.as_deref(), + ®istered_aor, ) .await; @@ -645,6 +655,7 @@ async fn handle_webrtc_link( rtp_port: 0, remote_media: None, signaling_addr: None, + metadata: std::collections::HashMap::new(), }, ); } @@ -773,3 +784,319 @@ async fn handle_webrtc_close(webrtc: Arc>, out_tx: &OutTx, c Err(e) => respond_err(out_tx, &cmd.id, &e), } } + +// --------------------------------------------------------------------------- +// Leg interaction & tool leg commands +// --------------------------------------------------------------------------- + +/// Handle `start_interaction` — isolate a leg, play a prompt, collect DTMF. +/// This command blocks until the interaction completes (digit, timeout, or cancel). +async fn handle_start_interaction( + engine: Arc>, + out_tx: &OutTx, + cmd: &Command, +) { + let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + }; + let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; } + }; + let prompt_wav = match cmd.params.get("prompt_wav").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing prompt_wav"); return; } + }; + let expected_digits: Vec = cmd + .params + .get("expected_digits") + .and_then(|v| v.as_str()) + .unwrap_or("12") + .chars() + .collect(); + let timeout_ms = cmd + .params + .get("timeout_ms") + .and_then(|v| v.as_u64()) + .unwrap_or(15000) as u32; + + // Load prompt audio from WAV file. + let prompt_frames = match crate::audio_player::load_prompt_pcm_frames(&prompt_wav) { + Ok(f) => f, + Err(e) => { + respond_err(out_tx, &cmd.id, &format!("prompt load failed: {e}")); + return; + } + }; + + // Create oneshot channel for the result. + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + + // Send StartInteraction to the mixer. + { + let eng = engine.lock().await; + let call = match eng.call_mgr.calls.get(&call_id) { + Some(c) => c, + None => { + respond_err(out_tx, &cmd.id, &format!("call {call_id} not found")); + return; + } + }; + let _ = call + .mixer_cmd_tx + .send(crate::mixer::MixerCommand::StartInteraction { + leg_id: leg_id.clone(), + prompt_pcm_frames: prompt_frames, + expected_digits: expected_digits.clone(), + timeout_ms, + result_tx, + }) + .await; + } // engine lock released — we block on the oneshot, not the lock. + + // Await the interaction result (blocks this task until complete). + let safety_timeout = tokio::time::Duration::from_millis(timeout_ms as u64 + 30000); + let result = match tokio::time::timeout(safety_timeout, result_rx).await { + Ok(Ok(r)) => r, + Ok(Err(_)) => crate::mixer::InteractionResult::Cancelled, // oneshot dropped + Err(_) => crate::mixer::InteractionResult::Timeout, // safety timeout + }; + + // Store consent result in leg metadata. + let (result_str, digit_str) = match &result { + crate::mixer::InteractionResult::Digit(d) => ("digit", Some(d.to_string())), + crate::mixer::InteractionResult::Timeout => ("timeout", None), + crate::mixer::InteractionResult::Cancelled => ("cancelled", None), + }; + + { + let mut eng = engine.lock().await; + if let Some(call) = eng.call_mgr.calls.get_mut(&call_id) { + if let Some(leg) = call.legs.get_mut(&leg_id) { + leg.metadata.insert( + "last_interaction_result".to_string(), + serde_json::json!(result_str), + ); + if let Some(ref d) = digit_str { + leg.metadata.insert( + "last_interaction_digit".to_string(), + serde_json::json!(d), + ); + } + } + } + } + + let mut resp = serde_json::json!({ "result": result_str }); + if let Some(d) = digit_str { + resp["digit"] = serde_json::json!(d); + } + respond_ok(out_tx, &cmd.id, resp); +} + +/// Handle `add_tool_leg` — add a recording or transcription tool leg to a call. +async fn handle_add_tool_leg( + engine: Arc>, + out_tx: &OutTx, + cmd: &Command, +) { + let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + }; + let tool_type_str = match cmd.params.get("tool_type").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing tool_type"); return; } + }; + + let tool_type = match tool_type_str.as_str() { + "recording" => crate::mixer::ToolType::Recording, + "transcription" => crate::mixer::ToolType::Transcription, + other => { + respond_err(out_tx, &cmd.id, &format!("unknown tool_type: {other}")); + return; + } + }; + + let tool_leg_id = format!("{call_id}-tool-{}", rand::random::()); + + // Spawn the appropriate background task. + let (audio_tx, _task_handle) = match tool_type { + crate::mixer::ToolType::Recording => { + let base_dir = cmd + .params + .get("config") + .and_then(|c| c.get("base_dir")) + .and_then(|v| v.as_str()) + .unwrap_or(".nogit/recordings") + .to_string(); + crate::tool_leg::spawn_recording_tool( + tool_leg_id.clone(), + call_id.clone(), + base_dir, + out_tx.clone(), + ) + } + crate::mixer::ToolType::Transcription => { + crate::tool_leg::spawn_transcription_tool( + tool_leg_id.clone(), + call_id.clone(), + out_tx.clone(), + ) + } + }; + + // Send AddToolLeg to the mixer and register in call. + { + let mut eng = engine.lock().await; + let call = match eng.call_mgr.calls.get_mut(&call_id) { + Some(c) => c, + None => { + respond_err(out_tx, &cmd.id, &format!("call {call_id} not found")); + return; + } + }; + + let _ = call + .mixer_cmd_tx + .send(crate::mixer::MixerCommand::AddToolLeg { + leg_id: tool_leg_id.clone(), + tool_type, + audio_tx, + }) + .await; + + // Register tool leg in the call's leg map. + let mut metadata = std::collections::HashMap::new(); + metadata.insert( + "tool_type".to_string(), + serde_json::json!(tool_type_str), + ); + call.legs.insert( + tool_leg_id.clone(), + crate::call::LegInfo { + id: tool_leg_id.clone(), + kind: crate::call::LegKind::Tool, + state: crate::call::LegState::Connected, + codec_pt: 0, + sip_leg: None, + sip_call_id: None, + webrtc_session_id: None, + rtp_socket: None, + rtp_port: 0, + remote_media: None, + signaling_addr: None, + metadata, + }, + ); + } + + emit_event( + out_tx, + "leg_added", + serde_json::json!({ + "call_id": call_id, + "leg_id": tool_leg_id, + "kind": "tool", + "tool_type": tool_type_str, + "state": "connected", + }), + ); + + respond_ok( + out_tx, + &cmd.id, + serde_json::json!({ "tool_leg_id": tool_leg_id }), + ); +} + +/// Handle `remove_tool_leg` — remove a tool leg from a call. +async fn handle_remove_tool_leg( + engine: Arc>, + out_tx: &OutTx, + cmd: &Command, +) { + let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + }; + let tool_leg_id = match cmd.params.get("tool_leg_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing tool_leg_id"); return; } + }; + + let mut eng = engine.lock().await; + let call = match eng.call_mgr.calls.get_mut(&call_id) { + Some(c) => c, + None => { + respond_err(out_tx, &cmd.id, &format!("call {call_id} not found")); + return; + } + }; + + // Remove from mixer (drops audio_tx → background task finalizes). + let _ = call + .mixer_cmd_tx + .send(crate::mixer::MixerCommand::RemoveToolLeg { + leg_id: tool_leg_id.clone(), + }) + .await; + + // Remove from call's leg map. + call.legs.remove(&tool_leg_id); + + emit_event( + out_tx, + "leg_removed", + serde_json::json!({ + "call_id": call_id, + "leg_id": tool_leg_id, + }), + ); + + respond_ok(out_tx, &cmd.id, serde_json::json!({})); +} + +/// Handle `set_leg_metadata` — set a metadata key on a leg. +async fn handle_set_leg_metadata( + engine: Arc>, + out_tx: &OutTx, + cmd: &Command, +) { + let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + }; + let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; } + }; + let key = match cmd.params.get("key").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing key"); return; } + }; + let value = match cmd.params.get("value") { + Some(v) => v.clone(), + None => { respond_err(out_tx, &cmd.id, "missing value"); return; } + }; + + let mut eng = engine.lock().await; + let call = match eng.call_mgr.calls.get_mut(&call_id) { + Some(c) => c, + None => { + respond_err(out_tx, &cmd.id, &format!("call {call_id} not found")); + return; + } + }; + let leg = match call.legs.get_mut(&leg_id) { + Some(l) => l, + None => { + respond_err(out_tx, &cmd.id, &format!("leg {leg_id} not found")); + return; + } + }; + + leg.metadata.insert(key, value); + respond_ok(out_tx, &cmd.id, serde_json::json!({})); +} diff --git a/rust/crates/proxy-engine/src/mixer.rs b/rust/crates/proxy-engine/src/mixer.rs index 878356d..9b42aac 100644 --- a/rust/crates/proxy-engine/src/mixer.rs +++ b/rust/crates/proxy-engine/src/mixer.rs @@ -5,14 +5,17 @@ //! //! The mixer runs a 20ms tick loop: //! 1. Drain inbound channels, decode to PCM, resample to 16kHz -//! 2. Compute total mix (sum of all legs' PCM as i32) -//! 3. For each leg: mix-minus = total - own, resample to leg codec rate, encode, send +//! 2. Compute total mix (sum of all **participant** legs' PCM as i32) +//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send +//! 4. For each isolated leg: play prompt frame or silence, check DTMF +//! 5. For each tool leg: send per-source unmerged audio batch +//! 6. Forward DTMF between participant legs only use crate::ipc::{emit_event, OutTx}; use crate::rtp::{build_rtp_header, rtp_clock_increment}; use codec_lib::{codec_sample_rate, TranscodeState}; -use std::collections::HashMap; -use tokio::sync::mpsc; +use std::collections::{HashMap, VecDeque}; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; use tokio::time::{self, Duration, MissedTickBehavior}; @@ -25,11 +28,84 @@ const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020 pub struct RtpPacket { pub payload: Vec, pub payload_type: u8, + /// RTP marker bit (first packet of a DTMF event, etc.). + pub marker: bool, + /// RTP timestamp from the original packet header. + pub timestamp: u32, } +// --------------------------------------------------------------------------- +// Leg roles +// --------------------------------------------------------------------------- + +/// What role a leg currently plays in the mixer. +enum LegRole { + /// Normal participant: contributes to mix, receives mix-minus. + Participant, + /// Temporarily isolated for IVR/consent interaction. + Isolated(IsolationState), +} + +struct IsolationState { + /// PCM frames at MIX_RATE (320 samples each) queued for playback. + prompt_frames: VecDeque>, + /// Digits that complete the interaction (e.g., ['1', '2']). + expected_digits: Vec, + /// Ticks remaining before timeout (decremented each tick after prompt ends). + timeout_ticks_remaining: u32, + /// Whether we've finished playing the prompt. + prompt_done: bool, + /// Channel to send the result back to the command handler. + result_tx: Option>, +} + +/// Result of a leg interaction (consent prompt, IVR, etc.). +pub enum InteractionResult { + /// The participant pressed one of the expected digits. + Digit(char), + /// No digit was received within the timeout. + Timeout, + /// The leg was removed or the call tore down before completion. + Cancelled, +} + +// --------------------------------------------------------------------------- +// Tool legs +// --------------------------------------------------------------------------- + +/// Type of tool leg. +#[derive(Debug, Clone, Copy)] +pub enum ToolType { + Recording, + Transcription, +} + +/// Per-source audio delivered to a tool leg each mixer tick. +pub struct ToolAudioBatch { + pub sources: Vec, +} + +/// One participant's 20ms audio frame. +pub struct ToolAudioSource { + pub leg_id: String, + /// PCM at 16kHz, MIX_FRAME_SIZE (320) samples. + pub pcm_16k: Vec, +} + +/// Internal storage for a tool leg inside the mixer. +struct ToolLegSlot { + #[allow(dead_code)] + tool_type: ToolType, + audio_tx: mpsc::Sender, +} + +// --------------------------------------------------------------------------- +// Commands +// --------------------------------------------------------------------------- + /// Commands sent to the mixer task via a control channel. pub enum MixerCommand { - /// Add a new leg to the mix. + /// Add a new participant leg to the mix. AddLeg { leg_id: String, codec_pt: u8, @@ -40,8 +116,35 @@ pub enum MixerCommand { RemoveLeg { leg_id: String }, /// Shut down the mixer. Shutdown, + + /// Isolate a leg and start an interaction (consent prompt, IVR). + /// The leg is removed from the mix and hears the prompt instead. + /// DTMF from the leg is checked against expected_digits. + StartInteraction { + leg_id: String, + /// PCM frames at MIX_RATE (16kHz), each 320 samples. + prompt_pcm_frames: Vec>, + expected_digits: Vec, + timeout_ms: u32, + result_tx: oneshot::Sender, + }, + /// Cancel an in-progress interaction (e.g., leg being removed). + CancelInteraction { leg_id: String }, + + /// Add a tool leg that receives per-source unmerged audio. + AddToolLeg { + leg_id: String, + tool_type: ToolType, + audio_tx: mpsc::Sender, + }, + /// Remove a tool leg (drops the channel, background task finalizes). + RemoveToolLeg { leg_id: String }, } +// --------------------------------------------------------------------------- +// Mixer internals +// --------------------------------------------------------------------------- + /// Internal per-leg state inside the mixer. struct MixerLegSlot { codec_pt: u8, @@ -56,6 +159,8 @@ struct MixerLegSlot { rtp_seq: u16, rtp_ts: u32, rtp_ssrc: u32, + /// Current role of this leg in the mixer. + role: LegRole, } /// Spawn the mixer task for a call. Returns the command sender and task handle. @@ -79,13 +184,14 @@ async fn mixer_loop( out_tx: OutTx, ) { let mut legs: HashMap = HashMap::new(); + let mut tool_legs: HashMap = HashMap::new(); let mut interval = time::interval(Duration::from_millis(20)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { interval.tick().await; - // 1. Process control commands (non-blocking). + // ── 1. Process control commands (non-blocking). ───────────── loop { match cmd_rx.try_recv() { Ok(MixerCommand::AddLeg { @@ -121,38 +227,115 @@ async fn mixer_loop( rtp_seq: 0, rtp_ts: 0, rtp_ssrc: rand::random(), + role: LegRole::Participant, }, ); } Ok(MixerCommand::RemoveLeg { leg_id }) => { + // If the leg is isolated, send Cancelled before dropping. + if let Some(slot) = legs.get_mut(&leg_id) { + if let LegRole::Isolated(ref mut state) = slot.role { + if let Some(tx) = state.result_tx.take() { + let _ = tx.send(InteractionResult::Cancelled); + } + } + } legs.remove(&leg_id); // Channels drop → I/O tasks exit cleanly. } - Ok(MixerCommand::Shutdown) => return, + Ok(MixerCommand::Shutdown) => { + // Cancel all outstanding interactions before shutting down. + for slot in legs.values_mut() { + if let LegRole::Isolated(ref mut state) = slot.role { + if let Some(tx) = state.result_tx.take() { + let _ = tx.send(InteractionResult::Cancelled); + } + } + } + return; + } + Ok(MixerCommand::StartInteraction { + leg_id, + prompt_pcm_frames, + expected_digits, + timeout_ms, + result_tx, + }) => { + if let Some(slot) = legs.get_mut(&leg_id) { + // Cancel any existing interaction first. + if let LegRole::Isolated(ref mut old_state) = slot.role { + if let Some(tx) = old_state.result_tx.take() { + let _ = tx.send(InteractionResult::Cancelled); + } + } + let timeout_ticks = timeout_ms / 20; + slot.role = LegRole::Isolated(IsolationState { + prompt_frames: VecDeque::from(prompt_pcm_frames), + expected_digits, + timeout_ticks_remaining: timeout_ticks, + prompt_done: false, + result_tx: Some(result_tx), + }); + } else { + // Leg not found — immediately cancel. + let _ = result_tx.send(InteractionResult::Cancelled); + } + } + Ok(MixerCommand::CancelInteraction { leg_id }) => { + if let Some(slot) = legs.get_mut(&leg_id) { + if let LegRole::Isolated(ref mut state) = slot.role { + if let Some(tx) = state.result_tx.take() { + let _ = tx.send(InteractionResult::Cancelled); + } + } + slot.role = LegRole::Participant; + } + } + Ok(MixerCommand::AddToolLeg { + leg_id, + tool_type, + audio_tx, + }) => { + tool_legs.insert(leg_id, ToolLegSlot { tool_type, audio_tx }); + } + Ok(MixerCommand::RemoveToolLeg { leg_id }) => { + tool_legs.remove(&leg_id); + // Dropping the ToolLegSlot drops audio_tx → background task sees channel close. + } Err(mpsc::error::TryRecvError::Empty) => break, Err(mpsc::error::TryRecvError::Disconnected) => return, } } - if legs.is_empty() { + if legs.is_empty() && tool_legs.is_empty() { continue; } - // 2. Drain inbound packets, decode to 16kHz PCM. + // ── 2. Drain inbound packets, decode to 16kHz PCM. ───────── + // DTMF (PT 101) packets are collected separately. let leg_ids: Vec = legs.keys().cloned().collect(); + let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new(); + for lid in &leg_ids { let slot = legs.get_mut(lid).unwrap(); - // Drain channel, keep only the latest packet (simple jitter handling). - let mut latest: Option = None; + // Drain channel — collect DTMF packets separately, keep latest audio. + let mut latest_audio: Option = None; loop { match slot.inbound_rx.try_recv() { - Ok(pkt) => latest = Some(pkt), + Ok(pkt) => { + if pkt.payload_type == 101 { + // DTMF telephone-event: collect for processing. + dtmf_forward.push((lid.clone(), pkt)); + } else { + latest_audio = Some(pkt); + } + } Err(_) => break, } } - if let Some(pkt) = latest { + if let Some(pkt) = latest_audio { slot.silent_ticks = 0; match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) { Ok((pcm, rate)) => { @@ -174,6 +357,9 @@ async fn mixer_loop( slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE]; } } + } else if dtmf_forward.iter().any(|(src, _)| src == lid) { + // Got DTMF but no audio — don't bump silent_ticks (DTMF counts as activity). + slot.silent_ticks = 0; } else { slot.silent_ticks += 1; // After 150 ticks (3 seconds) of silence, zero out to avoid stale audio. @@ -183,50 +369,210 @@ async fn mixer_loop( } } - // 3. Compute total mix (sum of all legs as i32 to avoid overflow). + // ── 3. Compute total mix from PARTICIPANT legs only. ──────── let mut total_mix = vec![0i32; MIX_FRAME_SIZE]; for slot in legs.values() { - for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) { - total_mix[i] += s as i32; + if matches!(slot.role, LegRole::Participant) { + for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) { + total_mix[i] += s as i32; + } } } - // 4. For each leg: mix-minus, resample, encode, send. - for slot in legs.values_mut() { - // Mix-minus: total minus this leg's own contribution. - let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE); - for i in 0..MIX_FRAME_SIZE { - let sample = - (total_mix[i] - slot.last_pcm_frame[i] as i32).clamp(-32768, 32767) as i16; - mix_minus.push(sample); + // ── 4. Per-leg output. ────────────────────────────────────── + // Collect interaction completions to apply after the loop + // (can't mutate role while iterating mutably for encode). + let mut completed_interactions: Vec<(String, InteractionResult)> = Vec::new(); + + for (lid, slot) in legs.iter_mut() { + match &mut slot.role { + LegRole::Participant => { + // Mix-minus: total minus this leg's own contribution. + let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE); + for i in 0..MIX_FRAME_SIZE { + let sample = (total_mix[i] - slot.last_pcm_frame[i] as i32) + .clamp(-32768, 32767) as i16; + mix_minus.push(sample); + } + + // Resample from 16kHz to the leg's codec native rate. + let target_rate = codec_sample_rate(slot.codec_pt); + let resampled = if target_rate == MIX_RATE { + mix_minus + } else { + slot.transcoder + .resample(&mix_minus, MIX_RATE, target_rate) + .unwrap_or_default() + }; + + // Encode to the leg's codec. + let encoded = + match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) { + Ok(e) if !e.is_empty() => e, + _ => continue, + }; + + // Build RTP packet with header. + let header = + build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc); + let mut rtp = header.to_vec(); + rtp.extend_from_slice(&encoded); + + slot.rtp_seq = slot.rtp_seq.wrapping_add(1); + slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); + + // Non-blocking send — drop frame if channel is full. + let _ = slot.outbound_tx.try_send(rtp); + } + LegRole::Isolated(state) => { + // Check for DTMF digit from this leg. + let mut matched_digit: Option = None; + for (src_lid, dtmf_pkt) in &dtmf_forward { + if src_lid == lid && dtmf_pkt.payload.len() >= 4 { + let event_id = dtmf_pkt.payload[0]; + let end_bit = (dtmf_pkt.payload[1] & 0x80) != 0; + if end_bit { + const EVENT_CHARS: &[char] = &[ + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '*', '#', + 'A', 'B', 'C', 'D', + ]; + if let Some(&ch) = EVENT_CHARS.get(event_id as usize) { + if state.expected_digits.contains(&ch) { + matched_digit = Some(ch); + break; + } + } + } + } + } + + if let Some(digit) = matched_digit { + // Interaction complete — digit matched. + completed_interactions + .push((lid.clone(), InteractionResult::Digit(digit))); + } else { + // Play prompt frame or silence. + let pcm_frame = if let Some(frame) = state.prompt_frames.pop_front() { + frame + } else { + state.prompt_done = true; + vec![0i16; MIX_FRAME_SIZE] + }; + + // Encode prompt frame to the leg's codec (reuses existing encode path). + let target_rate = codec_sample_rate(slot.codec_pt); + let resampled = if target_rate == MIX_RATE { + pcm_frame + } else { + slot.transcoder + .resample(&pcm_frame, MIX_RATE, target_rate) + .unwrap_or_default() + }; + + if let Ok(encoded) = + slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) + { + if !encoded.is_empty() { + let header = build_rtp_header( + slot.codec_pt, + slot.rtp_seq, + slot.rtp_ts, + slot.rtp_ssrc, + ); + let mut rtp = header.to_vec(); + rtp.extend_from_slice(&encoded); + slot.rtp_seq = slot.rtp_seq.wrapping_add(1); + slot.rtp_ts = slot + .rtp_ts + .wrapping_add(rtp_clock_increment(slot.codec_pt)); + let _ = slot.outbound_tx.try_send(rtp); + } + } + + // Check timeout (only after prompt finishes). + if state.prompt_done { + if state.timeout_ticks_remaining == 0 { + completed_interactions + .push((lid.clone(), InteractionResult::Timeout)); + } else { + state.timeout_ticks_remaining -= 1; + } + } + } + } } + } - // Resample from 16kHz to the leg's codec native rate. - let target_rate = codec_sample_rate(slot.codec_pt); - let resampled = if target_rate == MIX_RATE { - mix_minus - } else { - slot.transcoder - .resample(&mix_minus, MIX_RATE, target_rate) - .unwrap_or_default() - }; + // Apply completed interactions — revert legs to Participant. + for (lid, result) in completed_interactions { + if let Some(slot) = legs.get_mut(&lid) { + if let LegRole::Isolated(ref mut state) = slot.role { + if let Some(tx) = state.result_tx.take() { + let _ = tx.send(result); + } + } + slot.role = LegRole::Participant; + } + } - // Encode to the leg's codec. - let encoded = match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) { - Ok(e) if !e.is_empty() => e, - _ => continue, - }; + // ── 5. Distribute per-source audio to tool legs. ──────────── + if !tool_legs.is_empty() { + // Collect participant PCM frames (computed in step 2). + let sources: Vec = legs + .iter() + .filter(|(_, s)| matches!(s.role, LegRole::Participant)) + .map(|(lid, s)| ToolAudioSource { + leg_id: lid.clone(), + pcm_16k: s.last_pcm_frame.clone(), + }) + .collect(); - // Build RTP packet with header. - let header = build_rtp_header(slot.codec_pt, slot.rtp_seq, slot.rtp_ts, slot.rtp_ssrc); - let mut rtp = header.to_vec(); - rtp.extend_from_slice(&encoded); + for tool in tool_legs.values() { + let batch = ToolAudioBatch { + sources: sources + .iter() + .map(|s| ToolAudioSource { + leg_id: s.leg_id.clone(), + pcm_16k: s.pcm_16k.clone(), + }) + .collect(), + }; + // Non-blocking send — drop batch if tool can't keep up. + let _ = tool.audio_tx.try_send(batch); + } + } - slot.rtp_seq = slot.rtp_seq.wrapping_add(1); - slot.rtp_ts = slot.rtp_ts.wrapping_add(rtp_clock_increment(slot.codec_pt)); - - // Non-blocking send — drop frame if channel is full. - let _ = slot.outbound_tx.try_send(rtp); + // ── 6. Forward DTMF packets between participant legs only. ── + for (source_lid, dtmf_pkt) in &dtmf_forward { + // Skip if the source is an isolated leg (its DTMF was handled in step 4). + if let Some(src_slot) = legs.get(source_lid) { + if matches!(src_slot.role, LegRole::Isolated(_)) { + continue; + } + } + for (target_lid, target_slot) in legs.iter_mut() { + if target_lid == source_lid { + continue; // Don't echo DTMF back to sender. + } + // Don't forward to isolated legs. + if matches!(target_slot.role, LegRole::Isolated(_)) { + continue; + } + let mut header = build_rtp_header( + 101, + target_slot.rtp_seq, + target_slot.rtp_ts, + target_slot.rtp_ssrc, + ); + if dtmf_pkt.marker { + header[1] |= 0x80; // Set marker bit. + } + let mut rtp_out = header.to_vec(); + rtp_out.extend_from_slice(&dtmf_pkt.payload); + target_slot.rtp_seq = target_slot.rtp_seq.wrapping_add(1); + // Don't increment rtp_ts for DTMF — it shares timestamp context with audio. + let _ = target_slot.outbound_tx.try_send(rtp_out); + } } } } diff --git a/rust/crates/proxy-engine/src/provider.rs b/rust/crates/proxy-engine/src/provider.rs index b34c184..67cb71c 100644 --- a/rust/crates/proxy-engine/src/provider.rs +++ b/rust/crates/proxy-engine/src/provider.rs @@ -321,6 +321,17 @@ impl ProviderManager { None } + /// Find a provider by its config ID (e.g. "easybell"). + pub async fn find_by_provider_id(&self, provider_id: &str) -> Option>> { + for ps_arc in &self.providers { + let ps = ps_arc.lock().await; + if ps.config.id == provider_id { + return Some(ps_arc.clone()); + } + } + None + } + /// Check if a provider is currently registered. pub async fn is_registered(&self, provider_id: &str) -> bool { for ps_arc in &self.providers { diff --git a/rust/crates/proxy-engine/src/recorder.rs b/rust/crates/proxy-engine/src/recorder.rs index 464e4c5..35b46f8 100644 --- a/rust/crates/proxy-engine/src/recorder.rs +++ b/rust/crates/proxy-engine/src/recorder.rs @@ -55,6 +55,56 @@ impl Recorder { }) } + /// Create a recorder that writes raw PCM at a given sample rate. + /// Used by tool legs that already have decoded PCM (no RTP processing needed). + pub fn new_pcm(file_path: &str, sample_rate: u32, max_duration_ms: Option) -> Result { + if let Some(parent) = Path::new(file_path).parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("create dir: {e}"))?; + } + + let spec = hound::WavSpec { + channels: 1, + sample_rate, + bits_per_sample: 16, + sample_format: hound::SampleFormat::Int, + }; + + let writer = hound::WavWriter::create(file_path, spec) + .map_err(|e| format!("create WAV {file_path}: {e}"))?; + + // source_pt is unused for PCM recording; set to 0. + let transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?; + let max_samples = max_duration_ms.map(|ms| (sample_rate as u64 * ms) / 1000); + + Ok(Self { + writer, + transcoder, + source_pt: 0, + total_samples: 0, + sample_rate, + max_samples, + file_path: file_path.to_string(), + }) + } + + /// Write raw PCM samples directly (no RTP decoding). + /// Returns true if recording should continue, false if max duration reached. + pub fn write_pcm(&mut self, samples: &[i16]) -> bool { + for &sample in samples { + if self.writer.write_sample(sample).is_err() { + return false; + } + self.total_samples += 1; + if let Some(max) = self.max_samples { + if self.total_samples >= max { + return false; + } + } + } + true + } + /// Process an incoming RTP packet (full packet with header). /// Returns true if recording should continue, false if max duration reached. pub fn process_rtp(&mut self, data: &[u8]) -> bool { diff --git a/rust/crates/proxy-engine/src/tool_leg.rs b/rust/crates/proxy-engine/src/tool_leg.rs new file mode 100644 index 0000000..021b6e1 --- /dev/null +++ b/rust/crates/proxy-engine/src/tool_leg.rs @@ -0,0 +1,138 @@ +//! Tool leg consumers — background tasks that process per-source unmerged audio. +//! +//! Tool legs are observer legs that receive individual audio streams from each +//! participant in a call. The mixer pipes `ToolAudioBatch` every 20ms containing +//! each participant's decoded PCM@16kHz tagged with source leg ID. +//! +//! Consumers: +//! - **Recording**: writes per-source WAV files for speaker-separated recording. +//! - **Transcription**: stub for future Whisper integration (accumulates audio in Rust). + +use crate::ipc::{emit_event, OutTx}; +use crate::mixer::ToolAudioBatch; +use crate::recorder::Recorder; +use std::collections::HashMap; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +// --------------------------------------------------------------------------- +// Recording consumer +// --------------------------------------------------------------------------- + +/// Spawn a recording tool leg that writes per-source WAV files. +/// +/// Returns the channel sender (for the mixer to send batches) and the task handle. +/// When the channel is closed (tool leg removed), all WAV files are finalized +/// and a `tool_recording_done` event is emitted. +pub fn spawn_recording_tool( + tool_leg_id: String, + call_id: String, + base_dir: String, + out_tx: OutTx, +) -> (mpsc::Sender, JoinHandle<()>) { + let (tx, mut rx) = mpsc::channel::(64); + + let handle = tokio::spawn(async move { + let mut recorders: HashMap = HashMap::new(); + + while let Some(batch) = rx.recv().await { + for source in &batch.sources { + // Skip silence-only frames (all zeros = no audio activity). + let has_audio = source.pcm_16k.iter().any(|&s| s != 0); + if !has_audio && !recorders.contains_key(&source.leg_id) { + continue; // Don't create a file for silence-only sources. + } + + let recorder = recorders.entry(source.leg_id.clone()).or_insert_with(|| { + let path = format!("{}/{}-{}.wav", base_dir, call_id, source.leg_id); + Recorder::new_pcm(&path, 16000, None).unwrap_or_else(|e| { + panic!("failed to create recorder for {}: {e}", source.leg_id); + }) + }); + + if !recorder.write_pcm(&source.pcm_16k) { + // Max duration reached — stop recording this source. + break; + } + } + } + + // Channel closed — finalize all recordings. + let mut files = Vec::new(); + for (leg_id, rec) in recorders { + let result = rec.stop(); + files.push(serde_json::json!({ + "source_leg_id": leg_id, + "file_path": result.file_path, + "duration_ms": result.duration_ms, + })); + } + + emit_event( + &out_tx, + "tool_recording_done", + serde_json::json!({ + "call_id": call_id, + "tool_leg_id": tool_leg_id, + "files": files, + }), + ); + }); + + (tx, handle) +} + +// --------------------------------------------------------------------------- +// Transcription consumer (stub — real plumbing, stub consumer) +// --------------------------------------------------------------------------- + +/// Spawn a transcription tool leg. +/// +/// The plumbing is fully real: it receives per-source unmerged PCM@16kHz from +/// the mixer every 20ms. The consumer is a stub that accumulates audio and +/// reports metadata on close. Future: will stream to a Whisper HTTP endpoint. +pub fn spawn_transcription_tool( + tool_leg_id: String, + call_id: String, + out_tx: OutTx, +) -> (mpsc::Sender, JoinHandle<()>) { + let (tx, mut rx) = mpsc::channel::(64); + + let handle = tokio::spawn(async move { + // Track per-source sample counts for duration reporting. + let mut source_samples: HashMap = HashMap::new(); + + while let Some(batch) = rx.recv().await { + for source in &batch.sources { + *source_samples.entry(source.leg_id.clone()).or_insert(0) += + source.pcm_16k.len() as u64; + + // TODO: Future — accumulate chunks and stream to Whisper endpoint. + // For now, the audio is received and counted but not processed. + } + } + + // Channel closed — report metadata. + let sources: Vec = source_samples + .iter() + .map(|(leg_id, samples)| { + serde_json::json!({ + "source_leg_id": leg_id, + "duration_ms": (samples * 1000) / 16000, + }) + }) + .collect(); + + emit_event( + &out_tx, + "tool_transcription_done", + serde_json::json!({ + "call_id": call_id, + "tool_leg_id": tool_leg_id, + "sources": sources, + }), + ); + }); + + (tx, handle) +} diff --git a/rust/crates/proxy-engine/src/webrtc_engine.rs b/rust/crates/proxy-engine/src/webrtc_engine.rs index 40558f4..a31b130 100644 --- a/rust/crates/proxy-engine/src/webrtc_engine.rs +++ b/rust/crates/proxy-engine/src/webrtc_engine.rs @@ -290,6 +290,8 @@ async fn browser_to_mixer_loop( .send(RtpPacket { payload: payload.to_vec(), payload_type: PT_OPUS, + marker: false, + timestamp: 0, }) .await; } diff --git a/ts/proxybridge.ts b/ts/proxybridge.ts index fafcb85..0ec4892 100644 --- a/ts/proxybridge.ts +++ b/ts/proxybridge.ts @@ -41,6 +41,32 @@ type TProxyCommands = { params: { call_id: string }; result: { file_path: string; duration_ms: number }; }; + start_interaction: { + params: { + call_id: string; + leg_id: string; + prompt_wav: string; + expected_digits: string; + timeout_ms: number; + }; + result: { result: 'digit' | 'timeout' | 'cancelled'; digit?: string }; + }; + add_tool_leg: { + params: { + call_id: string; + tool_type: 'recording' | 'transcription'; + config?: Record; + }; + result: { tool_leg_id: string }; + }; + remove_tool_leg: { + params: { call_id: string; tool_leg_id: string }; + result: Record; + }; + set_leg_metadata: { + params: { call_id: string; leg_id: string; key: string; value: unknown }; + result: Record; + }; }; // --------------------------------------------------------------------------- @@ -280,11 +306,107 @@ export async function webrtcClose(sessionId: string): Promise { } catch { /* ignore */ } } +// --------------------------------------------------------------------------- +// Leg interaction & tool leg commands +// --------------------------------------------------------------------------- + +/** + * Start an interaction on a specific leg — isolate it, play a prompt, collect DTMF. + * Blocks until the interaction completes (digit pressed, timeout, or cancelled). + */ +export async function startInteraction( + callId: string, + legId: string, + promptWav: string, + expectedDigits: string, + timeoutMs: number, +): Promise<{ result: 'digit' | 'timeout' | 'cancelled'; digit?: string } | null> { + if (!bridge || !initialized) return null; + try { + const result = await bridge.sendCommand('start_interaction', { + call_id: callId, + leg_id: legId, + prompt_wav: promptWav, + expected_digits: expectedDigits, + timeout_ms: timeoutMs, + } as any); + return result as any; + } catch (e: any) { + logFn?.(`[proxy-engine] start_interaction error: ${e?.message || e}`); + return null; + } +} + +/** + * Add a tool leg (recording or transcription) to a call. + * Tool legs receive per-source unmerged audio from all participants. + */ +export async function addToolLeg( + callId: string, + toolType: 'recording' | 'transcription', + config?: Record, +): Promise { + if (!bridge || !initialized) return null; + try { + const result = await bridge.sendCommand('add_tool_leg', { + call_id: callId, + tool_type: toolType, + config, + } as any); + return (result as any)?.tool_leg_id || null; + } catch (e: any) { + logFn?.(`[proxy-engine] add_tool_leg error: ${e?.message || e}`); + return null; + } +} + +/** + * Remove a tool leg from a call. Triggers finalization (WAV files, metadata). + */ +export async function removeToolLeg(callId: string, toolLegId: string): Promise { + if (!bridge || !initialized) return false; + try { + await bridge.sendCommand('remove_tool_leg', { + call_id: callId, + tool_leg_id: toolLegId, + } as any); + return true; + } catch (e: any) { + logFn?.(`[proxy-engine] remove_tool_leg error: ${e?.message || e}`); + return false; + } +} + +/** + * Set a metadata key-value pair on a leg. + */ +export async function setLegMetadata( + callId: string, + legId: string, + key: string, + value: unknown, +): Promise { + if (!bridge || !initialized) return false; + try { + await bridge.sendCommand('set_leg_metadata', { + call_id: callId, + leg_id: legId, + key, + value, + } as any); + return true; + } catch (e: any) { + logFn?.(`[proxy-engine] set_leg_metadata error: ${e?.message || e}`); + return false; + } +} + /** * Subscribe to an event from the proxy engine. * Event names: incoming_call, outbound_device_call, call_ringing, * call_answered, call_ended, provider_registered, device_registered, - * dtmf_digit, recording_done, sip_unhandled + * dtmf_digit, recording_done, tool_recording_done, tool_transcription_done, + * leg_added, leg_removed, sip_unhandled */ export function onProxyEvent(event: string, handler: (data: any) => void): void { if (!bridge) throw new Error('proxy engine not initialized'); diff --git a/ts/sipproxy.ts b/ts/sipproxy.ts index 2ee994c..01387dd 100644 --- a/ts/sipproxy.ts +++ b/ts/sipproxy.ts @@ -96,6 +96,16 @@ interface IDeviceStatus { isBrowser: boolean; } +interface IActiveLeg { + id: string; + type: 'sip-device' | 'sip-provider' | 'webrtc' | 'tool'; + state: string; + codec: string | null; + rtpPort: number | null; + remoteMedia: string | null; + metadata: Record; +} + interface IActiveCall { id: string; direction: string; @@ -104,6 +114,13 @@ interface IActiveCall { providerUsed: string | null; state: string; startedAt: number; + legs: Map; +} + +interface IHistoryLeg { + id: string; + type: string; + metadata: Record; } interface ICallHistoryEntry { @@ -113,6 +130,7 @@ interface ICallHistoryEntry { calleeNumber: string | null; startedAt: number; duration: number; + legs: IHistoryLeg[]; } const providerStatuses = new Map(); @@ -187,7 +205,18 @@ function getStatus() { calls: [...activeCalls.values()].map((c) => ({ ...c, duration: Math.floor((Date.now() - c.startedAt) / 1000), - legs: [], + legs: [...c.legs.values()].map((l) => ({ + id: l.id, + type: l.type, + state: l.state, + codec: l.codec, + rtpPort: l.rtpPort, + remoteMedia: l.remoteMedia, + metadata: l.metadata || {}, + pktSent: 0, + pktReceived: 0, + transcoding: false, + })), })), callHistory, contacts: appConfig.contacts || [], @@ -242,6 +271,7 @@ async function startProxyEngine(): Promise { providerUsed: data.provider_id, state: 'ringing', startedAt: Date.now(), + legs: new Map(), }); // Notify browsers of incoming call. @@ -266,6 +296,7 @@ async function startProxyEngine(): Promise { providerUsed: null, state: 'setting-up', startedAt: Date.now(), + legs: new Map(), }); }); @@ -279,6 +310,7 @@ async function startProxyEngine(): Promise { providerUsed: data.provider_id, state: 'setting-up', startedAt: Date.now(), + legs: new Map(), }); // Notify all browser devices — they can connect via WebRTC to listen/talk. @@ -303,6 +335,20 @@ async function startProxyEngine(): Promise { if (call) { call.state = 'connected'; log(`[call] ${data.call_id} connected`); + + // Enrich provider leg with media info from the answered event. + if (data.provider_media_addr && data.provider_media_port) { + for (const leg of call.legs.values()) { + if (leg.type === 'sip-provider') { + leg.remoteMedia = `${data.provider_media_addr}:${data.provider_media_port}`; + if (data.sip_pt !== undefined) { + const codecNames: Record = { 0: 'PCMU', 8: 'PCMA', 9: 'G.722', 111: 'Opus' }; + leg.codec = codecNames[data.sip_pt] || `PT${data.sip_pt}`; + } + break; + } + } + } } // Try to link WebRTC session to this call for audio bridging. @@ -331,6 +377,15 @@ async function startProxyEngine(): Promise { const call = activeCalls.get(data.call_id); if (call) { log(`[call] ${data.call_id} ended: ${data.reason} (${data.duration}s)`); + // Snapshot legs with metadata for history. + const historyLegs: IHistoryLeg[] = []; + for (const [, leg] of call.legs) { + historyLegs.push({ + id: leg.id, + type: leg.type, + metadata: leg.metadata || {}, + }); + } // Move to history. callHistory.unshift({ id: call.id, @@ -339,6 +394,7 @@ async function startProxyEngine(): Promise { calleeNumber: call.calleeNumber, startedAt: call.startedAt, duration: data.duration, + legs: historyLegs, }); if (callHistory.length > MAX_HISTORY) callHistory.pop(); activeCalls.delete(data.call_id); @@ -361,17 +417,50 @@ async function startProxyEngine(): Promise { log(`[sip] unhandled ${data.method_or_status} Call-ID=${data.call_id?.slice(0, 20)} from=${data.from_addr}:${data.from_port}`); }); - // Leg events (multiparty). + // Leg events (multiparty) — update shadow state so the dashboard shows legs. onProxyEvent('leg_added', (data: any) => { log(`[leg] added: call=${data.call_id} leg=${data.leg_id} kind=${data.kind} state=${data.state}`); + const call = activeCalls.get(data.call_id); + if (call) { + call.legs.set(data.leg_id, { + id: data.leg_id, + type: data.kind, + state: data.state, + codec: null, + rtpPort: null, + remoteMedia: null, + metadata: data.metadata || {}, + }); + } }); onProxyEvent('leg_removed', (data: any) => { log(`[leg] removed: call=${data.call_id} leg=${data.leg_id}`); + activeCalls.get(data.call_id)?.legs.delete(data.leg_id); }); onProxyEvent('leg_state_changed', (data: any) => { log(`[leg] state: call=${data.call_id} leg=${data.leg_id} → ${data.state}`); + const call = activeCalls.get(data.call_id); + if (!call) return; + const leg = call.legs.get(data.leg_id); + if (leg) { + leg.state = data.state; + if (data.metadata) leg.metadata = data.metadata; + } else { + // Initial legs (provider/device) don't emit leg_added — create on first state change. + const legId: string = data.leg_id; + const type = legId.includes('-prov') ? 'sip-provider' : legId.includes('-dev') ? 'sip-device' : 'webrtc'; + call.legs.set(data.leg_id, { + id: data.leg_id, + type, + state: data.state, + codec: null, + rtpPort: null, + remoteMedia: null, + metadata: data.metadata || {}, + }); + } }); // WebRTC events from Rust — forward ICE candidates to browser via WebSocket. @@ -484,6 +573,7 @@ initWebUi( providerUsed: providerId || null, state: 'setting-up', startedAt: Date.now(), + legs: new Map(), }); } else { log(`[dashboard] call failed for ${number}`); diff --git a/ts_web/state/appstate.ts b/ts_web/state/appstate.ts index ac6db69..8790be9 100644 --- a/ts_web/state/appstate.ts +++ b/ts_web/state/appstate.ts @@ -20,7 +20,7 @@ export interface IDeviceStatus { export interface ILegStatus { id: string; - type: 'sip-device' | 'sip-provider' | 'webrtc'; + type: 'sip-device' | 'sip-provider' | 'webrtc' | 'tool'; state: string; remoteMedia: { address: string; port: number } | null; rtpPort: number | null; @@ -28,6 +28,7 @@ export interface ILegStatus { pktReceived: number; codec: string | null; transcoding: boolean; + metadata?: Record; } export interface ICallStatus { @@ -42,6 +43,12 @@ export interface ICallStatus { legs: ILegStatus[]; } +export interface IHistoryLeg { + id: string; + type: string; + metadata: Record; +} + export interface ICallHistoryEntry { id: string; direction: 'inbound' | 'outbound' | 'internal'; @@ -50,6 +57,7 @@ export interface ICallHistoryEntry { providerUsed: string | null; startedAt: number; duration: number; + legs?: IHistoryLeg[]; } export interface IContact {