fix(proxy-engine,codec-lib,sip-proto,ts): preserve negotiated media details and improve RTP audio handling across call legs

This commit is contained in:
2026-04-10 16:57:07 +00:00
parent 2aca5f1510
commit f78639dd19
15 changed files with 260 additions and 81 deletions

View File

@@ -20,6 +20,35 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
/// Emit a `leg_added` event with full leg information.
/// Free function (not a method) to avoid `&self` borrow conflicts when `self.calls` is borrowed.
fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
let metadata: serde_json::Value = if leg.metadata.is_empty() {
serde_json::json!({})
} else {
serde_json::Value::Object(
leg.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
)
};
emit_event(
tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg.id,
"kind": leg.kind.as_str(),
"state": leg.state.as_str(),
"codec": sip_proto::helpers::codec_name(leg.codec_pt),
"rtpPort": leg.rtp_port,
"remoteMedia": leg.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
"metadata": metadata,
}),
);
}
pub struct CallManager {
/// All active calls, keyed by internal call ID.
pub calls: HashMap<String, Call>,
@@ -265,6 +294,11 @@ impl CallManager {
dev_leg.state = LegState::Connected;
}
}
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": dev_leg_id, "state": "connected" }),
);
// Wire device leg to mixer.
if let Some(dev_remote_addr) = dev_remote {
@@ -324,6 +358,8 @@ impl CallManager {
leg.state = LegState::Terminated;
}
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }));
emit_event(&self.out_tx, "call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
self.terminate_call(call_id).await;
@@ -529,21 +565,30 @@ impl CallManager {
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Ringing;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }));
} else if code >= 200 && code < 300 {
let mut needs_wiring = false;
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Connected;
// Learn remote media from SDP.
// Learn remote media and negotiated codec from SDP answer.
if msg.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
leg.remote_media = Some(addr);
}
// Use the codec from the SDP answer (what the remote actually selected).
if let Some(pt) = ep.codec_pt {
leg.codec_pt = pt;
}
}
}
needs_wiring = true;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }));
if call.state != CallState::Connected {
call.state = CallState::Connected;
emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id }));
@@ -689,15 +734,19 @@ impl CallManager {
call.callee_number = Some(called_number);
call.state = CallState::Ringing;
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
let mut codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
// Provider leg — extract media from SDP.
// Provider leg — extract media and negotiated codec from SDP.
let mut provider_media: Option<SocketAddr> = None;
if invite.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&invite.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
provider_media = Some(addr);
}
// Use the codec from the provider's SDP offer (what they actually want to use).
if let Some(pt) = ep.codec_pt {
codec_pt = pt;
}
}
}
@@ -767,6 +816,16 @@ impl CallManager {
// Store the call.
self.calls.insert(call_id.clone(), call);
// Emit leg_added for both initial legs.
if let Some(call) = self.calls.get(&call_id) {
if let Some(leg) = call.legs.get(&provider_leg_id) {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
if let Some(leg) = call.legs.get(&device_leg_id) {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -854,6 +913,14 @@ impl CallManager {
.insert(sip_call_id, (call_id.clone(), leg_id));
self.calls.insert(call_id.clone(), call);
// Emit leg_added for the provider leg.
if let Some(call) = self.calls.get(&call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -1002,6 +1069,14 @@ impl CallManager {
.insert(provider_sip_call_id, (call_id.clone(), provider_leg_id));
self.calls.insert(call_id.clone(), call);
// Emit leg_added for both initial legs (device + provider).
if let Some(call) = self.calls.get(&call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -1069,17 +1144,11 @@ impl CallManager {
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-provider",
"state": "inviting",
"number": number,
}),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(&leg_id) {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
Some(leg_id)
}
@@ -1145,17 +1214,11 @@ impl CallManager {
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-device",
"state": "inviting",
"device_id": device_id,
}),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(&leg_id) {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
Some(leg_id)
}
@@ -1242,6 +1305,13 @@ impl CallManager {
None => return false,
};
// Emit leg_removed for source call.
emit_event(
&self.out_tx,
"leg_removed",
serde_json::json!({ "call_id": source_call_id, "leg_id": leg_id }),
);
// Update SIP index to point to the target call.
if let Some(sip_cid) = &leg_info.sip_call_id {
self.sip_index.insert(
@@ -1274,15 +1344,12 @@ impl CallManager {
let target_call = self.calls.get_mut(target_call_id).unwrap();
target_call.legs.insert(leg_id.to_string(), leg_info);
emit_event(
&self.out_tx,
"leg_transferred",
serde_json::json!({
"leg_id": leg_id,
"source_call_id": source_call_id,
"target_call_id": target_call_id,
}),
);
// Emit leg_added for target call.
if let Some(target) = self.calls.get(target_call_id) {
if let Some(leg) = target.legs.get(leg_id) {
emit_leg_added_event(&self.out_tx, target_call_id, leg);
}
}
// Check if source call has too few legs remaining.
let source_call = self.calls.get(source_call_id).unwrap();
@@ -1385,6 +1452,11 @@ impl CallManager {
}
}
leg.state = LegState::Terminated;
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg.id, "state": "terminated" }),
);
}
emit_event(
@@ -1503,6 +1575,13 @@ impl CallManager {
);
self.calls.insert(call_id.to_string(), call);
// Emit leg_added for the provider leg.
if let Some(call) = self.calls.get(call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
// Build recording path.
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)