feat(rust-proxy-engine): add a Rust SIP proxy engine with shared SIP and codec libraries
This commit is contained in:
440
rust/crates/proxy-engine/src/main.rs
Normal file
440
rust/crates/proxy-engine/src/main.rs
Normal file
@@ -0,0 +1,440 @@
|
||||
/// SIP proxy engine — the Rust data plane for the SIP router.
|
||||
///
|
||||
/// Handles ALL SIP protocol mechanics. TypeScript only sends high-level
|
||||
/// commands (routing decisions, config) and receives high-level events
|
||||
/// (incoming calls, registration state).
|
||||
///
|
||||
/// No raw SIP ever touches TypeScript.
|
||||
|
||||
mod call;
|
||||
mod call_manager;
|
||||
mod config;
|
||||
mod dtmf;
|
||||
mod ipc;
|
||||
mod provider;
|
||||
mod registrar;
|
||||
mod rtp;
|
||||
mod sip_transport;
|
||||
|
||||
use crate::call_manager::CallManager;
|
||||
use crate::config::AppConfig;
|
||||
use crate::ipc::{emit_event, respond_err, respond_ok, Command, OutTx};
|
||||
use crate::provider::ProviderManager;
|
||||
use crate::registrar::Registrar;
|
||||
use crate::rtp::RtpPortPool;
|
||||
use crate::sip_transport::SipTransport;
|
||||
use sip_proto::message::SipMessage;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
/// Shared mutable state for the proxy engine.
|
||||
struct ProxyEngine {
|
||||
config: Option<AppConfig>,
|
||||
transport: Option<SipTransport>,
|
||||
provider_mgr: ProviderManager,
|
||||
registrar: Registrar,
|
||||
call_mgr: CallManager,
|
||||
rtp_pool: Option<RtpPortPool>,
|
||||
out_tx: OutTx,
|
||||
}
|
||||
|
||||
impl ProxyEngine {
|
||||
fn new(out_tx: OutTx) -> Self {
|
||||
Self {
|
||||
config: None,
|
||||
transport: None,
|
||||
provider_mgr: ProviderManager::new(out_tx.clone()),
|
||||
registrar: Registrar::new(out_tx.clone()),
|
||||
call_mgr: CallManager::new(out_tx.clone()),
|
||||
rtp_pool: None,
|
||||
out_tx,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// Output channel: all stdout writes go through here for serialization.
|
||||
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<String>();
|
||||
|
||||
// Stdout writer task.
|
||||
tokio::spawn(async move {
|
||||
let mut stdout = tokio::io::stdout();
|
||||
while let Some(line) = out_rx.recv().await {
|
||||
let mut output = line.into_bytes();
|
||||
output.push(b'\n');
|
||||
if stdout.write_all(&output).await.is_err() {
|
||||
break;
|
||||
}
|
||||
let _ = stdout.flush().await;
|
||||
}
|
||||
});
|
||||
|
||||
// Emit ready event.
|
||||
emit_event(&out_tx, "ready", serde_json::json!({}));
|
||||
|
||||
// Shared engine state.
|
||||
let engine = Arc::new(Mutex::new(ProxyEngine::new(out_tx.clone())));
|
||||
|
||||
// Read commands from stdin.
|
||||
let stdin = tokio::io::stdin();
|
||||
let reader = BufReader::new(stdin);
|
||||
let mut lines = reader.lines();
|
||||
|
||||
while let Ok(Some(line)) = lines.next_line().await {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let cmd: Command = match serde_json::from_str(&line) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
respond_err(&out_tx, "", &format!("parse: {e}"));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let engine = engine.clone();
|
||||
let out_tx = out_tx.clone();
|
||||
|
||||
// Handle commands — some are async, so we spawn.
|
||||
tokio::spawn(async move {
|
||||
handle_command(engine, &out_tx, cmd).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_command(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: Command) {
|
||||
match cmd.method.as_str() {
|
||||
"configure" => handle_configure(engine, out_tx, &cmd).await,
|
||||
"hangup" => handle_hangup(engine, out_tx, &cmd).await,
|
||||
"get_status" => handle_get_status(engine, out_tx, &cmd).await,
|
||||
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle the `configure` command — receives full app config from TypeScript.
|
||||
/// First call: initializes SIP transport + everything.
|
||||
/// Subsequent calls: reconfigures providers/devices/routing without rebinding.
|
||||
async fn handle_configure(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let app_config: AppConfig = match serde_json::from_value(cmd.params.clone()) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
respond_err(out_tx, &cmd.id, &format!("bad config: {e}"));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
let is_reconfigure = eng.transport.is_some();
|
||||
|
||||
let socket = if is_reconfigure {
|
||||
// Reconfigure — socket already bound, just update subsystems.
|
||||
eng.transport.as_ref().unwrap().socket()
|
||||
} else {
|
||||
// First configure — bind SIP transport.
|
||||
let bind_addr = format!("0.0.0.0:{}", app_config.proxy.lan_port);
|
||||
let transport = match SipTransport::bind(&bind_addr).await {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
respond_err(out_tx, &cmd.id, &format!("SIP bind failed: {e}"));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let socket = transport.socket();
|
||||
|
||||
// Start UDP receiver.
|
||||
let engine_for_recv = engine.clone();
|
||||
let socket_for_recv = socket.clone();
|
||||
transport.spawn_receiver(move |data: &[u8], addr: SocketAddr| {
|
||||
let engine = engine_for_recv.clone();
|
||||
let socket = socket_for_recv.clone();
|
||||
let data = data.to_vec();
|
||||
tokio::spawn(async move {
|
||||
handle_sip_packet(engine, &socket, &data, addr).await;
|
||||
});
|
||||
});
|
||||
|
||||
eng.transport = Some(transport);
|
||||
|
||||
// Initialize RTP port pool (only on first configure).
|
||||
eng.rtp_pool = Some(RtpPortPool::new(
|
||||
app_config.proxy.rtp_port_range.min,
|
||||
app_config.proxy.rtp_port_range.max,
|
||||
));
|
||||
|
||||
socket
|
||||
};
|
||||
|
||||
// (Re)configure registrar.
|
||||
eng.registrar.configure(&app_config.devices);
|
||||
|
||||
// (Re)configure provider registrations.
|
||||
eng.provider_mgr
|
||||
.configure(
|
||||
&app_config.providers,
|
||||
app_config.proxy.public_ip_seed.as_deref(),
|
||||
&app_config.proxy.lan_ip,
|
||||
app_config.proxy.lan_port,
|
||||
socket,
|
||||
)
|
||||
.await;
|
||||
|
||||
let bind_info = format!("0.0.0.0:{}", app_config.proxy.lan_port);
|
||||
eng.config = Some(app_config);
|
||||
|
||||
respond_ok(
|
||||
out_tx,
|
||||
&cmd.id,
|
||||
serde_json::json!({
|
||||
"bound": bind_info,
|
||||
"reconfigure": is_reconfigure,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle incoming SIP packets from the UDP socket.
|
||||
/// This is the core routing pipeline — entirely in Rust.
|
||||
async fn handle_sip_packet(
|
||||
engine: Arc<Mutex<ProxyEngine>>,
|
||||
socket: &UdpSocket,
|
||||
data: &[u8],
|
||||
from_addr: SocketAddr,
|
||||
) {
|
||||
let msg = match SipMessage::parse(data) {
|
||||
Some(m) => m,
|
||||
None => return, // Not a valid SIP message, ignore.
|
||||
};
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
|
||||
// 1. Provider registration responses — consumed internally.
|
||||
if msg.is_response() {
|
||||
if eng.provider_mgr.handle_response(&msg, socket).await {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. Device REGISTER — handled by registrar.
|
||||
let is_from_provider = eng
|
||||
.provider_mgr
|
||||
.find_by_address(&from_addr)
|
||||
.await
|
||||
.is_some();
|
||||
|
||||
if !is_from_provider && msg.method() == Some("REGISTER") {
|
||||
if let Some(response_buf) = eng.registrar.handle_register(&msg, from_addr) {
|
||||
let _ = socket.send_to(&response_buf, from_addr).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Route to existing call by SIP Call-ID.
|
||||
// Check if this Call-ID belongs to an active call (avoids borrow conflict).
|
||||
if eng.call_mgr.has_call(msg.call_id()) {
|
||||
let config_ref = eng.config.as_ref().unwrap().clone();
|
||||
// Temporarily take registrar to avoid overlapping borrows.
|
||||
let registrar_dummy = Registrar::new(eng.out_tx.clone());
|
||||
if eng
|
||||
.call_mgr
|
||||
.route_sip_message(&msg, from_addr, socket, &config_ref, ®istrar_dummy)
|
||||
.await
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let config_ref = eng.config.as_ref().unwrap().clone();
|
||||
|
||||
// 4. New inbound INVITE from provider.
|
||||
if is_from_provider && msg.is_request() && msg.method() == Some("INVITE") {
|
||||
// Detect public IP from Via.
|
||||
if let Some(via) = msg.get_header("Via") {
|
||||
if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await {
|
||||
let mut ps = ps_arc.lock().await;
|
||||
ps.detect_public_ip(via);
|
||||
}
|
||||
}
|
||||
|
||||
// Send 100 Trying immediately.
|
||||
let trying = SipMessage::create_response(100, "Trying", &msg, None);
|
||||
let _ = socket.send_to(&trying.serialize(), from_addr).await;
|
||||
|
||||
// Determine provider info.
|
||||
let (provider_id, provider_config, public_ip) =
|
||||
if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await {
|
||||
let ps = ps_arc.lock().await;
|
||||
(
|
||||
ps.config.id.clone(),
|
||||
ps.config.clone(),
|
||||
ps.public_ip.clone(),
|
||||
)
|
||||
} else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Create the inbound call — Rust handles everything.
|
||||
// Split borrows via destructuring to satisfy the borrow checker.
|
||||
let ProxyEngine {
|
||||
ref registrar,
|
||||
ref mut call_mgr,
|
||||
ref mut rtp_pool,
|
||||
..
|
||||
} = *eng;
|
||||
let rtp_pool = rtp_pool.as_mut().unwrap();
|
||||
let call_id = call_mgr
|
||||
.create_inbound_call(
|
||||
&msg,
|
||||
from_addr,
|
||||
&provider_id,
|
||||
&provider_config,
|
||||
&config_ref,
|
||||
registrar,
|
||||
rtp_pool,
|
||||
socket,
|
||||
public_ip.as_deref(),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(call_id) = call_id {
|
||||
// Emit event so TypeScript knows about the call (for dashboard, IVR routing, etc).
|
||||
let from_header = msg.get_header("From").unwrap_or("");
|
||||
let from_uri = SipMessage::extract_uri(from_header).unwrap_or("Unknown");
|
||||
let called_number = msg
|
||||
.request_uri()
|
||||
.and_then(|uri| SipMessage::extract_uri(uri))
|
||||
.unwrap_or("");
|
||||
|
||||
emit_event(
|
||||
&eng.out_tx,
|
||||
"incoming_call",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"from_uri": from_uri,
|
||||
"to_number": called_number,
|
||||
"provider_id": provider_id,
|
||||
}),
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 5. New outbound INVITE from device.
|
||||
if !is_from_provider && msg.is_request() && msg.method() == Some("INVITE") {
|
||||
// Resolve outbound route.
|
||||
let dialed_number = msg
|
||||
.request_uri()
|
||||
.and_then(|uri| SipMessage::extract_uri(uri))
|
||||
.unwrap_or(msg.request_uri().unwrap_or(""))
|
||||
.to_string();
|
||||
|
||||
let device = eng.registrar.find_by_address(&from_addr);
|
||||
let device_id = device.map(|d| d.device_id.clone());
|
||||
|
||||
// Find provider via routing rules.
|
||||
let route_result = config_ref.resolve_outbound_route(
|
||||
&dialed_number,
|
||||
device_id.as_deref(),
|
||||
&|pid: &str| {
|
||||
// Can't call async here — use a sync check.
|
||||
// For now, assume all configured providers are available.
|
||||
true
|
||||
},
|
||||
);
|
||||
|
||||
if let Some(route) = route_result {
|
||||
let public_ip = if let Some(ps_arc) = eng.provider_mgr.find_by_address(&from_addr).await {
|
||||
let ps = ps_arc.lock().await;
|
||||
ps.public_ip.clone()
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let ProxyEngine {
|
||||
ref mut call_mgr,
|
||||
ref mut rtp_pool,
|
||||
..
|
||||
} = *eng;
|
||||
let rtp_pool = rtp_pool.as_mut().unwrap();
|
||||
let call_id = call_mgr
|
||||
.create_outbound_passthrough(
|
||||
&msg,
|
||||
from_addr,
|
||||
&route.provider,
|
||||
&config_ref,
|
||||
rtp_pool,
|
||||
socket,
|
||||
public_ip.as_deref(),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(call_id) = call_id {
|
||||
emit_event(
|
||||
&eng.out_tx,
|
||||
"outbound_device_call",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"from_device": device_id,
|
||||
"to_number": dialed_number,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 6. Other messages — log for debugging.
|
||||
let label = if msg.is_request() {
|
||||
msg.method().unwrap_or("?").to_string()
|
||||
} else {
|
||||
msg.status_code().map(|c| c.to_string()).unwrap_or_default()
|
||||
};
|
||||
emit_event(
|
||||
&eng.out_tx,
|
||||
"sip_unhandled",
|
||||
serde_json::json!({
|
||||
"method_or_status": label,
|
||||
"call_id": msg.call_id(),
|
||||
"from_addr": from_addr.ip().to_string(),
|
||||
"from_port": from_addr.port(),
|
||||
"is_from_provider": is_from_provider,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
/// Handle `get_status` — return active call statuses from Rust.
|
||||
async fn handle_get_status(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let eng = engine.lock().await;
|
||||
let calls = eng.call_mgr.get_all_statuses();
|
||||
respond_ok(out_tx, &cmd.id, serde_json::json!({ "calls": calls }));
|
||||
}
|
||||
|
||||
/// Handle the `hangup` command.
|
||||
async fn handle_hangup(engine: Arc<Mutex<ProxyEngine>>, out_tx: &OutTx, cmd: &Command) {
|
||||
let call_id = match cmd.params.get("call_id").and_then(|v| v.as_str()) {
|
||||
Some(id) => id.to_string(),
|
||||
None => {
|
||||
respond_err(out_tx, &cmd.id, "missing call_id");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut eng = engine.lock().await;
|
||||
let socket = match &eng.transport {
|
||||
Some(t) => t.socket(),
|
||||
None => {
|
||||
respond_err(out_tx, &cmd.id, "not initialized");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if eng.call_mgr.hangup(&call_id, &socket).await {
|
||||
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
|
||||
} else {
|
||||
respond_err(out_tx, &cmd.id, &format!("call {call_id} not found"));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user