diff --git a/changelog.md b/changelog.md index 73d1bc1..9e30479 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-04-10 - 1.17.1 - fix(proxy-engine,codec-lib,sip-proto,ts) +preserve negotiated media details and improve RTP audio handling across call legs + +- Use native Opus float encode/decode to avoid unnecessary i16 quantization in the f32 audio path. +- Parse full RTP headers including extensions and sequence numbers, then sort inbound packets before decoding to keep codec state stable for out-of-order audio. +- Capture negotiated codec payload types from SDP offers and answers and include codec, RTP port, remote media, and metadata in leg_added events. +- Emit leg_state_changed and leg_removed events more consistently so the dashboard reflects leg lifecycle updates accurately. + ## 2026-04-10 - 1.17.0 - feat(proxy-engine) upgrade the internal audio bus to 48kHz f32 with per-leg denoising and improve SIP leg routing diff --git a/nogit/voicemail/default/msg-1775840000387.wav b/nogit/voicemail/default/msg-1775840000387.wav new file mode 100644 index 0000000..780f54d Binary files /dev/null and b/nogit/voicemail/default/msg-1775840000387.wav differ diff --git a/nogit/voicemail/default/msg-1775840014276.wav b/nogit/voicemail/default/msg-1775840014276.wav new file mode 100644 index 0000000..a593e5f Binary files /dev/null and b/nogit/voicemail/default/msg-1775840014276.wav differ diff --git a/rust/crates/codec-lib/src/lib.rs b/rust/crates/codec-lib/src/lib.rs index e23a179..f18ba82 100644 --- a/rust/crates/codec-lib/src/lib.rs +++ b/rust/crates/codec-lib/src/lib.rs @@ -301,19 +301,59 @@ impl TranscodeState { /// Decode an encoded audio payload to f32 PCM samples in [-1.0, 1.0]. /// Returns (samples, sample_rate). + /// + /// For Opus, uses native float decode (no i16 quantization). + /// For G.722/G.711, decodes to i16 then converts (codec is natively i16). pub fn decode_to_f32(&mut self, data: &[u8], pt: u8) -> Result<(Vec, u32), String> { - let (pcm_i16, rate) = self.decode_to_pcm(data, pt)?; - let pcm_f32 = pcm_i16.iter().map(|&s| s as f32 / 32768.0).collect(); - Ok((pcm_f32, rate)) + match pt { + PT_OPUS => { + let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz + let packet = + OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?; + let out = + MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?; + let n: usize = self + .opus_dec + .decode_float(Some(packet), out, false) + .map_err(|e| format!("opus decode_float: {e}"))? + .into(); + pcm.truncate(n); + Ok((pcm, 48000)) + } + _ => { + // G.722, PCMU, PCMA: natively i16 codecs — decode then convert. + let (pcm_i16, rate) = self.decode_to_pcm(data, pt)?; + let pcm_f32 = pcm_i16.iter().map(|&s| s as f32 / 32768.0).collect(); + Ok((pcm_f32, rate)) + } + } } /// Encode f32 PCM samples ([-1.0, 1.0]) to an audio codec. + /// + /// For Opus, uses native float encode (no i16 quantization). + /// For G.722/G.711, converts to i16 then encodes (codec is natively i16). pub fn encode_from_f32(&mut self, pcm: &[f32], pt: u8) -> Result, String> { - let pcm_i16: Vec = pcm - .iter() - .map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16) - .collect(); - self.encode_from_pcm(&pcm_i16, pt) + match pt { + PT_OPUS => { + let mut buf = vec![0u8; 4000]; + let n: usize = self + .opus_enc + .encode_float(pcm, &mut buf) + .map_err(|e| format!("opus encode_float: {e}"))? + .into(); + buf.truncate(n); + Ok(buf) + } + _ => { + // G.722, PCMU, PCMA: natively i16 codecs. + let pcm_i16: Vec = pcm + .iter() + .map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16) + .collect(); + self.encode_from_pcm(&pcm_i16, pt) + } + } } /// High-quality sample rate conversion for f32 PCM using rubato FFT resampler. diff --git a/rust/crates/proxy-engine/src/call_manager.rs b/rust/crates/proxy-engine/src/call_manager.rs index dda6b07..39827f4 100644 --- a/rust/crates/proxy-engine/src/call_manager.rs +++ b/rust/crates/proxy-engine/src/call_manager.rs @@ -20,6 +20,35 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::net::UdpSocket; +/// Emit a `leg_added` event with full leg information. +/// Free function (not a method) to avoid `&self` borrow conflicts when `self.calls` is borrowed. +fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) { + let metadata: serde_json::Value = if leg.metadata.is_empty() { + serde_json::json!({}) + } else { + serde_json::Value::Object( + leg.metadata + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(), + ) + }; + emit_event( + tx, + "leg_added", + serde_json::json!({ + "call_id": call_id, + "leg_id": leg.id, + "kind": leg.kind.as_str(), + "state": leg.state.as_str(), + "codec": sip_proto::helpers::codec_name(leg.codec_pt), + "rtpPort": leg.rtp_port, + "remoteMedia": leg.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())), + "metadata": metadata, + }), + ); +} + pub struct CallManager { /// All active calls, keyed by internal call ID. pub calls: HashMap, @@ -265,6 +294,11 @@ impl CallManager { dev_leg.state = LegState::Connected; } } + emit_event( + &self.out_tx, + "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": dev_leg_id, "state": "connected" }), + ); // Wire device leg to mixer. if let Some(dev_remote_addr) = dev_remote { @@ -324,6 +358,8 @@ impl CallManager { leg.state = LegState::Terminated; } } + emit_event(&self.out_tx, "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" })); emit_event(&self.out_tx, "call_ended", serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration })); self.terminate_call(call_id).await; @@ -529,21 +565,30 @@ impl CallManager { if let Some(leg) = call.legs.get_mut(this_leg_id) { leg.state = LegState::Ringing; } + emit_event(&self.out_tx, "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" })); } else if code >= 200 && code < 300 { let mut needs_wiring = false; if let Some(leg) = call.legs.get_mut(this_leg_id) { leg.state = LegState::Connected; - // Learn remote media from SDP. + // Learn remote media and negotiated codec from SDP answer. if msg.has_sdp_body() { if let Some(ep) = parse_sdp_endpoint(&msg.body) { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { leg.remote_media = Some(addr); } + // Use the codec from the SDP answer (what the remote actually selected). + if let Some(pt) = ep.codec_pt { + leg.codec_pt = pt; + } } } needs_wiring = true; } + emit_event(&self.out_tx, "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" })); + if call.state != CallState::Connected { call.state = CallState::Connected; emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id })); @@ -689,15 +734,19 @@ impl CallManager { call.callee_number = Some(called_number); call.state = CallState::Ringing; - let codec_pt = provider_config.codecs.first().copied().unwrap_or(9); + let mut codec_pt = provider_config.codecs.first().copied().unwrap_or(9); - // Provider leg — extract media from SDP. + // Provider leg — extract media and negotiated codec from SDP. let mut provider_media: Option = None; if invite.has_sdp_body() { if let Some(ep) = parse_sdp_endpoint(&invite.body) { if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() { provider_media = Some(addr); } + // Use the codec from the provider's SDP offer (what they actually want to use). + if let Some(pt) = ep.codec_pt { + codec_pt = pt; + } } } @@ -767,6 +816,16 @@ impl CallManager { // Store the call. self.calls.insert(call_id.clone(), call); + // Emit leg_added for both initial legs. + if let Some(call) = self.calls.get(&call_id) { + if let Some(leg) = call.legs.get(&provider_leg_id) { + emit_leg_added_event(&self.out_tx, &call_id, leg); + } + if let Some(leg) = call.legs.get(&device_leg_id) { + emit_leg_added_event(&self.out_tx, &call_id, leg); + } + } + Some(call_id) } @@ -854,6 +913,14 @@ impl CallManager { .insert(sip_call_id, (call_id.clone(), leg_id)); self.calls.insert(call_id.clone(), call); + + // Emit leg_added for the provider leg. + if let Some(call) = self.calls.get(&call_id) { + for leg in call.legs.values() { + emit_leg_added_event(&self.out_tx, &call_id, leg); + } + } + Some(call_id) } @@ -1002,6 +1069,14 @@ impl CallManager { .insert(provider_sip_call_id, (call_id.clone(), provider_leg_id)); self.calls.insert(call_id.clone(), call); + + // Emit leg_added for both initial legs (device + provider). + if let Some(call) = self.calls.get(&call_id) { + for leg in call.legs.values() { + emit_leg_added_event(&self.out_tx, &call_id, leg); + } + } + Some(call_id) } @@ -1069,17 +1144,11 @@ impl CallManager { let call = self.calls.get_mut(call_id).unwrap(); call.legs.insert(leg_id.clone(), leg_info); - emit_event( - &self.out_tx, - "leg_added", - serde_json::json!({ - "call_id": call_id, - "leg_id": leg_id, - "kind": "sip-provider", - "state": "inviting", - "number": number, - }), - ); + if let Some(call) = self.calls.get(call_id) { + if let Some(leg) = call.legs.get(&leg_id) { + emit_leg_added_event(&self.out_tx, call_id, leg); + } + } Some(leg_id) } @@ -1145,17 +1214,11 @@ impl CallManager { let call = self.calls.get_mut(call_id).unwrap(); call.legs.insert(leg_id.clone(), leg_info); - emit_event( - &self.out_tx, - "leg_added", - serde_json::json!({ - "call_id": call_id, - "leg_id": leg_id, - "kind": "sip-device", - "state": "inviting", - "device_id": device_id, - }), - ); + if let Some(call) = self.calls.get(call_id) { + if let Some(leg) = call.legs.get(&leg_id) { + emit_leg_added_event(&self.out_tx, call_id, leg); + } + } Some(leg_id) } @@ -1242,6 +1305,13 @@ impl CallManager { None => return false, }; + // Emit leg_removed for source call. + emit_event( + &self.out_tx, + "leg_removed", + serde_json::json!({ "call_id": source_call_id, "leg_id": leg_id }), + ); + // Update SIP index to point to the target call. if let Some(sip_cid) = &leg_info.sip_call_id { self.sip_index.insert( @@ -1274,15 +1344,12 @@ impl CallManager { let target_call = self.calls.get_mut(target_call_id).unwrap(); target_call.legs.insert(leg_id.to_string(), leg_info); - emit_event( - &self.out_tx, - "leg_transferred", - serde_json::json!({ - "leg_id": leg_id, - "source_call_id": source_call_id, - "target_call_id": target_call_id, - }), - ); + // Emit leg_added for target call. + if let Some(target) = self.calls.get(target_call_id) { + if let Some(leg) = target.legs.get(leg_id) { + emit_leg_added_event(&self.out_tx, target_call_id, leg); + } + } // Check if source call has too few legs remaining. let source_call = self.calls.get(source_call_id).unwrap(); @@ -1385,6 +1452,11 @@ impl CallManager { } } leg.state = LegState::Terminated; + emit_event( + &self.out_tx, + "leg_state_changed", + serde_json::json!({ "call_id": call_id, "leg_id": leg.id, "state": "terminated" }), + ); } emit_event( @@ -1503,6 +1575,13 @@ impl CallManager { ); self.calls.insert(call_id.to_string(), call); + // Emit leg_added for the provider leg. + if let Some(call) = self.calls.get(call_id) { + for leg in call.legs.values() { + emit_leg_added_event(&self.out_tx, call_id, leg); + } + } + // Build recording path. let timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/rust/crates/proxy-engine/src/leg_io.rs b/rust/crates/proxy-engine/src/leg_io.rs index ce2e6ff..dc516e4 100644 --- a/rust/crates/proxy-engine/src/leg_io.rs +++ b/rust/crates/proxy-engine/src/leg_io.rs @@ -35,7 +35,8 @@ pub fn create_leg_channels() -> LegChannels { } /// Spawn the inbound I/O task for a SIP leg. -/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer. +/// Reads RTP from the socket, parses the variable-length header (RFC 3550), +/// and sends the payload to the mixer. /// Returns the JoinHandle (exits when the inbound_tx channel is dropped). pub fn spawn_sip_inbound( rtp_socket: Arc, @@ -51,12 +52,29 @@ pub fn spawn_sip_inbound( } let pt = buf[1] & 0x7F; let marker = (buf[1] & 0x80) != 0; + let seq = u16::from_be_bytes([buf[2], buf[3]]); let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]); - let payload = buf[12..n].to_vec(); + + // RFC 3550: header length = 12 + (CC * 4) + optional extension. + let cc = (buf[0] & 0x0F) as usize; + let has_extension = (buf[0] & 0x10) != 0; + let mut offset = 12 + cc * 4; + if has_extension { + if offset + 4 > n { + continue; // Malformed: extension header truncated. + } + let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize; + offset += 4 + ext_len * 4; + } + if offset >= n { + continue; // No payload after header. + } + + let payload = buf[offset..n].to_vec(); if payload.is_empty() { continue; } - if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, timestamp }).await.is_err() { + if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() { break; // Channel closed — leg removed. } } diff --git a/rust/crates/proxy-engine/src/main.rs b/rust/crates/proxy-engine/src/main.rs index b08b2be..baec084 100644 --- a/rust/crates/proxy-engine/src/main.rs +++ b/rust/crates/proxy-engine/src/main.rs @@ -677,6 +677,10 @@ async fn handle_webrtc_link( "leg_id": session_id, "kind": "webrtc", "state": "connected", + "codec": "Opus", + "rtpPort": 0, + "remoteMedia": null, + "metadata": {}, })); respond_ok(out_tx, &cmd.id, serde_json::json!({ @@ -1125,8 +1129,11 @@ async fn handle_add_tool_leg( "call_id": call_id, "leg_id": tool_leg_id, "kind": "tool", - "tool_type": tool_type_str, "state": "connected", + "codec": null, + "rtpPort": 0, + "remoteMedia": null, + "metadata": { "tool_type": tool_type_str }, }), ); diff --git a/rust/crates/proxy-engine/src/mixer.rs b/rust/crates/proxy-engine/src/mixer.rs index 56a4fb4..ccbbc8c 100644 --- a/rust/crates/proxy-engine/src/mixer.rs +++ b/rust/crates/proxy-engine/src/mixer.rs @@ -35,6 +35,8 @@ pub struct RtpPacket { pub payload_type: u8, /// RTP marker bit (first packet of a DTMF event, etc.). pub marker: bool, + /// RTP sequence number for reordering. + pub seq: u16, /// RTP timestamp from the original packet header. pub timestamp: u32, } @@ -319,16 +321,18 @@ async fn mixer_loop( continue; } - // ── 2. Drain inbound packets, decode to 16kHz PCM. ───────── + // ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ──── // DTMF (PT 101) packets are collected separately. + // Audio packets are sorted by sequence number and decoded + // in order to maintain codec state (critical for G.722 ADPCM). let leg_ids: Vec = legs.keys().cloned().collect(); let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new(); for lid in &leg_ids { let slot = legs.get_mut(lid).unwrap(); - // Drain channel — collect DTMF packets separately, keep latest audio. - let mut latest_audio: Option = None; + // Drain channel — collect DTMF separately, collect ALL audio packets. + let mut audio_packets: Vec = Vec::new(); loop { match slot.inbound_rx.try_recv() { Ok(pkt) => { @@ -336,35 +340,47 @@ async fn mixer_loop( // DTMF telephone-event: collect for processing. dtmf_forward.push((lid.clone(), pkt)); } else { - latest_audio = Some(pkt); + audio_packets.push(pkt); } } Err(_) => break, } } - if let Some(pkt) = latest_audio { + if !audio_packets.is_empty() { slot.silent_ticks = 0; - match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) { - Ok((pcm, rate)) => { - // Resample to 48kHz mixing rate if needed. - let pcm_48k = if rate == MIX_RATE { - pcm - } else { - slot.transcoder - .resample_f32(&pcm, rate, MIX_RATE) - .unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE]) - }; - // Per-leg inbound denoising at 48kHz. - let denoised = TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k); - // Pad or truncate to exactly MIX_FRAME_SIZE. - let mut frame = denoised; - frame.resize(MIX_FRAME_SIZE, 0.0); - slot.last_pcm_frame = frame; - } - Err(_) => { - // Decode failed — use silence. - slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE]; + + // Sort by sequence number for correct codec state progression. + // This prevents G.722 ADPCM state corruption from out-of-order packets. + audio_packets.sort_by_key(|p| p.seq); + + // Decode ALL packets in order (maintains codec state), + // but only keep the last decoded frame for mixing. + for pkt in &audio_packets { + match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) { + Ok((pcm, rate)) => { + // Resample to 48kHz mixing rate if needed. + let pcm_48k = if rate == MIX_RATE { + pcm + } else { + slot.transcoder + .resample_f32(&pcm, rate, MIX_RATE) + .unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE]) + }; + // Per-leg inbound denoising at 48kHz. + // Skip for Opus/WebRTC legs — browsers already apply + // their own noise suppression via getUserMedia. + let processed = if slot.codec_pt != codec_lib::PT_OPUS { + TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k) + } else { + pcm_48k + }; + // Pad or truncate to exactly MIX_FRAME_SIZE. + let mut frame = processed; + frame.resize(MIX_FRAME_SIZE, 0.0); + slot.last_pcm_frame = frame; + } + Err(_) => {} } } } else if dtmf_forward.iter().any(|(src, _)| src == lid) { diff --git a/rust/crates/proxy-engine/src/webrtc_engine.rs b/rust/crates/proxy-engine/src/webrtc_engine.rs index a31b130..3c76937 100644 --- a/rust/crates/proxy-engine/src/webrtc_engine.rs +++ b/rust/crates/proxy-engine/src/webrtc_engine.rs @@ -290,8 +290,9 @@ async fn browser_to_mixer_loop( .send(RtpPacket { payload: payload.to_vec(), payload_type: PT_OPUS, - marker: false, - timestamp: 0, + marker: rtp_packet.header.marker, + seq: rtp_packet.header.sequence_number, + timestamp: rtp_packet.header.timestamp, }) .await; } diff --git a/rust/crates/sip-proto/src/helpers.rs b/rust/crates/sip-proto/src/helpers.rs index 225ae41..3bcc455 100644 --- a/rust/crates/sip-proto/src/helpers.rs +++ b/rust/crates/sip-proto/src/helpers.rs @@ -197,10 +197,11 @@ pub fn compute_digest_auth( use crate::Endpoint; -/// Parse the audio media port and connection address from an SDP body. +/// Parse the audio media port, connection address, and preferred codec from an SDP body. pub fn parse_sdp_endpoint(sdp: &str) -> Option { let mut addr: Option<&str> = None; let mut port: Option = None; + let mut codec_pt: Option = None; let normalized = sdp.replace("\r\n", "\n"); for raw in normalized.split('\n') { @@ -208,10 +209,16 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option { if let Some(rest) = line.strip_prefix("c=IN IP4 ") { addr = Some(rest.trim()); } else if let Some(rest) = line.strip_prefix("m=audio ") { + // m=audio RTP/AVP [ ...] let parts: Vec<&str> = rest.split_whitespace().collect(); if !parts.is_empty() { port = parts[0].parse().ok(); } + // parts[1] is "RTP/AVP" or similar, parts[2..] are payload types. + // The first PT is the preferred codec. + if parts.len() > 2 { + codec_pt = parts[2].parse::().ok(); + } } } @@ -219,6 +226,7 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option { (Some(a), Some(p)) => Some(Endpoint { address: a.to_string(), port: p, + codec_pt, }), _ => None, } diff --git a/rust/crates/sip-proto/src/lib.rs b/rust/crates/sip-proto/src/lib.rs index 0b71bc3..319c7d9 100644 --- a/rust/crates/sip-proto/src/lib.rs +++ b/rust/crates/sip-proto/src/lib.rs @@ -9,9 +9,11 @@ pub mod dialog; pub mod helpers; pub mod rewrite; -/// Network endpoint (address + port). +/// Network endpoint (address + port + optional negotiated codec). #[derive(Debug, Clone, PartialEq, Eq)] pub struct Endpoint { pub address: String, pub port: u16, + /// First payload type from the SDP `m=audio` line (the preferred codec). + pub codec_pt: Option, } diff --git a/rust/crates/sip-proto/src/rewrite.rs b/rust/crates/sip-proto/src/rewrite.rs index f890dcc..1f60478 100644 --- a/rust/crates/sip-proto/src/rewrite.rs +++ b/rust/crates/sip-proto/src/rewrite.rs @@ -92,7 +92,7 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option .collect(); let original = match (orig_addr, orig_port) { - (Some(a), Some(p)) => Some(Endpoint { address: a, port: p }), + (Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }), _ => None, }; diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 9124309..68e53fc 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.17.0', + version: '1.17.1', description: 'undefined' } diff --git a/ts/sipproxy.ts b/ts/sipproxy.ts index 02612ec..01f07df 100644 --- a/ts/sipproxy.ts +++ b/ts/sipproxy.ts @@ -425,9 +425,9 @@ async function startProxyEngine(): Promise { id: data.leg_id, type: data.kind, state: data.state, - codec: null, - rtpPort: null, - remoteMedia: null, + codec: data.codec ?? null, + rtpPort: data.rtpPort ?? null, + remoteMedia: data.remoteMedia ?? null, metadata: data.metadata || {}, }); } diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 9124309..68e53fc 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.17.0', + version: '1.17.1', description: 'undefined' }