feat(proxy-engine): add device leg, leg transfer, and leg replacement call controls

This commit is contained in:
2026-04-10 15:12:30 +00:00
parent 7d59361352
commit 45f9b9c15c
7 changed files with 417 additions and 6 deletions

View File

@@ -1065,6 +1065,82 @@ impl CallManager {
Some(leg_id)
}
/// Add a local SIP device to an existing call (mid-call INVITE to desk phone).
pub async fn add_device_leg(
&mut self,
call_id: &str,
device_id: &str,
registrar: &Registrar,
config: &AppConfig,
rtp_pool: &mut RtpPortPool,
socket: &UdpSocket,
) -> Option<String> {
let device_addr = registrar.get_device_contact(device_id)?;
let call = self.calls.get(call_id)?;
let lan_ip = &config.proxy.lan_ip;
let lan_port = config.proxy.lan_port;
let rtp_alloc = rtp_pool.allocate().await?;
let sip_call_id = generate_call_id(None);
let leg_id = self.next_leg_id();
// Use G.722 by default for local devices (most SIP phones support it).
let codec_pt: u8 = 9;
// Build a B2BUA SipLeg targeting the device.
let leg_config = SipLegConfig {
lan_ip: lan_ip.clone(),
lan_port,
public_ip: None, // local device — no public IP needed
sip_target: device_addr,
username: None,
password: None,
registered_aor: None,
codecs: vec![codec_pt, 0], // G.722, PCMU fallback
rtp_port: rtp_alloc.port,
};
let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config);
let to_uri = format!("sip:{}@{}:{}", device_id, device_addr.ip(), device_addr.port());
let from_uri = format!("sip:sipproxy@{lan_ip}:{lan_port}");
sip_leg.send_invite(&from_uri, &to_uri, &sip_call_id, socket).await;
let leg_info = LegInfo {
id: leg_id.clone(),
kind: LegKind::SipDevice,
state: LegState::Inviting,
codec_pt,
sip_leg: Some(sip_leg),
sip_call_id: Some(sip_call_id.clone()),
webrtc_session_id: None,
rtp_socket: Some(rtp_alloc.socket.clone()),
rtp_port: rtp_alloc.port,
remote_media: None,
signaling_addr: Some(device_addr),
metadata: HashMap::new(),
};
self.sip_index
.insert(sip_call_id, (call_id.to_string(), leg_id.clone()));
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-device",
"state": "inviting",
"device_id": device_id,
}),
);
Some(leg_id)
}
/// Remove a leg from a call.
pub async fn remove_leg(
&mut self,
@@ -1120,6 +1196,135 @@ impl CallManager {
true
}
/// Transfer a leg from one call to another.
/// The leg stays connected (same RTP socket, same SIP dialog) but moves
/// between mixers so it hears the new call's participants.
pub async fn transfer_leg(
&mut self,
source_call_id: &str,
leg_id: &str,
target_call_id: &str,
) -> bool {
// Validate both calls exist and the leg is in the source call.
if !self.calls.contains_key(source_call_id)
|| !self.calls.contains_key(target_call_id)
{
return false;
}
// Remove from source mixer (drops old channels → old I/O tasks exit).
let source_call = self.calls.get(source_call_id).unwrap();
source_call.remove_leg_from_mixer(leg_id).await;
// Take the LegInfo out of the source call.
let source_call = self.calls.get_mut(source_call_id).unwrap();
let leg_info = match source_call.legs.remove(leg_id) {
Some(l) => l,
None => return false,
};
// Update SIP index to point to the target call.
if let Some(sip_cid) = &leg_info.sip_call_id {
self.sip_index.insert(
sip_cid.clone(),
(target_call_id.to_string(), leg_id.to_string()),
);
}
// Create new channels and I/O tasks for the target mixer.
let channels = create_leg_channels();
if let Some(rtp_socket) = &leg_info.rtp_socket {
spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx);
if let Some(remote_media) = leg_info.remote_media {
spawn_sip_outbound(rtp_socket.clone(), remote_media, channels.outbound_rx);
}
}
// Add to target mixer.
let target_call = self.calls.get(target_call_id).unwrap();
target_call
.add_leg_to_mixer(
leg_id,
leg_info.codec_pt,
channels.inbound_rx,
channels.outbound_tx,
)
.await;
// Insert leg into target call.
let target_call = self.calls.get_mut(target_call_id).unwrap();
target_call.legs.insert(leg_id.to_string(), leg_info);
emit_event(
&self.out_tx,
"leg_transferred",
serde_json::json!({
"leg_id": leg_id,
"source_call_id": source_call_id,
"target_call_id": target_call_id,
}),
);
// Check if source call has too few legs remaining.
let source_call = self.calls.get(source_call_id).unwrap();
let active_legs = source_call
.legs
.values()
.filter(|l| l.state != LegState::Terminated)
.count();
if active_legs <= 1 {
let duration = source_call.duration_secs();
emit_event(
&self.out_tx,
"call_ended",
serde_json::json!({
"call_id": source_call_id,
"reason": "leg_transferred",
"duration": duration,
}),
);
self.terminate_call(source_call_id).await;
}
true
}
/// Replace a leg: terminate the old leg, then dial a new one into the same call.
/// Returns the new leg ID on success.
pub async fn replace_leg(
&mut self,
call_id: &str,
old_leg_id: &str,
number: &str,
provider_config: &ProviderConfig,
config: &AppConfig,
rtp_pool: &mut RtpPortPool,
socket: &UdpSocket,
public_ip: Option<&str>,
registered_aor: &str,
) -> Option<String> {
// Terminate the old leg.
self.remove_leg(call_id, old_leg_id, socket).await;
// If the call was terminated because it had too few legs, bail.
if !self.calls.contains_key(call_id) {
return None;
}
// Dial the replacement.
self.add_external_leg(
call_id,
number,
provider_config,
config,
rtp_pool,
socket,
public_ip,
registered_aor,
)
.await
}
// -----------------------------------------------------------------------
// Hangup + cleanup
// -----------------------------------------------------------------------