From 45f9b9c15c6d73313c9418cbf7c4337775c166f6 Mon Sep 17 00:00:00 2001 From: Juergen Kunz Date: Fri, 10 Apr 2026 15:12:30 +0000 Subject: [PATCH] feat(proxy-engine): add device leg, leg transfer, and leg replacement call controls --- changelog.md | 8 + rust/crates/proxy-engine/src/call_manager.rs | 205 +++++++++++++++++++ rust/crates/proxy-engine/src/main.rs | 118 +++++++++++ ts/00_commitinfo_data.ts | 2 +- ts/frontend.ts | 11 +- ts/proxybridge.ts | 77 ++++++- ts_web/00_commitinfo_data.ts | 2 +- 7 files changed, 417 insertions(+), 6 deletions(-) diff --git a/changelog.md b/changelog.md index 6c0cc2c..f40b977 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # Changelog +## 2026-04-10 - 1.15.0 - feat(proxy-engine) +add device leg, leg transfer, and leg replacement call controls + +- adds proxy-engine commands and call manager support for inviting a registered SIP device into an active call +- supports transferring an existing leg between calls while preserving the active connection and updating mixer routing +- supports replacing a call leg by removing the current leg and dialing a new outbound destination +- wires the frontend add-leg API and TypeScript bridge to the new device leg and leg control commands + ## 2026-04-10 - 1.14.0 - feat(proxy-engine) add multiparty call mixing with dynamic SIP and WebRTC leg management diff --git a/rust/crates/proxy-engine/src/call_manager.rs b/rust/crates/proxy-engine/src/call_manager.rs index 00a0c3c..5f17a4c 100644 --- a/rust/crates/proxy-engine/src/call_manager.rs +++ b/rust/crates/proxy-engine/src/call_manager.rs @@ -1065,6 +1065,82 @@ impl CallManager { Some(leg_id) } + /// Add a local SIP device to an existing call (mid-call INVITE to desk phone). + pub async fn add_device_leg( + &mut self, + call_id: &str, + device_id: &str, + registrar: &Registrar, + config: &AppConfig, + rtp_pool: &mut RtpPortPool, + socket: &UdpSocket, + ) -> Option { + let device_addr = registrar.get_device_contact(device_id)?; + let call = self.calls.get(call_id)?; + let lan_ip = &config.proxy.lan_ip; + let lan_port = config.proxy.lan_port; + + let rtp_alloc = rtp_pool.allocate().await?; + let sip_call_id = generate_call_id(None); + let leg_id = self.next_leg_id(); + + // Use G.722 by default for local devices (most SIP phones support it). + let codec_pt: u8 = 9; + + // Build a B2BUA SipLeg targeting the device. + let leg_config = SipLegConfig { + lan_ip: lan_ip.clone(), + lan_port, + public_ip: None, // local device — no public IP needed + sip_target: device_addr, + username: None, + password: None, + registered_aor: None, + codecs: vec![codec_pt, 0], // G.722, PCMU fallback + rtp_port: rtp_alloc.port, + }; + + let mut sip_leg = SipLeg::new(leg_id.clone(), leg_config); + let to_uri = format!("sip:{}@{}:{}", device_id, device_addr.ip(), device_addr.port()); + let from_uri = format!("sip:sipproxy@{lan_ip}:{lan_port}"); + sip_leg.send_invite(&from_uri, &to_uri, &sip_call_id, socket).await; + + let leg_info = LegInfo { + id: leg_id.clone(), + kind: LegKind::SipDevice, + state: LegState::Inviting, + codec_pt, + sip_leg: Some(sip_leg), + sip_call_id: Some(sip_call_id.clone()), + webrtc_session_id: None, + rtp_socket: Some(rtp_alloc.socket.clone()), + rtp_port: rtp_alloc.port, + remote_media: None, + signaling_addr: Some(device_addr), + metadata: HashMap::new(), + }; + + self.sip_index + .insert(sip_call_id, (call_id.to_string(), leg_id.clone())); + + let call = self.calls.get_mut(call_id).unwrap(); + call.legs.insert(leg_id.clone(), leg_info); + + emit_event( + &self.out_tx, + "leg_added", + serde_json::json!({ + "call_id": call_id, + "leg_id": leg_id, + "kind": "sip-device", + "state": "inviting", + "device_id": device_id, + }), + ); + + Some(leg_id) + } + /// Remove a leg from a call. pub async fn remove_leg( &mut self, @@ -1120,6 +1196,135 @@ impl CallManager { true } + /// Transfer a leg from one call to another. + /// The leg stays connected (same RTP socket, same SIP dialog) but moves + /// between mixers so it hears the new call's participants. + pub async fn transfer_leg( + &mut self, + source_call_id: &str, + leg_id: &str, + target_call_id: &str, + ) -> bool { + // Validate both calls exist and the leg is in the source call. + if !self.calls.contains_key(source_call_id) + || !self.calls.contains_key(target_call_id) + { + return false; + } + + // Remove from source mixer (drops old channels → old I/O tasks exit). + let source_call = self.calls.get(source_call_id).unwrap(); + source_call.remove_leg_from_mixer(leg_id).await; + + // Take the LegInfo out of the source call. + let source_call = self.calls.get_mut(source_call_id).unwrap(); + let leg_info = match source_call.legs.remove(leg_id) { + Some(l) => l, + None => return false, + }; + + // Update SIP index to point to the target call. + if let Some(sip_cid) = &leg_info.sip_call_id { + self.sip_index.insert( + sip_cid.clone(), + (target_call_id.to_string(), leg_id.to_string()), + ); + } + + // Create new channels and I/O tasks for the target mixer. + let channels = create_leg_channels(); + if let Some(rtp_socket) = &leg_info.rtp_socket { + spawn_sip_inbound(rtp_socket.clone(), channels.inbound_tx); + if let Some(remote_media) = leg_info.remote_media { + spawn_sip_outbound(rtp_socket.clone(), remote_media, channels.outbound_rx); + } + } + + // Add to target mixer. + let target_call = self.calls.get(target_call_id).unwrap(); + target_call + .add_leg_to_mixer( + leg_id, + leg_info.codec_pt, + channels.inbound_rx, + channels.outbound_tx, + ) + .await; + + // Insert leg into target call. + let target_call = self.calls.get_mut(target_call_id).unwrap(); + target_call.legs.insert(leg_id.to_string(), leg_info); + + emit_event( + &self.out_tx, + "leg_transferred", + serde_json::json!({ + "leg_id": leg_id, + "source_call_id": source_call_id, + "target_call_id": target_call_id, + }), + ); + + // Check if source call has too few legs remaining. + let source_call = self.calls.get(source_call_id).unwrap(); + let active_legs = source_call + .legs + .values() + .filter(|l| l.state != LegState::Terminated) + .count(); + if active_legs <= 1 { + let duration = source_call.duration_secs(); + emit_event( + &self.out_tx, + "call_ended", + serde_json::json!({ + "call_id": source_call_id, + "reason": "leg_transferred", + "duration": duration, + }), + ); + self.terminate_call(source_call_id).await; + } + + true + } + + /// Replace a leg: terminate the old leg, then dial a new one into the same call. + /// Returns the new leg ID on success. + pub async fn replace_leg( + &mut self, + call_id: &str, + old_leg_id: &str, + number: &str, + provider_config: &ProviderConfig, + config: &AppConfig, + rtp_pool: &mut RtpPortPool, + socket: &UdpSocket, + public_ip: Option<&str>, + registered_aor: &str, + ) -> Option { + // Terminate the old leg. + self.remove_leg(call_id, old_leg_id, socket).await; + + // If the call was terminated because it had too few legs, bail. + if !self.calls.contains_key(call_id) { + return None; + } + + // Dial the replacement. + self.add_external_leg( + call_id, + number, + provider_config, + config, + rtp_pool, + socket, + public_ip, + registered_aor, + ) + .await + } + // ----------------------------------------------------------------------- // Hangup + cleanup // ----------------------------------------------------------------------- diff --git a/rust/crates/proxy-engine/src/main.rs b/rust/crates/proxy-engine/src/main.rs index bf01113..5150aad 100644 --- a/rust/crates/proxy-engine/src/main.rs +++ b/rust/crates/proxy-engine/src/main.rs @@ -142,6 +142,9 @@ async fn handle_command( "webrtc_close" => handle_webrtc_close(webrtc, out_tx, &cmd).await, // webrtc_link needs both: engine (for mixer channels) and webrtc (for session). "webrtc_link" => handle_webrtc_link(engine, webrtc, out_tx, &cmd).await, + "add_device_leg" => handle_add_device_leg(engine, out_tx, &cmd).await, + "transfer_leg" => handle_transfer_leg(engine, out_tx, &cmd).await, + "replace_leg" => handle_replace_leg(engine, out_tx, &cmd).await, // Leg interaction and tool leg commands. "start_interaction" => handle_start_interaction(engine, out_tx, &cmd).await, "add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await, @@ -746,6 +749,121 @@ async fn handle_add_leg(engine: Arc>, out_tx: &OutTx, cmd: &C } } +/// Handle `add_device_leg` — add a local SIP device to an existing call. +async fn handle_add_device_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { + let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + }; + let device_id = match cmd.params.get("device_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing device_id"); return; } + }; + + let mut eng = engine.lock().await; + let config_ref = match &eng.config { + Some(c) => c.clone(), + None => { respond_err(out_tx, &cmd.id, "not configured"); return; } + }; + let socket = match &eng.transport { + Some(t) => t.socket(), + None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + }; + + let ProxyEngine { ref registrar, ref mut call_mgr, ref mut rtp_pool, .. } = *eng; + let rtp_pool = rtp_pool.as_mut().unwrap(); + + let leg_id = call_mgr.add_device_leg( + &call_id, &device_id, registrar, &config_ref, rtp_pool, &socket, + ).await; + + match leg_id { + Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "leg_id": lid })), + None => respond_err(out_tx, &cmd.id, "failed to add device leg — device not registered or call not found"), + } +} + +/// Handle `transfer_leg` — move a leg from one call to another. +async fn handle_transfer_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { + let source_call_id = match cmd.params.get("source_call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing source_call_id"); return; } + }; + let leg_id = match cmd.params.get("leg_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing leg_id"); return; } + }; + let target_call_id = match cmd.params.get("target_call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing target_call_id"); return; } + }; + + let mut eng = engine.lock().await; + if eng.call_mgr.transfer_leg(&source_call_id, &leg_id, &target_call_id).await { + respond_ok(out_tx, &cmd.id, serde_json::json!({})); + } else { + respond_err(out_tx, &cmd.id, "transfer failed — call or leg not found"); + } +} + +/// Handle `replace_leg` — terminate a leg and dial a replacement into the same call. +async fn handle_replace_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { + let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing call_id"); return; } + }; + let old_leg_id = match cmd.params.get("old_leg_id").and_then(|v| v.as_str()) { + Some(s) => s.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing old_leg_id"); return; } + }; + let number = match cmd.params.get("number").and_then(|v| v.as_str()) { + Some(n) => n.to_string(), + None => { respond_err(out_tx, &cmd.id, "missing number"); return; } + }; + let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str()); + + let mut eng = engine.lock().await; + let config_ref = match &eng.config { + Some(c) => c.clone(), + None => { respond_err(out_tx, &cmd.id, "not configured"); return; } + }; + let socket = match &eng.transport { + Some(t) => t.socket(), + None => { respond_err(out_tx, &cmd.id, "not initialized"); return; } + }; + + // Resolve provider. + let provider_config = if let Some(pid) = provider_id { + config_ref.providers.iter().find(|p| p.id == pid).cloned() + } else { + config_ref.resolve_outbound_route(&number, None, &|_| true).map(|r| r.provider) + }; + let provider_config = match provider_config { + Some(p) => p, + None => { respond_err(out_tx, &cmd.id, "no provider available"); return; } + }; + + let (public_ip, registered_aor) = if let Some(ps_arc) = eng.provider_mgr.find_by_provider_id(&provider_config.id).await { + let ps = ps_arc.lock().await; + (ps.public_ip.clone(), ps.registered_aor.clone()) + } else { + (None, format!("sip:{}@{}", provider_config.username, provider_config.domain)) + }; + + let ProxyEngine { ref mut call_mgr, ref mut rtp_pool, .. } = *eng; + let rtp_pool = rtp_pool.as_mut().unwrap(); + + let new_leg_id = call_mgr.replace_leg( + &call_id, &old_leg_id, &number, &provider_config, &config_ref, + rtp_pool, &socket, public_ip.as_deref(), ®istered_aor, + ).await; + + match new_leg_id { + Some(lid) => respond_ok(out_tx, &cmd.id, serde_json::json!({ "new_leg_id": lid })), + None => respond_err(out_tx, &cmd.id, "replace failed — call ended or dial failed"), + } +} + /// Handle `remove_leg` — remove a leg from a call. async fn handle_remove_leg(engine: Arc>, out_tx: &OutTx, cmd: &Command) { let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) { diff --git a/ts/00_commitinfo_data.ts b/ts/00_commitinfo_data.ts index 07d68dd..207d2eb 100644 --- a/ts/00_commitinfo_data.ts +++ b/ts/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.14.0', + version: '1.15.0', description: 'undefined' } diff --git a/ts/frontend.ts b/ts/frontend.ts index 2a1b3bb..a7c799d 100644 --- a/ts/frontend.ts +++ b/ts/frontend.ts @@ -128,14 +128,19 @@ async function handleRequest( } } - // API: add leg to call (device — not yet implemented, needs device-to-call routing). + // API: add a SIP device to a call (mid-call INVITE to desk phone). if (url.pathname.startsWith('/api/call/') && url.pathname.endsWith('/addleg') && method === 'POST') { try { const callId = url.pathname.split('/')[3]; const body = await readJsonBody(req); if (!body?.deviceId) return sendJson(res, { ok: false, error: 'missing deviceId' }, 400); - // TODO: implement device leg addition (needs SIP INVITE to device). - return sendJson(res, { ok: false, error: 'not yet implemented' }, 501); + const { addDeviceLeg } = await import('./proxybridge.ts'); + const legId = await addDeviceLeg(callId, body.deviceId); + if (legId) { + return sendJson(res, { ok: true, legId }); + } else { + return sendJson(res, { ok: false, error: 'device not registered or call not found' }, 404); + } } catch (e: any) { return sendJson(res, { ok: false, error: e.message }, 400); } diff --git a/ts/proxybridge.ts b/ts/proxybridge.ts index 0ec4892..eb4fcfc 100644 --- a/ts/proxybridge.ts +++ b/ts/proxybridge.ts @@ -41,6 +41,18 @@ type TProxyCommands = { params: { call_id: string }; result: { file_path: string; duration_ms: number }; }; + add_device_leg: { + params: { call_id: string; device_id: string }; + result: { leg_id: string }; + }; + transfer_leg: { + params: { source_call_id: string; leg_id: string; target_call_id: string }; + result: Record; + }; + replace_leg: { + params: { call_id: string; old_leg_id: string; number: string; provider_id?: string }; + result: { new_leg_id: string }; + }; start_interaction: { params: { call_id: string; @@ -307,9 +319,72 @@ export async function webrtcClose(sessionId: string): Promise { } // --------------------------------------------------------------------------- -// Leg interaction & tool leg commands +// Device leg & interaction commands // --------------------------------------------------------------------------- +/** + * Add a local SIP device to an existing call (mid-call INVITE to desk phone). + */ +export async function addDeviceLeg(callId: string, deviceId: string): Promise { + if (!bridge || !initialized) return null; + try { + const result = await bridge.sendCommand('add_device_leg', { + call_id: callId, + device_id: deviceId, + } as any); + return (result as any)?.leg_id || null; + } catch (e: any) { + logFn?.(`[proxy-engine] add_device_leg error: ${e?.message || e}`); + return null; + } +} + +/** + * Transfer a leg from one call to another (leg stays connected, switches mixer). + */ +export async function transferLeg( + sourceCallId: string, + legId: string, + targetCallId: string, +): Promise { + if (!bridge || !initialized) return false; + try { + await bridge.sendCommand('transfer_leg', { + source_call_id: sourceCallId, + leg_id: legId, + target_call_id: targetCallId, + } as any); + return true; + } catch (e: any) { + logFn?.(`[proxy-engine] transfer_leg error: ${e?.message || e}`); + return false; + } +} + +/** + * Replace a leg: terminate the old leg and dial a new number into the same call. + */ +export async function replaceLeg( + callId: string, + oldLegId: string, + number: string, + providerId?: string, +): Promise { + if (!bridge || !initialized) return null; + try { + const result = await bridge.sendCommand('replace_leg', { + call_id: callId, + old_leg_id: oldLegId, + number, + provider_id: providerId, + } as any); + return (result as any)?.new_leg_id || null; + } catch (e: any) { + logFn?.(`[proxy-engine] replace_leg error: ${e?.message || e}`); + return null; + } +} + /** * Start an interaction on a specific leg — isolate it, play a prompt, collect DTMF. * Blocks until the interaction completes (digit pressed, timeout, or cancelled). diff --git a/ts_web/00_commitinfo_data.ts b/ts_web/00_commitinfo_data.ts index 07d68dd..207d2eb 100644 --- a/ts_web/00_commitinfo_data.ts +++ b/ts_web/00_commitinfo_data.ts @@ -3,6 +3,6 @@ */ export const commitinfo = { name: 'siprouter', - version: '1.14.0', + version: '1.15.0', description: 'undefined' }