4 Commits

20 changed files with 435 additions and 127 deletions

View File

@@ -1,5 +1,22 @@
# Changelog
## 2026-04-10 - 1.17.1 - fix(proxy-engine,codec-lib,sip-proto,ts)
preserve negotiated media details and improve RTP audio handling across call legs
- Use native Opus float encode/decode to avoid unnecessary i16 quantization in the f32 audio path.
- Parse full RTP headers including extensions and sequence numbers, then sort inbound packets before decoding to keep codec state stable for out-of-order audio.
- Capture negotiated codec payload types from SDP offers and answers and include codec, RTP port, remote media, and metadata in leg_added events.
- Emit leg_state_changed and leg_removed events more consistently so the dashboard reflects leg lifecycle updates accurately.
## 2026-04-10 - 1.17.0 - feat(proxy-engine)
upgrade the internal audio bus to 48kHz f32 with per-leg denoising and improve SIP leg routing
- switch mixer, prompt playback, and tool leg audio handling from 16kHz i16 to 48kHz f32 for higher-quality internal processing
- add f32 decode/encode and resampling support plus standalone RNNoise denoiser creation in codec-lib
- apply per-leg inbound noise suppression in the mixer before mix-minus generation
- fix passthrough call routing by matching the actual leg from the signaling source address when Call-IDs are shared
- correct dialed number extraction from bare SIP request URIs by parsing the user part directly
## 2026-04-10 - 1.16.0 - feat(proxy-engine)
integrate Kokoro TTS generation into proxy-engine and simplify TypeScript prompt handling to use cached WAV files

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,6 @@
{
"name": "siprouter",
"version": "1.16.0",
"version": "1.17.1",
"private": true,
"type": "module",
"scripts": {

1
rust/Cargo.lock generated
View File

@@ -2179,6 +2179,7 @@ dependencies = [
"codec-lib",
"hound",
"kokoro-tts",
"nnnoiseless",
"ort",
"rand 0.8.5",
"regex-lite",

View File

@@ -104,6 +104,8 @@ pub struct TranscodeState {
g722_dec: libg722::decoder::Decoder,
/// Cached FFT resamplers keyed by (from_rate, to_rate, chunk_size).
resamplers: HashMap<(u32, u32, usize), FftFixedIn<f64>>,
/// Cached f32 FFT resamplers keyed by (from_rate, to_rate, chunk_size).
resamplers_f32: HashMap<(u32, u32, usize), FftFixedIn<f32>>,
/// ML noise suppression for the SIP-bound direction.
denoiser_to_sip: Box<DenoiseState<'static>>,
/// ML noise suppression for the browser-bound direction.
@@ -133,6 +135,7 @@ impl TranscodeState {
g722_enc,
g722_dec,
resamplers: HashMap::new(),
resamplers_f32: HashMap::new(),
denoiser_to_sip: DenoiseState::new(),
denoiser_to_browser: DenoiseState::new(),
})
@@ -293,6 +296,126 @@ impl TranscodeState {
_ => Err(format!("unsupported target PT {pt}")),
}
}
// ---- f32 API for high-quality internal bus ----------------------------
/// Decode an encoded audio payload to f32 PCM samples in [-1.0, 1.0].
/// Returns (samples, sample_rate).
///
/// For Opus, uses native float decode (no i16 quantization).
/// For G.722/G.711, decodes to i16 then converts (codec is natively i16).
pub fn decode_to_f32(&mut self, data: &[u8], pt: u8) -> Result<(Vec<f32>, u32), String> {
match pt {
PT_OPUS => {
let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz
let packet =
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
let n: usize = self
.opus_dec
.decode_float(Some(packet), out, false)
.map_err(|e| format!("opus decode_float: {e}"))?
.into();
pcm.truncate(n);
Ok((pcm, 48000))
}
_ => {
// G.722, PCMU, PCMA: natively i16 codecs — decode then convert.
let (pcm_i16, rate) = self.decode_to_pcm(data, pt)?;
let pcm_f32 = pcm_i16.iter().map(|&s| s as f32 / 32768.0).collect();
Ok((pcm_f32, rate))
}
}
}
/// Encode f32 PCM samples ([-1.0, 1.0]) to an audio codec.
///
/// For Opus, uses native float encode (no i16 quantization).
/// For G.722/G.711, converts to i16 then encodes (codec is natively i16).
pub fn encode_from_f32(&mut self, pcm: &[f32], pt: u8) -> Result<Vec<u8>, String> {
match pt {
PT_OPUS => {
let mut buf = vec![0u8; 4000];
let n: usize = self
.opus_enc
.encode_float(pcm, &mut buf)
.map_err(|e| format!("opus encode_float: {e}"))?
.into();
buf.truncate(n);
Ok(buf)
}
_ => {
// G.722, PCMU, PCMA: natively i16 codecs.
let pcm_i16: Vec<i16> = pcm
.iter()
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect();
self.encode_from_pcm(&pcm_i16, pt)
}
}
}
/// High-quality sample rate conversion for f32 PCM using rubato FFT resampler.
/// Uses a separate cache from the i16 resampler.
pub fn resample_f32(
&mut self,
pcm: &[f32],
from_rate: u32,
to_rate: u32,
) -> Result<Vec<f32>, String> {
if from_rate == to_rate || pcm.is_empty() {
return Ok(pcm.to_vec());
}
let chunk = pcm.len();
let key = (from_rate, to_rate, chunk);
if !self.resamplers_f32.contains_key(&key) {
let r =
FftFixedIn::<f32>::new(from_rate as usize, to_rate as usize, chunk, 1, 1)
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
self.resamplers_f32.insert(key, r);
}
let resampler = self.resamplers_f32.get_mut(&key).unwrap();
let input = vec![pcm.to_vec()];
let result = resampler
.process(&input, None)
.map_err(|e| format!("resample f32 {from_rate}->{to_rate}: {e}"))?;
Ok(result[0].clone())
}
/// Apply RNNoise ML noise suppression to 48kHz f32 PCM audio.
/// Processes in 480-sample (10ms) frames. State persists across calls.
/// Operates natively in f32 — no i16 conversion overhead.
pub fn denoise_f32(denoiser: &mut DenoiseState, pcm: &[f32]) -> Vec<f32> {
let frame_size = DenoiseState::FRAME_SIZE; // 480
let total = pcm.len();
let whole = (total / frame_size) * frame_size;
let mut output = Vec::with_capacity(total);
let mut out_buf = [0.0f32; 480];
// nnnoiseless expects f32 samples scaled as i16 range (-32768..32767).
for offset in (0..whole).step_by(frame_size) {
let input: Vec<f32> = pcm[offset..offset + frame_size]
.iter()
.map(|&s| s * 32768.0)
.collect();
denoiser.process_frame(&mut out_buf, &input);
output.extend(out_buf.iter().map(|&s| s / 32768.0));
}
if whole < total {
output.extend_from_slice(&pcm[whole..]);
}
output
}
}
/// Create a new standalone denoiser for per-leg inbound processing.
pub fn new_denoiser() -> Box<DenoiseState<'static>> {
DenoiseState::new()
}
#[cfg(test)]

View File

@@ -10,6 +10,7 @@ path = "src/main.rs"
[dependencies]
codec-lib = { path = "../codec-lib" }
sip-proto = { path = "../sip-proto" }
nnnoiseless = { version = "0.5", default-features = false }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

View File

@@ -10,9 +10,9 @@ use tokio::net::UdpSocket;
use tokio::time::{self, Duration};
/// Mixing sample rate used by the mixer (must stay in sync with mixer::MIX_RATE).
const MIX_RATE: u32 = 16000;
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320;
const MIX_FRAME_SIZE: usize = 960;
/// Play a WAV file as RTP to a destination.
/// Returns when playback is complete.
@@ -178,9 +178,9 @@ pub async fn play_beep(
Ok((seq, ts))
}
/// Load a WAV file and split it into 20ms PCM frames at 16kHz.
/// Load a WAV file and split it into 20ms f32 PCM frames at 48kHz.
/// Used by the leg interaction system to prepare prompt audio for the mixer.
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
let path = Path::new(wav_path);
if !path.exists() {
return Err(format!("WAV file not found: {wav_path}"));
@@ -191,17 +191,17 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
let spec = reader.spec();
let wav_rate = spec.sample_rate;
// Read all samples as i16.
let samples: Vec<i16> = if spec.bits_per_sample == 16 {
// Read all samples as f32 in [-1.0, 1.0].
let samples: Vec<f32> = if spec.bits_per_sample == 16 {
reader
.samples::<i16>()
.filter_map(|s| s.ok())
.map(|s| s as f32 / 32768.0)
.collect()
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
reader
.samples::<f32>()
.filter_map(|s| s.ok())
.map(|s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect()
} else {
return Err(format!(
@@ -214,24 +214,24 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
return Ok(vec![]);
}
// Resample to MIX_RATE (16kHz) if needed.
// Resample to MIX_RATE (48kHz) if needed.
let resampled = if wav_rate != MIX_RATE {
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
transcoder
.resample(&samples, wav_rate, MIX_RATE)
.resample_f32(&samples, wav_rate, MIX_RATE)
.map_err(|e| format!("resample: {e}"))?
} else {
samples
};
// Split into MIX_FRAME_SIZE (320) sample frames.
// Split into MIX_FRAME_SIZE (960) sample frames.
let mut frames = Vec::new();
let mut offset = 0;
while offset < resampled.len() {
let end = (offset + MIX_FRAME_SIZE).min(resampled.len());
let mut frame = resampled[offset..end].to_vec();
// Pad short final frame with silence.
frame.resize(MIX_FRAME_SIZE, 0);
frame.resize(MIX_FRAME_SIZE, 0.0);
frames.push(frame);
offset += MIX_FRAME_SIZE;
}

View File

@@ -20,6 +20,35 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
/// Emit a `leg_added` event with full leg information.
/// Free function (not a method) to avoid `&self` borrow conflicts when `self.calls` is borrowed.
fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
let metadata: serde_json::Value = if leg.metadata.is_empty() {
serde_json::json!({})
} else {
serde_json::Value::Object(
leg.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
)
};
emit_event(
tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg.id,
"kind": leg.kind.as_str(),
"state": leg.state.as_str(),
"codec": sip_proto::helpers::codec_name(leg.codec_pt),
"rtpPort": leg.rtp_port,
"remoteMedia": leg.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
"metadata": metadata,
}),
);
}
pub struct CallManager {
/// All active calls, keyed by internal call ID.
pub calls: HashMap<String, Call>,
@@ -120,7 +149,19 @@ impl CallManager {
}
// Passthrough-style routing for inbound/outbound device↔provider calls.
self.route_passthrough_message(&call_id, &leg_id, msg, from_addr, socket, config)
// The sip_index only stores one leg for shared Call-IDs, so we need to
// determine which leg the message actually belongs to by comparing from_addr.
let actual_leg_id = self
.calls
.get(&call_id)
.and_then(|call| {
call.legs
.values()
.find(|l| l.signaling_addr == Some(from_addr))
.map(|l| l.id.clone())
})
.unwrap_or(leg_id);
self.route_passthrough_message(&call_id, &actual_leg_id, msg, from_addr, socket, config)
.await
}
@@ -253,6 +294,11 @@ impl CallManager {
dev_leg.state = LegState::Connected;
}
}
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": dev_leg_id, "state": "connected" }),
);
// Wire device leg to mixer.
if let Some(dev_remote_addr) = dev_remote {
@@ -312,6 +358,8 @@ impl CallManager {
leg.state = LegState::Terminated;
}
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }));
emit_event(&self.out_tx, "call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
self.terminate_call(call_id).await;
@@ -517,21 +565,30 @@ impl CallManager {
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Ringing;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }));
} else if code >= 200 && code < 300 {
let mut needs_wiring = false;
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Connected;
// Learn remote media from SDP.
// Learn remote media and negotiated codec from SDP answer.
if msg.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
leg.remote_media = Some(addr);
}
// Use the codec from the SDP answer (what the remote actually selected).
if let Some(pt) = ep.codec_pt {
leg.codec_pt = pt;
}
}
}
needs_wiring = true;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }));
if call.state != CallState::Connected {
call.state = CallState::Connected;
emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id }));
@@ -677,15 +734,19 @@ impl CallManager {
call.callee_number = Some(called_number);
call.state = CallState::Ringing;
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
let mut codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
// Provider leg — extract media from SDP.
// Provider leg — extract media and negotiated codec from SDP.
let mut provider_media: Option<SocketAddr> = None;
if invite.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&invite.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
provider_media = Some(addr);
}
// Use the codec from the provider's SDP offer (what they actually want to use).
if let Some(pt) = ep.codec_pt {
codec_pt = pt;
}
}
}
@@ -755,6 +816,16 @@ impl CallManager {
// Store the call.
self.calls.insert(call_id.clone(), call);
// Emit leg_added for both initial legs.
if let Some(call) = self.calls.get(&call_id) {
if let Some(leg) = call.legs.get(&provider_leg_id) {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
if let Some(leg) = call.legs.get(&device_leg_id) {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -842,6 +913,14 @@ impl CallManager {
.insert(sip_call_id, (call_id.clone(), leg_id));
self.calls.insert(call_id.clone(), call);
// Emit leg_added for the provider leg.
if let Some(call) = self.calls.get(&call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -866,11 +945,18 @@ impl CallManager {
let lan_port = config.proxy.lan_port;
let device_sip_call_id = invite.call_id().to_string();
// Extract just the user part from the request URI (e.g., "sip:16196000@10.0.0.1" → "16196000").
// extract_uri is for header values with angle brackets, not bare request URIs.
let dialed_number = invite
.request_uri()
.and_then(|uri| SipMessage::extract_uri(uri))
.unwrap_or(invite.request_uri().unwrap_or(""))
.to_string();
.map(|uri| {
let stripped = uri
.strip_prefix("sip:")
.or_else(|| uri.strip_prefix("sips:"))
.unwrap_or(uri);
stripped.split('@').next().unwrap_or(stripped).to_string()
})
.unwrap_or_default();
let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() {
Some(a) => a,
@@ -983,6 +1069,14 @@ impl CallManager {
.insert(provider_sip_call_id, (call_id.clone(), provider_leg_id));
self.calls.insert(call_id.clone(), call);
// Emit leg_added for both initial legs (device + provider).
if let Some(call) = self.calls.get(&call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -1050,17 +1144,11 @@ impl CallManager {
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-provider",
"state": "inviting",
"number": number,
}),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(&leg_id) {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
Some(leg_id)
}
@@ -1126,17 +1214,11 @@ impl CallManager {
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-device",
"state": "inviting",
"device_id": device_id,
}),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(&leg_id) {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
Some(leg_id)
}
@@ -1223,6 +1305,13 @@ impl CallManager {
None => return false,
};
// Emit leg_removed for source call.
emit_event(
&self.out_tx,
"leg_removed",
serde_json::json!({ "call_id": source_call_id, "leg_id": leg_id }),
);
// Update SIP index to point to the target call.
if let Some(sip_cid) = &leg_info.sip_call_id {
self.sip_index.insert(
@@ -1255,15 +1344,12 @@ impl CallManager {
let target_call = self.calls.get_mut(target_call_id).unwrap();
target_call.legs.insert(leg_id.to_string(), leg_info);
emit_event(
&self.out_tx,
"leg_transferred",
serde_json::json!({
"leg_id": leg_id,
"source_call_id": source_call_id,
"target_call_id": target_call_id,
}),
);
// Emit leg_added for target call.
if let Some(target) = self.calls.get(target_call_id) {
if let Some(leg) = target.legs.get(leg_id) {
emit_leg_added_event(&self.out_tx, target_call_id, leg);
}
}
// Check if source call has too few legs remaining.
let source_call = self.calls.get(source_call_id).unwrap();
@@ -1366,6 +1452,11 @@ impl CallManager {
}
}
leg.state = LegState::Terminated;
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg.id, "state": "terminated" }),
);
}
emit_event(
@@ -1484,6 +1575,13 @@ impl CallManager {
);
self.calls.insert(call_id.to_string(), call);
// Emit leg_added for the provider leg.
if let Some(call) = self.calls.get(call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
// Build recording path.
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)

View File

@@ -35,7 +35,8 @@ pub fn create_leg_channels() -> LegChannels {
}
/// Spawn the inbound I/O task for a SIP leg.
/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer.
/// Reads RTP from the socket, parses the variable-length header (RFC 3550),
/// and sends the payload to the mixer.
/// Returns the JoinHandle (exits when the inbound_tx channel is dropped).
pub fn spawn_sip_inbound(
rtp_socket: Arc<UdpSocket>,
@@ -51,12 +52,29 @@ pub fn spawn_sip_inbound(
}
let pt = buf[1] & 0x7F;
let marker = (buf[1] & 0x80) != 0;
let seq = u16::from_be_bytes([buf[2], buf[3]]);
let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
let payload = buf[12..n].to_vec();
// RFC 3550: header length = 12 + (CC * 4) + optional extension.
let cc = (buf[0] & 0x0F) as usize;
let has_extension = (buf[0] & 0x10) != 0;
let mut offset = 12 + cc * 4;
if has_extension {
if offset + 4 > n {
continue; // Malformed: extension header truncated.
}
let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
offset += 4 + ext_len * 4;
}
if offset >= n {
continue; // No payload after header.
}
let payload = buf[offset..n].to_vec();
if payload.is_empty() {
continue;
}
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, timestamp }).await.is_err() {
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() {
break; // Channel closed — leg removed.
}
}

View File

@@ -677,6 +677,10 @@ async fn handle_webrtc_link(
"leg_id": session_id,
"kind": "webrtc",
"state": "connected",
"codec": "Opus",
"rtpPort": 0,
"remoteMedia": null,
"metadata": {},
}));
respond_ok(out_tx, &cmd.id, serde_json::json!({
@@ -1125,8 +1129,11 @@ async fn handle_add_tool_leg(
"call_id": call_id,
"leg_id": tool_leg_id,
"kind": "tool",
"tool_type": tool_type_str,
"state": "connected",
"codec": null,
"rtpPort": 0,
"remoteMedia": null,
"metadata": { "tool_type": tool_type_str },
}),
);

View File

@@ -3,9 +3,12 @@
//! Each Call spawns one mixer task. Legs communicate with the mixer via
//! tokio mpsc channels — no shared mutable state, no lock contention.
//!
//! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame).
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
//! 2. Compute total mix (sum of all **participant** legs' PCM as i32)
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
//! 5. For each tool leg: send per-source unmerged audio batch
@@ -13,16 +16,18 @@
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, TranscodeState};
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
use nnnoiseless::DenoiseState;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample.
const MIX_RATE: u32 = 16000;
/// Mixing sample rate — 48kHz. Opus is native, G.722 needs 3× upsample, G.711 needs 6× upsample.
/// All processing (denoising, mixing) happens at this rate in f32 for maximum quality.
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
@@ -30,6 +35,8 @@ pub struct RtpPacket {
pub payload_type: u8,
/// RTP marker bit (first packet of a DTMF event, etc.).
pub marker: bool,
/// RTP sequence number for reordering.
pub seq: u16,
/// RTP timestamp from the original packet header.
pub timestamp: u32,
}
@@ -47,8 +54,8 @@ enum LegRole {
}
struct IsolationState {
/// PCM frames at MIX_RATE (320 samples each) queued for playback.
prompt_frames: VecDeque<Vec<i16>>,
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
prompt_frames: VecDeque<Vec<f32>>,
/// Digits that complete the interaction (e.g., ['1', '2']).
expected_digits: Vec<char>,
/// Ticks remaining before timeout (decremented each tick after prompt ends).
@@ -88,8 +95,8 @@ pub struct ToolAudioBatch {
/// One participant's 20ms audio frame.
pub struct ToolAudioSource {
pub leg_id: String,
/// PCM at 16kHz, MIX_FRAME_SIZE (320) samples.
pub pcm_16k: Vec<i16>,
/// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples.
pub pcm_48k: Vec<f32>,
}
/// Internal storage for a tool leg inside the mixer.
@@ -122,8 +129,8 @@ pub enum MixerCommand {
/// DTMF from the leg is checked against expected_digits.
StartInteraction {
leg_id: String,
/// PCM frames at MIX_RATE (16kHz), each 320 samples.
prompt_pcm_frames: Vec<Vec<i16>>,
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
prompt_pcm_frames: Vec<Vec<f32>>,
expected_digits: Vec<char>,
timeout_ms: u32,
result_tx: oneshot::Sender<InteractionResult>,
@@ -149,10 +156,12 @@ pub enum MixerCommand {
struct MixerLegSlot {
codec_pt: u8,
transcoder: TranscodeState,
/// Per-leg inbound denoiser (48kHz, 480-sample frames).
denoiser: Box<DenoiseState<'static>>,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
/// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus.
last_pcm_frame: Vec<i16>,
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
last_pcm_frame: Vec<f32>,
/// Number of consecutive ticks with no inbound packet.
silent_ticks: u32,
// RTP output state.
@@ -220,9 +229,10 @@ async fn mixer_loop(
MixerLegSlot {
codec_pt,
transcoder,
denoiser: new_denoiser(),
inbound_rx,
outbound_tx,
last_pcm_frame: vec![0i16; MIX_FRAME_SIZE],
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
silent_ticks: 0,
rtp_seq: 0,
rtp_ts: 0,
@@ -311,16 +321,18 @@ async fn mixer_loop(
continue;
}
// ── 2. Drain inbound packets, decode to 16kHz PCM. ─────────
// ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ────
// DTMF (PT 101) packets are collected separately.
// Audio packets are sorted by sequence number and decoded
// in order to maintain codec state (critical for G.722 ADPCM).
let leg_ids: Vec<String> = legs.keys().cloned().collect();
let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new();
for lid in &leg_ids {
let slot = legs.get_mut(lid).unwrap();
// Drain channel — collect DTMF packets separately, keep latest audio.
let mut latest_audio: Option<RtpPacket> = None;
// Drain channel — collect DTMF separately, collect ALL audio packets.
let mut audio_packets: Vec<RtpPacket> = Vec::new();
loop {
match slot.inbound_rx.try_recv() {
Ok(pkt) => {
@@ -328,33 +340,47 @@ async fn mixer_loop(
// DTMF telephone-event: collect for processing.
dtmf_forward.push((lid.clone(), pkt));
} else {
latest_audio = Some(pkt);
audio_packets.push(pkt);
}
}
Err(_) => break,
}
}
if let Some(pkt) = latest_audio {
if !audio_packets.is_empty() {
slot.silent_ticks = 0;
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to mixing rate if needed.
let pcm_mix = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE])
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = pcm_mix;
frame.resize(MIX_FRAME_SIZE, 0);
slot.last_pcm_frame = frame;
}
Err(_) => {
// Decode failed — use silence.
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
// Sort by sequence number for correct codec state progression.
// This prevents G.722 ADPCM state corruption from out-of-order packets.
audio_packets.sort_by_key(|p| p.seq);
// Decode ALL packets in order (maintains codec state),
// but only keep the last decoded frame for mixing.
for pkt in &audio_packets {
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to 48kHz mixing rate if needed.
let pcm_48k = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample_f32(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
};
// Per-leg inbound denoising at 48kHz.
// Skip for Opus/WebRTC legs — browsers already apply
// their own noise suppression via getUserMedia.
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
} else {
pcm_48k
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = processed;
frame.resize(MIX_FRAME_SIZE, 0.0);
slot.last_pcm_frame = frame;
}
Err(_) => {}
}
}
} else if dtmf_forward.iter().any(|(src, _)| src == lid) {
@@ -364,17 +390,18 @@ async fn mixer_loop(
slot.silent_ticks += 1;
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
if slot.silent_ticks > 150 {
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
}
}
}
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
// Accumulate as f64 to prevent precision loss when summing f32.
let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE];
for slot in legs.values() {
if matches!(slot.role, LegRole::Participant) {
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
total_mix[i] += s as i32;
total_mix[i] += s as f64;
}
}
}
@@ -387,27 +414,27 @@ async fn mixer_loop(
for (lid, slot) in legs.iter_mut() {
match &mut slot.role {
LegRole::Participant => {
// Mix-minus: total minus this leg's own contribution.
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample = (total_mix[i] - slot.last_pcm_frame[i] as i32)
.clamp(-32768, 32767) as i16;
mix_minus.push(sample);
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
mix_minus.push(sample.clamp(-1.0, 1.0));
}
// Resample from 16kHz to the leg's codec native rate.
// Resample from 48kHz to the leg's codec native rate.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
mix_minus
} else {
slot.transcoder
.resample(&mix_minus, MIX_RATE, target_rate)
.resample_f32(&mix_minus, MIX_RATE, target_rate)
.unwrap_or_default()
};
// Encode to the leg's codec.
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
let encoded =
match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
@@ -456,21 +483,21 @@ async fn mixer_loop(
frame
} else {
state.prompt_done = true;
vec![0i16; MIX_FRAME_SIZE]
vec![0.0f32; MIX_FRAME_SIZE]
};
// Encode prompt frame to the leg's codec (reuses existing encode path).
// Encode prompt frame to the leg's codec.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
pcm_frame
} else {
slot.transcoder
.resample(&pcm_frame, MIX_RATE, target_rate)
.resample_f32(&pcm_frame, MIX_RATE, target_rate)
.unwrap_or_default()
};
if let Ok(encoded) =
slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt)
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
{
if !encoded.is_empty() {
let header = build_rtp_header(
@@ -523,7 +550,7 @@ async fn mixer_loop(
.filter(|(_, s)| matches!(s.role, LegRole::Participant))
.map(|(lid, s)| ToolAudioSource {
leg_id: lid.clone(),
pcm_16k: s.last_pcm_frame.clone(),
pcm_48k: s.last_pcm_frame.clone(),
})
.collect();
@@ -533,7 +560,7 @@ async fn mixer_loop(
.iter()
.map(|s| ToolAudioSource {
leg_id: s.leg_id.clone(),
pcm_16k: s.pcm_16k.clone(),
pcm_48k: s.pcm_48k.clone(),
})
.collect(),
};

View File

@@ -2,7 +2,7 @@
//!
//! Tool legs are observer legs that receive individual audio streams from each
//! participant in a call. The mixer pipes `ToolAudioBatch` every 20ms containing
//! each participant's decoded PCM@16kHz tagged with source leg ID.
//! each participant's decoded PCM@48kHz f32 tagged with source leg ID.
//!
//! Consumers:
//! - **Recording**: writes per-source WAV files for speaker-separated recording.
@@ -37,20 +37,25 @@ pub fn spawn_recording_tool(
while let Some(batch) = rx.recv().await {
for source in &batch.sources {
// Skip silence-only frames (all zeros = no audio activity).
let has_audio = source.pcm_16k.iter().any(|&s| s != 0);
// Skip silence-only frames (near-zero = no audio activity).
let has_audio = source.pcm_48k.iter().any(|&s| s.abs() > 1e-6);
if !has_audio && !recorders.contains_key(&source.leg_id) {
continue; // Don't create a file for silence-only sources.
}
let recorder = recorders.entry(source.leg_id.clone()).or_insert_with(|| {
let path = format!("{}/{}-{}.wav", base_dir, call_id, source.leg_id);
Recorder::new_pcm(&path, 16000, None).unwrap_or_else(|e| {
Recorder::new_pcm(&path, 48000, None).unwrap_or_else(|e| {
panic!("failed to create recorder for {}: {e}", source.leg_id);
})
});
if !recorder.write_pcm(&source.pcm_16k) {
// Convert f32 [-1.0, 1.0] to i16 for WAV writing.
let pcm_i16: Vec<i16> = source.pcm_48k
.iter()
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect();
if !recorder.write_pcm(&pcm_i16) {
// Max duration reached — stop recording this source.
break;
}
@@ -88,7 +93,7 @@ pub fn spawn_recording_tool(
/// Spawn a transcription tool leg.
///
/// The plumbing is fully real: it receives per-source unmerged PCM@16kHz from
/// The plumbing is fully real: it receives per-source unmerged PCM@48kHz f32 from
/// the mixer every 20ms. The consumer is a stub that accumulates audio and
/// reports metadata on close. Future: will stream to a Whisper HTTP endpoint.
pub fn spawn_transcription_tool(
@@ -105,7 +110,7 @@ pub fn spawn_transcription_tool(
while let Some(batch) = rx.recv().await {
for source in &batch.sources {
*source_samples.entry(source.leg_id.clone()).or_insert(0) +=
source.pcm_16k.len() as u64;
source.pcm_48k.len() as u64;
// TODO: Future — accumulate chunks and stream to Whisper endpoint.
// For now, the audio is received and counted but not processed.
@@ -118,7 +123,7 @@ pub fn spawn_transcription_tool(
.map(|(leg_id, samples)| {
serde_json::json!({
"source_leg_id": leg_id,
"duration_ms": (samples * 1000) / 16000,
"duration_ms": (samples * 1000) / 48000,
})
})
.collect();

View File

@@ -290,8 +290,9 @@ async fn browser_to_mixer_loop(
.send(RtpPacket {
payload: payload.to_vec(),
payload_type: PT_OPUS,
marker: false,
timestamp: 0,
marker: rtp_packet.header.marker,
seq: rtp_packet.header.sequence_number,
timestamp: rtp_packet.header.timestamp,
})
.await;
}

View File

@@ -197,10 +197,11 @@ pub fn compute_digest_auth(
use crate::Endpoint;
/// Parse the audio media port and connection address from an SDP body.
/// Parse the audio media port, connection address, and preferred codec from an SDP body.
pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
let mut addr: Option<&str> = None;
let mut port: Option<u16> = None;
let mut codec_pt: Option<u8> = None;
let normalized = sdp.replace("\r\n", "\n");
for raw in normalized.split('\n') {
@@ -208,10 +209,16 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
if let Some(rest) = line.strip_prefix("c=IN IP4 ") {
addr = Some(rest.trim());
} else if let Some(rest) = line.strip_prefix("m=audio ") {
// m=audio <port> RTP/AVP <pt1> [<pt2> ...]
let parts: Vec<&str> = rest.split_whitespace().collect();
if !parts.is_empty() {
port = parts[0].parse().ok();
}
// parts[1] is "RTP/AVP" or similar, parts[2..] are payload types.
// The first PT is the preferred codec.
if parts.len() > 2 {
codec_pt = parts[2].parse::<u8>().ok();
}
}
}
@@ -219,6 +226,7 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
(Some(a), Some(p)) => Some(Endpoint {
address: a.to_string(),
port: p,
codec_pt,
}),
_ => None,
}

View File

@@ -9,9 +9,11 @@ pub mod dialog;
pub mod helpers;
pub mod rewrite;
/// Network endpoint (address + port).
/// Network endpoint (address + port + optional negotiated codec).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Endpoint {
pub address: String,
pub port: u16,
/// First payload type from the SDP `m=audio` line (the preferred codec).
pub codec_pt: Option<u8>,
}

View File

@@ -92,7 +92,7 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option<Endpoint>
.collect();
let original = match (orig_addr, orig_port) {
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p }),
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }),
_ => None,
};

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.16.0',
version: '1.17.1',
description: 'undefined'
}

View File

@@ -425,9 +425,9 @@ async function startProxyEngine(): Promise<void> {
id: data.leg_id,
type: data.kind,
state: data.state,
codec: null,
rtpPort: null,
remoteMedia: null,
codec: data.codec ?? null,
rtpPort: data.rtpPort ?? null,
remoteMedia: data.remoteMedia ?? null,
metadata: data.metadata || {},
});
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.16.0',
version: '1.17.1',
description: 'undefined'
}