feat(fax): add fax routing, job tracking, inbox management, and T.38/UDPTL media support
This commit is contained in:
@@ -11,7 +11,7 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub type LegId = String;
|
||||
@@ -114,6 +114,13 @@ pub struct LegInfo {
|
||||
pub kind: LegKind,
|
||||
pub state: LegState,
|
||||
pub codec_pt: u8,
|
||||
/// Media transport currently negotiated for this leg.
|
||||
///
|
||||
/// `rtp` covers classic SIP audio media, `t38-udptl` covers T.38 fax,
|
||||
/// `webrtc` is used for browser legs, and `internal` for proxy-local media/tool paths.
|
||||
pub media_protocol: &'static str,
|
||||
/// Whether this leg is currently wired into an active media bridge.
|
||||
pub media_io_active: bool,
|
||||
|
||||
/// For SIP legs: the SIP dialog manager (handles 407 auth, BYE, etc).
|
||||
pub sip_leg: Option<SipLeg>,
|
||||
@@ -146,6 +153,15 @@ pub struct LegInfo {
|
||||
pub metadata: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PendingDialogBridge {
|
||||
pub source_leg_id: LegId,
|
||||
pub target_leg_id: LegId,
|
||||
pub source_request: SipMessage,
|
||||
pub target_request: SipMessage,
|
||||
pub method: String,
|
||||
}
|
||||
|
||||
/// A multiparty call with N legs and a central mixer.
|
||||
pub struct Call {
|
||||
// Duplicated from the HashMap key in CallManager. Kept for future
|
||||
@@ -169,12 +185,21 @@ pub struct Call {
|
||||
/// Used to construct proper 180/200/error responses back to the device.
|
||||
pub device_invite: Option<SipMessage>,
|
||||
|
||||
/// Pending in-dialog B2BUA transaction bridged across two different SIP dialogs.
|
||||
pub pending_dialog_bridge: Option<PendingDialogBridge>,
|
||||
|
||||
/// All legs in this call, keyed by leg ID.
|
||||
pub legs: HashMap<LegId, LegInfo>,
|
||||
|
||||
/// Channel to send commands to the mixer task.
|
||||
pub mixer_cmd_tx: mpsc::Sender<MixerCommand>,
|
||||
|
||||
/// Active passthrough media bridge mode, if any.
|
||||
pub media_bridge_mode: Option<String>,
|
||||
|
||||
/// Cancellation handles for non-mixer passthrough media tasks.
|
||||
media_bridge_cancel_txs: Vec<watch::Sender<bool>>,
|
||||
|
||||
/// Handle to the mixer task (aborted on call teardown).
|
||||
mixer_task: Option<JoinHandle<()>>,
|
||||
}
|
||||
@@ -196,8 +221,11 @@ impl Call {
|
||||
callee_number: None,
|
||||
provider_id,
|
||||
device_invite: None,
|
||||
pending_dialog_bridge: None,
|
||||
legs: HashMap::new(),
|
||||
mixer_cmd_tx,
|
||||
media_bridge_mode: None,
|
||||
media_bridge_cancel_txs: Vec::new(),
|
||||
mixer_task: Some(mixer_task),
|
||||
}
|
||||
}
|
||||
@@ -235,8 +263,31 @@ impl Call {
|
||||
self.created_at.elapsed().as_secs()
|
||||
}
|
||||
|
||||
pub fn clear_media_bridge(&mut self) {
|
||||
for cancel_tx in self.media_bridge_cancel_txs.drain(..) {
|
||||
let _ = cancel_tx.send(true);
|
||||
}
|
||||
self.media_bridge_mode = None;
|
||||
}
|
||||
|
||||
pub fn install_media_bridge(
|
||||
&mut self,
|
||||
mode: &str,
|
||||
cancel_txs: Vec<watch::Sender<bool>>,
|
||||
) {
|
||||
self.clear_media_bridge();
|
||||
self.media_bridge_mode = Some(mode.to_string());
|
||||
self.media_bridge_cancel_txs = cancel_txs;
|
||||
}
|
||||
|
||||
pub fn note_mixer_bridge(&mut self, mode: &str) {
|
||||
self.clear_media_bridge();
|
||||
self.media_bridge_mode = Some(mode.to_string());
|
||||
}
|
||||
|
||||
/// Shut down the mixer and abort its task.
|
||||
pub async fn shutdown_mixer(&mut self) {
|
||||
self.clear_media_bridge();
|
||||
let _ = self.mixer_cmd_tx.send(MixerCommand::Shutdown).await;
|
||||
if let Some(handle) = self.mixer_task.take() {
|
||||
handle.abort();
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -105,6 +105,8 @@ pub struct RouteAction {
|
||||
pub ring_browsers: Option<bool>,
|
||||
#[serde(rename = "voicemailBox")]
|
||||
pub voicemail_box: Option<String>,
|
||||
#[serde(rename = "faxBox")]
|
||||
pub fax_box: Option<String>,
|
||||
#[serde(rename = "ivrMenuId")]
|
||||
pub ivr_menu_id: Option<String>,
|
||||
#[serde(rename = "noAnswerTimeout")]
|
||||
@@ -161,6 +163,8 @@ pub struct AppConfig {
|
||||
pub devices: Vec<DeviceConfig>,
|
||||
pub routing: RoutingConfig,
|
||||
#[serde(default)]
|
||||
pub faxboxes: Vec<FaxBoxConfig>,
|
||||
#[serde(default)]
|
||||
pub voiceboxes: Vec<VoiceboxConfig>,
|
||||
#[serde(default)]
|
||||
pub ivr: Option<IvrConfig>,
|
||||
@@ -191,6 +195,16 @@ pub struct VoiceboxConfig {
|
||||
pub max_recording_sec: Option<u32>,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct FaxBoxConfig {
|
||||
pub id: String,
|
||||
#[serde(default)]
|
||||
pub enabled: bool,
|
||||
#[serde(rename = "maxMessages")]
|
||||
pub max_messages: Option<u32>,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// IVR config
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -415,6 +429,7 @@ pub struct InboundRouteResult {
|
||||
pub ring_all_devices: bool,
|
||||
pub ring_browsers: bool,
|
||||
pub voicemail_box: Option<String>,
|
||||
pub fax_box: Option<String>,
|
||||
pub ivr_menu_id: Option<String>,
|
||||
pub no_answer_timeout: Option<u32>,
|
||||
}
|
||||
@@ -525,6 +540,7 @@ impl AppConfig {
|
||||
ring_all_devices: explicit_targets.is_none(),
|
||||
ring_browsers: route.action.ring_browsers.unwrap_or(false),
|
||||
voicemail_box: route.action.voicemail_box.clone(),
|
||||
fax_box: route.action.fax_box.clone(),
|
||||
ivr_menu_id: route.action.ivr_menu_id.clone(),
|
||||
no_answer_timeout: route.action.no_answer_timeout,
|
||||
});
|
||||
@@ -574,6 +590,7 @@ mod tests {
|
||||
extension: "100".to_string(),
|
||||
}],
|
||||
routing: RoutingConfig { routes },
|
||||
faxboxes: vec![],
|
||||
voiceboxes: vec![],
|
||||
ivr: None,
|
||||
}
|
||||
@@ -620,6 +637,7 @@ mod tests {
|
||||
targets: Some(vec!["desk".to_string()]),
|
||||
ring_browsers: Some(true),
|
||||
voicemail_box: None,
|
||||
fax_box: None,
|
||||
ivr_menu_id: None,
|
||||
no_answer_timeout: None,
|
||||
provider: None,
|
||||
@@ -644,6 +662,7 @@ mod tests {
|
||||
targets: None,
|
||||
ring_browsers: Some(false),
|
||||
voicemail_box: Some("support-box".to_string()),
|
||||
fax_box: None,
|
||||
ivr_menu_id: None,
|
||||
no_answer_timeout: Some(20),
|
||||
provider: None,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -10,7 +10,7 @@ use crate::mixer::RtpPacket;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::{mpsc, watch};
|
||||
|
||||
/// Channel pair for connecting a leg to the mixer.
|
||||
pub struct LegChannels {
|
||||
@@ -109,3 +109,56 @@ pub fn spawn_sip_outbound(
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn a raw UDP inbound task for non-RTP passthrough media such as T.38 UDPTL.
|
||||
pub fn spawn_raw_udp_inbound(
|
||||
media_socket: Arc<UdpSocket>,
|
||||
inbound_tx: mpsc::Sender<Vec<u8>>,
|
||||
mut cancel_rx: watch::Receiver<bool>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
let mut buf = vec![0u8; 2048];
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel_rx.changed() => break,
|
||||
recv = media_socket.recv_from(&mut buf) => {
|
||||
match recv {
|
||||
Ok((n, _from)) => {
|
||||
if n == 0 {
|
||||
continue;
|
||||
}
|
||||
if inbound_tx.send(buf[..n].to_vec()).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn a raw UDP outbound task for non-RTP passthrough media such as T.38 UDPTL.
|
||||
pub fn spawn_raw_udp_outbound(
|
||||
media_socket: Arc<UdpSocket>,
|
||||
remote_media: SocketAddr,
|
||||
mut outbound_rx: mpsc::Receiver<Vec<u8>>,
|
||||
mut cancel_rx: watch::Receiver<bool>,
|
||||
) -> tokio::task::JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel_rx.changed() => break,
|
||||
pkt = outbound_rx.recv() => {
|
||||
match pkt {
|
||||
Some(packet) => {
|
||||
let _ = media_socket.send_to(&packet, remote_media).await;
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -9,6 +9,8 @@ mod audio_player;
|
||||
mod call;
|
||||
mod call_manager;
|
||||
mod config;
|
||||
#[allow(dead_code)]
|
||||
mod fax_engine;
|
||||
mod ipc;
|
||||
mod jitter_buffer;
|
||||
mod leg_io;
|
||||
@@ -139,6 +141,7 @@ async fn handle_command(
|
||||
"configure" => handle_configure(engine, out_tx, &cmd).await,
|
||||
"hangup" => handle_hangup(engine, out_tx, &cmd).await,
|
||||
"make_call" => handle_make_call(engine, out_tx, &cmd).await,
|
||||
"send_fax" => handle_send_fax(engine, out_tx, &cmd).await,
|
||||
"add_leg" => handle_add_leg(engine, out_tx, &cmd).await,
|
||||
"remove_leg" => handle_remove_leg(engine, out_tx, &cmd).await,
|
||||
// WebRTC commands — lock webrtc only (no engine contention).
|
||||
@@ -576,6 +579,162 @@ async fn handle_make_call(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd:
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle `send_fax` — place an outbound server-side fax call via SpanDSP over G.711 audio.
|
||||
async fn handle_send_fax(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let number = match cmd.params.get("number").and_then(|v| v.as_str()) {
|
||||
Some(n) => n.to_string(),
|
||||
None => {
|
||||
respond_err(out_tx, &cmd.id, "missing number");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let file_path = match cmd.params.get("file_path").and_then(|v| v.as_str()) {
|
||||
Some(path) if std::path::Path::new(path).exists() => path.to_string(),
|
||||
Some(_) => {
|
||||
respond_err(out_tx, &cmd.id, "fax file does not exist");
|
||||
return;
|
||||
}
|
||||
None => {
|
||||
respond_err(out_tx, &cmd.id, "missing file_path");
|
||||
return;
|
||||
}
|
||||
};
|
||||
let provider_id = cmd.params.get("provider_id").and_then(|v| v.as_str());
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
let config_ref = match &eng.config {
|
||||
Some(c) => c.clone(),
|
||||
None => {
|
||||
respond_err(out_tx, &cmd.id, "not configured");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let provider_config = if let Some(pid) = provider_id {
|
||||
config_ref.providers.iter().find(|p| p.id == pid).cloned()
|
||||
} else {
|
||||
let route = config_ref.resolve_outbound_route(&number, None, &|_| true);
|
||||
route.map(|r| r.provider)
|
||||
};
|
||||
|
||||
let mut provider_config = match provider_config {
|
||||
Some(p) => p,
|
||||
None => {
|
||||
respond_err(out_tx, &cmd.id, "no provider available");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let fax_codec = if provider_config.codecs.contains(&codec_lib::PT_PCMU) {
|
||||
codec_lib::PT_PCMU
|
||||
} else if provider_config.codecs.contains(&codec_lib::PT_PCMA) {
|
||||
codec_lib::PT_PCMA
|
||||
} else {
|
||||
respond_err(
|
||||
out_tx,
|
||||
&cmd.id,
|
||||
&format!(
|
||||
"provider {} does not advertise PCMU/PCMA, which outbound fax currently requires",
|
||||
provider_config.id
|
||||
),
|
||||
);
|
||||
return;
|
||||
};
|
||||
provider_config.codecs = vec![fax_codec];
|
||||
|
||||
let (public_ip, registered_aor) = if let Some(ps_arc) = eng
|
||||
.provider_mgr
|
||||
.find_by_address(
|
||||
&provider_config
|
||||
.outbound_proxy
|
||||
.to_socket_addr()
|
||||
.unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
let ps = ps_arc.lock().await;
|
||||
(ps.public_ip.clone(), ps.registered_aor.clone())
|
||||
} else {
|
||||
(
|
||||
None,
|
||||
format!(
|
||||
"sip:{}@{}",
|
||||
provider_config.username, provider_config.domain
|
||||
),
|
||||
)
|
||||
};
|
||||
|
||||
let socket = match &eng.transport {
|
||||
Some(t) => t.socket(),
|
||||
None => {
|
||||
respond_err(out_tx, &cmd.id, "not initialized");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let ProxyEngine {
|
||||
ref mut call_mgr,
|
||||
ref mut rtp_pool,
|
||||
..
|
||||
} = *eng;
|
||||
let rtp_pool = rtp_pool.as_mut().unwrap();
|
||||
|
||||
let call_id = call_mgr
|
||||
.make_outbound_call(
|
||||
&number,
|
||||
&provider_config,
|
||||
&config_ref,
|
||||
rtp_pool,
|
||||
&socket,
|
||||
public_ip.as_deref(),
|
||||
®istered_aor,
|
||||
)
|
||||
.await;
|
||||
|
||||
let call_id = match call_id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
respond_err(
|
||||
out_tx,
|
||||
&cmd.id,
|
||||
"fax origination failed — provider not registered or no ports available",
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(call) = call_mgr.calls.get_mut(&call_id) {
|
||||
let provider_leg_id = format!("{call_id}-prov");
|
||||
if let Some(leg) = call.legs.get_mut(&provider_leg_id) {
|
||||
leg.codec_pt = fax_codec;
|
||||
leg.metadata
|
||||
.insert("fax_mode".to_string(), serde_json::json!("outbound-audio"));
|
||||
leg.metadata
|
||||
.insert("fax_file_path".to_string(), serde_json::json!(file_path));
|
||||
}
|
||||
}
|
||||
|
||||
emit_event(
|
||||
out_tx,
|
||||
"outbound_call_started",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"number": number,
|
||||
"provider_id": provider_config.id,
|
||||
"ring_browsers": false,
|
||||
}),
|
||||
);
|
||||
|
||||
respond_ok(
|
||||
out_tx,
|
||||
&cmd.id,
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"codec": if fax_codec == codec_lib::PT_PCMU { "PCMU" } else { "PCMA" },
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle the `hangup` command.
|
||||
async fn handle_hangup(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
||||
@@ -738,6 +897,8 @@ async fn handle_webrtc_link(
|
||||
kind: crate::call::LegKind::WebRtc,
|
||||
state: crate::call::LegState::Connected,
|
||||
codec_pt: codec_lib::PT_OPUS,
|
||||
media_protocol: "webrtc",
|
||||
media_io_active: true,
|
||||
sip_leg: None,
|
||||
sip_call_id: None,
|
||||
webrtc_session_id: Some(session_id.clone()),
|
||||
@@ -762,6 +923,7 @@ async fn handle_webrtc_link(
|
||||
"state": "connected",
|
||||
"codec": "Opus",
|
||||
"rtpPort": 0,
|
||||
"mediaProtocol": "webrtc",
|
||||
"remoteMedia": null,
|
||||
"metadata": {},
|
||||
}),
|
||||
@@ -1462,6 +1624,8 @@ async fn handle_add_tool_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
|
||||
kind: crate::call::LegKind::Tool,
|
||||
state: crate::call::LegState::Connected,
|
||||
codec_pt: 0,
|
||||
media_protocol: "internal",
|
||||
media_io_active: true,
|
||||
sip_leg: None,
|
||||
sip_call_id: None,
|
||||
webrtc_session_id: None,
|
||||
@@ -1485,6 +1649,7 @@ async fn handle_add_tool_leg(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cm
|
||||
"state": "connected",
|
||||
"codec": null,
|
||||
"rtpPort": 0,
|
||||
"mediaProtocol": "internal",
|
||||
"remoteMedia": null,
|
||||
"metadata": { "tool_type": tool_type_str },
|
||||
}),
|
||||
|
||||
@@ -108,6 +108,24 @@ impl SipLeg {
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
self.send_invite_with_sdp(from_uri, to_uri, sip_call_id, socket, sdp)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn send_invite_with_sdp(
|
||||
&mut self,
|
||||
from_uri: &str,
|
||||
to_uri: &str,
|
||||
sip_call_id: &str,
|
||||
socket: &UdpSocket,
|
||||
sdp: String,
|
||||
) {
|
||||
let ip = self
|
||||
.config
|
||||
.public_ip
|
||||
.as_deref()
|
||||
.unwrap_or(&self.config.lan_ip);
|
||||
|
||||
let invite = SipMessage::create_request(
|
||||
"INVITE",
|
||||
to_uri,
|
||||
@@ -401,6 +419,10 @@ impl SipLeg {
|
||||
return SipLegAction::Send(ok.serialize());
|
||||
}
|
||||
|
||||
if method == "INVITE" || method == "UPDATE" {
|
||||
return SipLegAction::InDialogRequest(method.to_string());
|
||||
}
|
||||
|
||||
SipLegAction::None
|
||||
}
|
||||
|
||||
@@ -436,6 +458,9 @@ pub enum SipLegAction {
|
||||
StateChange(LegState),
|
||||
/// Connected — send this ACK.
|
||||
ConnectedWithAck(Vec<u8>),
|
||||
/// Provider sent an in-dialog request (re-INVITE / UPDATE) that needs
|
||||
/// call-manager-specific handling.
|
||||
InDialogRequest(String),
|
||||
/// Terminated with a reason.
|
||||
Terminated(String),
|
||||
/// Send 200 OK and terminate.
|
||||
|
||||
Reference in New Issue
Block a user