6 Commits

30 changed files with 773 additions and 1327 deletions

View File

@@ -1,5 +1,29 @@
# Changelog
## 2026-04-10 - 1.17.1 - fix(proxy-engine,codec-lib,sip-proto,ts)
preserve negotiated media details and improve RTP audio handling across call legs
- Use native Opus float encode/decode to avoid unnecessary i16 quantization in the f32 audio path.
- Parse full RTP headers including extensions and sequence numbers, then sort inbound packets before decoding to keep codec state stable for out-of-order audio.
- Capture negotiated codec payload types from SDP offers and answers and include codec, RTP port, remote media, and metadata in leg_added events.
- Emit leg_state_changed and leg_removed events more consistently so the dashboard reflects leg lifecycle updates accurately.
## 2026-04-10 - 1.17.0 - feat(proxy-engine)
upgrade the internal audio bus to 48kHz f32 with per-leg denoising and improve SIP leg routing
- switch mixer, prompt playback, and tool leg audio handling from 16kHz i16 to 48kHz f32 for higher-quality internal processing
- add f32 decode/encode and resampling support plus standalone RNNoise denoiser creation in codec-lib
- apply per-leg inbound noise suppression in the mixer before mix-minus generation
- fix passthrough call routing by matching the actual leg from the signaling source address when Call-IDs are shared
- correct dialed number extraction from bare SIP request URIs by parsing the user part directly
## 2026-04-10 - 1.16.0 - feat(proxy-engine)
integrate Kokoro TTS generation into proxy-engine and simplify TypeScript prompt handling to use cached WAV files
- adds a generate_tts command to proxy-engine with lazy-loaded Kokoro model support and WAV output generation
- removes standalone opus-codec and tts-engine workspace binaries by consolidating TTS generation into proxy-engine
- updates announcement and prompt cache flows to generate and cache WAV files on disk instead of pre-encoding RTP frames in TypeScript
## 2026-04-10 - 1.15.0 - feat(proxy-engine)
add device leg, leg transfer, and leg replacement call controls

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,6 @@
{
"name": "siprouter",
"version": "1.15.0",
"version": "1.17.1",
"private": true,
"type": "module",
"scripts": {

23
rust/Cargo.lock generated
View File

@@ -1881,16 +1881,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "opus-codec"
version = "0.2.0"
dependencies = [
"base64 0.22.1",
"codec-lib",
"serde",
"serde_json",
]
[[package]]
name = "ort"
version = "2.0.0-rc.11"
@@ -2188,6 +2178,9 @@ dependencies = [
"base64 0.22.1",
"codec-lib",
"hound",
"kokoro-tts",
"nnnoiseless",
"ort",
"rand 0.8.5",
"regex-lite",
"serde",
@@ -3008,16 +3001,6 @@ dependencies = [
"strength_reduce",
]
[[package]]
name = "tts-engine"
version = "0.1.0"
dependencies = [
"hound",
"kokoro-tts",
"ort",
"tokio",
]
[[package]]
name = "turn"
version = "0.6.1"

View File

@@ -1,8 +1,6 @@
[workspace]
members = [
"crates/codec-lib",
"crates/opus-codec",
"crates/tts-engine",
"crates/sip-proto",
"crates/proxy-engine",
]

View File

@@ -1,7 +1,7 @@
//! Audio codec library for the SIP router.
//!
//! Handles Opus ↔ G.722 ↔ PCMU/PCMA transcoding with ML noise suppression.
//! Used by both the standalone `opus-codec` CLI and the `proxy-engine` binary.
//! Used by the `proxy-engine` binary for all audio transcoding.
use audiopus::coder::{Decoder as OpusDecoder, Encoder as OpusEncoder};
use audiopus::packet::Packet as OpusPacket;
@@ -104,6 +104,8 @@ pub struct TranscodeState {
g722_dec: libg722::decoder::Decoder,
/// Cached FFT resamplers keyed by (from_rate, to_rate, chunk_size).
resamplers: HashMap<(u32, u32, usize), FftFixedIn<f64>>,
/// Cached f32 FFT resamplers keyed by (from_rate, to_rate, chunk_size).
resamplers_f32: HashMap<(u32, u32, usize), FftFixedIn<f32>>,
/// ML noise suppression for the SIP-bound direction.
denoiser_to_sip: Box<DenoiseState<'static>>,
/// ML noise suppression for the browser-bound direction.
@@ -133,6 +135,7 @@ impl TranscodeState {
g722_enc,
g722_dec,
resamplers: HashMap::new(),
resamplers_f32: HashMap::new(),
denoiser_to_sip: DenoiseState::new(),
denoiser_to_browser: DenoiseState::new(),
})
@@ -293,6 +296,126 @@ impl TranscodeState {
_ => Err(format!("unsupported target PT {pt}")),
}
}
// ---- f32 API for high-quality internal bus ----------------------------
/// Decode an encoded audio payload to f32 PCM samples in [-1.0, 1.0].
/// Returns (samples, sample_rate).
///
/// For Opus, uses native float decode (no i16 quantization).
/// For G.722/G.711, decodes to i16 then converts (codec is natively i16).
pub fn decode_to_f32(&mut self, data: &[u8], pt: u8) -> Result<(Vec<f32>, u32), String> {
match pt {
PT_OPUS => {
let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz
let packet =
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
let n: usize = self
.opus_dec
.decode_float(Some(packet), out, false)
.map_err(|e| format!("opus decode_float: {e}"))?
.into();
pcm.truncate(n);
Ok((pcm, 48000))
}
_ => {
// G.722, PCMU, PCMA: natively i16 codecs — decode then convert.
let (pcm_i16, rate) = self.decode_to_pcm(data, pt)?;
let pcm_f32 = pcm_i16.iter().map(|&s| s as f32 / 32768.0).collect();
Ok((pcm_f32, rate))
}
}
}
/// Encode f32 PCM samples ([-1.0, 1.0]) to an audio codec.
///
/// For Opus, uses native float encode (no i16 quantization).
/// For G.722/G.711, converts to i16 then encodes (codec is natively i16).
pub fn encode_from_f32(&mut self, pcm: &[f32], pt: u8) -> Result<Vec<u8>, String> {
match pt {
PT_OPUS => {
let mut buf = vec![0u8; 4000];
let n: usize = self
.opus_enc
.encode_float(pcm, &mut buf)
.map_err(|e| format!("opus encode_float: {e}"))?
.into();
buf.truncate(n);
Ok(buf)
}
_ => {
// G.722, PCMU, PCMA: natively i16 codecs.
let pcm_i16: Vec<i16> = pcm
.iter()
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect();
self.encode_from_pcm(&pcm_i16, pt)
}
}
}
/// High-quality sample rate conversion for f32 PCM using rubato FFT resampler.
/// Uses a separate cache from the i16 resampler.
pub fn resample_f32(
&mut self,
pcm: &[f32],
from_rate: u32,
to_rate: u32,
) -> Result<Vec<f32>, String> {
if from_rate == to_rate || pcm.is_empty() {
return Ok(pcm.to_vec());
}
let chunk = pcm.len();
let key = (from_rate, to_rate, chunk);
if !self.resamplers_f32.contains_key(&key) {
let r =
FftFixedIn::<f32>::new(from_rate as usize, to_rate as usize, chunk, 1, 1)
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
self.resamplers_f32.insert(key, r);
}
let resampler = self.resamplers_f32.get_mut(&key).unwrap();
let input = vec![pcm.to_vec()];
let result = resampler
.process(&input, None)
.map_err(|e| format!("resample f32 {from_rate}->{to_rate}: {e}"))?;
Ok(result[0].clone())
}
/// Apply RNNoise ML noise suppression to 48kHz f32 PCM audio.
/// Processes in 480-sample (10ms) frames. State persists across calls.
/// Operates natively in f32 — no i16 conversion overhead.
pub fn denoise_f32(denoiser: &mut DenoiseState, pcm: &[f32]) -> Vec<f32> {
let frame_size = DenoiseState::FRAME_SIZE; // 480
let total = pcm.len();
let whole = (total / frame_size) * frame_size;
let mut output = Vec::with_capacity(total);
let mut out_buf = [0.0f32; 480];
// nnnoiseless expects f32 samples scaled as i16 range (-32768..32767).
for offset in (0..whole).step_by(frame_size) {
let input: Vec<f32> = pcm[offset..offset + frame_size]
.iter()
.map(|&s| s * 32768.0)
.collect();
denoiser.process_frame(&mut out_buf, &input);
output.extend(out_buf.iter().map(|&s| s / 32768.0));
}
if whole < total {
output.extend_from_slice(&pcm[whole..]);
}
output
}
}
/// Create a new standalone denoiser for per-leg inbound processing.
pub fn new_denoiser() -> Box<DenoiseState<'static>> {
DenoiseState::new()
}
#[cfg(test)]

View File

@@ -1,14 +0,0 @@
[package]
name = "opus-codec"
version = "0.2.0"
edition = "2021"
[[bin]]
name = "opus-codec"
path = "src/main.rs"
[dependencies]
codec-lib = { path = "../codec-lib" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
base64 = "0.22"

View File

@@ -1,286 +0,0 @@
/// Audio transcoding bridge for smartrust.
///
/// Thin CLI wrapper around `codec-lib`. Handles Opus ↔ G.722 ↔ PCMU transcoding.
///
/// Protocol:
/// -> {"id":"1","method":"init","params":{}}
/// <- {"id":"1","success":true,"result":{}}
/// -> {"id":"2","method":"create_session","params":{"session_id":"call-abc"}}
/// <- {"id":"2","success":true,"result":{}}
/// -> {"id":"3","method":"transcode","params":{"session_id":"call-abc","data_b64":"...","from_pt":111,"to_pt":9}}
/// <- {"id":"3","success":true,"result":{"data_b64":"..."}}
/// -> {"id":"4","method":"destroy_session","params":{"session_id":"call-abc"}}
/// <- {"id":"4","success":true,"result":{}}
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine as _;
use codec_lib::{codec_sample_rate, TranscodeState};
use serde::Deserialize;
use std::collections::HashMap;
use std::io::{self, BufRead, Write};
#[derive(Deserialize)]
struct Request {
id: String,
method: String,
#[serde(default)]
params: serde_json::Value,
}
fn respond(
out: &mut impl Write,
id: &str,
success: bool,
result: Option<serde_json::Value>,
error: Option<&str>,
) {
let mut resp = serde_json::json!({ "id": id, "success": success });
if let Some(r) = result {
resp["result"] = r;
}
if let Some(e) = error {
resp["error"] = serde_json::Value::String(e.to_string());
}
let _ = writeln!(out, "{}", resp);
let _ = out.flush();
}
/// Resolve a session: if session_id is provided, look it up in the sessions map;
/// otherwise fall back to the default state (backward compat with `init`).
fn get_session<'a>(
sessions: &'a mut HashMap<String, TranscodeState>,
default: &'a mut Option<TranscodeState>,
params: &serde_json::Value,
) -> Option<&'a mut TranscodeState> {
if let Some(sid) = params.get("session_id").and_then(|v| v.as_str()) {
sessions.get_mut(sid)
} else {
default.as_mut()
}
}
fn main() {
let stdin = io::stdin();
let stdout = io::stdout();
let mut out = io::BufWriter::new(stdout.lock());
let _ = writeln!(out, r#"{{"event":"ready","data":{{}}}}"#);
let _ = out.flush();
let mut default_state: Option<TranscodeState> = None;
let mut sessions: HashMap<String, TranscodeState> = HashMap::new();
for line in stdin.lock().lines() {
let line = match line {
Ok(l) if !l.trim().is_empty() => l,
Ok(_) => continue,
Err(_) => break,
};
let req: Request = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
respond(&mut out, "", false, None, Some(&format!("parse: {e}")));
continue;
}
};
match req.method.as_str() {
"init" => match TranscodeState::new() {
Ok(s) => {
default_state = Some(s);
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
}
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
},
"create_session" => {
let session_id = match req.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => {
respond(&mut out, &req.id, false, None, Some("missing session_id"));
continue;
}
};
if sessions.contains_key(&session_id) {
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
continue;
}
match TranscodeState::new() {
Ok(s) => {
sessions.insert(session_id, s);
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
}
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
}
}
"destroy_session" => {
let session_id = match req.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s,
None => {
respond(&mut out, &req.id, false, None, Some("missing session_id"));
continue;
}
};
sessions.remove(session_id);
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
}
"transcode" => {
let st = match get_session(&mut sessions, &mut default_state, &req.params) {
Some(s) => s,
None => {
respond(
&mut out,
&req.id,
false,
None,
Some("not initialized (no session or default state)"),
);
continue;
}
};
let data_b64 = match req.params.get("data_b64").and_then(|v| v.as_str()) {
Some(s) => s,
None => {
respond(&mut out, &req.id, false, None, Some("missing data_b64"));
continue;
}
};
let from_pt =
req.params.get("from_pt").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
let to_pt = req.params.get("to_pt").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
let direction = req.params.get("direction").and_then(|v| v.as_str());
let data = match B64.decode(data_b64) {
Ok(b) => b,
Err(e) => {
respond(
&mut out,
&req.id,
false,
None,
Some(&format!("b64: {e}")),
);
continue;
}
};
match st.transcode(&data, from_pt, to_pt, direction) {
Ok(result) => {
respond(
&mut out,
&req.id,
true,
Some(serde_json::json!({ "data_b64": B64.encode(&result) })),
None,
);
}
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
}
}
"encode_pcm" => {
let st = match get_session(&mut sessions, &mut default_state, &req.params) {
Some(s) => s,
None => {
respond(
&mut out,
&req.id,
false,
None,
Some("not initialized (no session or default state)"),
);
continue;
}
};
let data_b64 = match req.params.get("data_b64").and_then(|v| v.as_str()) {
Some(s) => s,
None => {
respond(&mut out, &req.id, false, None, Some("missing data_b64"));
continue;
}
};
let sample_rate = req
.params
.get("sample_rate")
.and_then(|v| v.as_u64())
.unwrap_or(22050) as u32;
let to_pt = req.params.get("to_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8;
let data = match B64.decode(data_b64) {
Ok(b) => b,
Err(e) => {
respond(
&mut out,
&req.id,
false,
None,
Some(&format!("b64: {e}")),
);
continue;
}
};
if data.len() % 2 != 0 {
respond(
&mut out,
&req.id,
false,
None,
Some("PCM data has odd byte count (expected 16-bit LE samples)"),
);
continue;
}
let pcm: Vec<i16> = data
.chunks_exact(2)
.map(|c| i16::from_le_bytes([c[0], c[1]]))
.collect();
let target_rate = codec_sample_rate(to_pt);
let resampled = match st.resample(&pcm, sample_rate, target_rate) {
Ok(r) => r,
Err(e) => {
respond(&mut out, &req.id, false, None, Some(&e));
continue;
}
};
match st.encode_from_pcm(&resampled, to_pt) {
Ok(encoded) => {
respond(
&mut out,
&req.id,
true,
Some(serde_json::json!({ "data_b64": B64.encode(&encoded) })),
None,
);
}
Err(e) => {
respond(&mut out, &req.id, false, None, Some(&e));
}
}
}
"encode" | "decode" => {
respond(
&mut out,
&req.id,
false,
None,
Some("use 'transcode' command instead"),
);
}
_ => respond(
&mut out,
&req.id,
false,
None,
Some(&format!("unknown: {}", req.method)),
),
}
}
}

View File

@@ -10,6 +10,7 @@ path = "src/main.rs"
[dependencies]
codec-lib = { path = "../codec-lib" }
sip-proto = { path = "../sip-proto" }
nnnoiseless = { version = "0.5", default-features = false }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
@@ -18,3 +19,8 @@ regex-lite = "0.1"
webrtc = "0.8"
rand = "0.8"
hound = "3.5"
kokoro-tts = { version = "0.3", default-features = false }
ort = { version = "=2.0.0-rc.11", default-features = false, features = [
"std", "download-binaries", "copy-dylibs", "ndarray",
"tls-native-vendored"
] }

View File

@@ -10,9 +10,9 @@ use tokio::net::UdpSocket;
use tokio::time::{self, Duration};
/// Mixing sample rate used by the mixer (must stay in sync with mixer::MIX_RATE).
const MIX_RATE: u32 = 16000;
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320;
const MIX_FRAME_SIZE: usize = 960;
/// Play a WAV file as RTP to a destination.
/// Returns when playback is complete.
@@ -178,9 +178,9 @@ pub async fn play_beep(
Ok((seq, ts))
}
/// Load a WAV file and split it into 20ms PCM frames at 16kHz.
/// Load a WAV file and split it into 20ms f32 PCM frames at 48kHz.
/// Used by the leg interaction system to prepare prompt audio for the mixer.
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
let path = Path::new(wav_path);
if !path.exists() {
return Err(format!("WAV file not found: {wav_path}"));
@@ -191,17 +191,17 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
let spec = reader.spec();
let wav_rate = spec.sample_rate;
// Read all samples as i16.
let samples: Vec<i16> = if spec.bits_per_sample == 16 {
// Read all samples as f32 in [-1.0, 1.0].
let samples: Vec<f32> = if spec.bits_per_sample == 16 {
reader
.samples::<i16>()
.filter_map(|s| s.ok())
.map(|s| s as f32 / 32768.0)
.collect()
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
reader
.samples::<f32>()
.filter_map(|s| s.ok())
.map(|s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect()
} else {
return Err(format!(
@@ -214,24 +214,24 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
return Ok(vec![]);
}
// Resample to MIX_RATE (16kHz) if needed.
// Resample to MIX_RATE (48kHz) if needed.
let resampled = if wav_rate != MIX_RATE {
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
transcoder
.resample(&samples, wav_rate, MIX_RATE)
.resample_f32(&samples, wav_rate, MIX_RATE)
.map_err(|e| format!("resample: {e}"))?
} else {
samples
};
// Split into MIX_FRAME_SIZE (320) sample frames.
// Split into MIX_FRAME_SIZE (960) sample frames.
let mut frames = Vec::new();
let mut offset = 0;
while offset < resampled.len() {
let end = (offset + MIX_FRAME_SIZE).min(resampled.len());
let mut frame = resampled[offset..end].to_vec();
// Pad short final frame with silence.
frame.resize(MIX_FRAME_SIZE, 0);
frame.resize(MIX_FRAME_SIZE, 0.0);
frames.push(frame);
offset += MIX_FRAME_SIZE;
}

View File

@@ -20,6 +20,35 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
/// Emit a `leg_added` event with full leg information.
/// Free function (not a method) to avoid `&self` borrow conflicts when `self.calls` is borrowed.
fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
let metadata: serde_json::Value = if leg.metadata.is_empty() {
serde_json::json!({})
} else {
serde_json::Value::Object(
leg.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
)
};
emit_event(
tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg.id,
"kind": leg.kind.as_str(),
"state": leg.state.as_str(),
"codec": sip_proto::helpers::codec_name(leg.codec_pt),
"rtpPort": leg.rtp_port,
"remoteMedia": leg.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
"metadata": metadata,
}),
);
}
pub struct CallManager {
/// All active calls, keyed by internal call ID.
pub calls: HashMap<String, Call>,
@@ -120,7 +149,19 @@ impl CallManager {
}
// Passthrough-style routing for inbound/outbound device↔provider calls.
self.route_passthrough_message(&call_id, &leg_id, msg, from_addr, socket, config)
// The sip_index only stores one leg for shared Call-IDs, so we need to
// determine which leg the message actually belongs to by comparing from_addr.
let actual_leg_id = self
.calls
.get(&call_id)
.and_then(|call| {
call.legs
.values()
.find(|l| l.signaling_addr == Some(from_addr))
.map(|l| l.id.clone())
})
.unwrap_or(leg_id);
self.route_passthrough_message(&call_id, &actual_leg_id, msg, from_addr, socket, config)
.await
}
@@ -253,6 +294,11 @@ impl CallManager {
dev_leg.state = LegState::Connected;
}
}
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": dev_leg_id, "state": "connected" }),
);
// Wire device leg to mixer.
if let Some(dev_remote_addr) = dev_remote {
@@ -312,6 +358,8 @@ impl CallManager {
leg.state = LegState::Terminated;
}
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }));
emit_event(&self.out_tx, "call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
self.terminate_call(call_id).await;
@@ -517,21 +565,30 @@ impl CallManager {
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Ringing;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }));
} else if code >= 200 && code < 300 {
let mut needs_wiring = false;
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Connected;
// Learn remote media from SDP.
// Learn remote media and negotiated codec from SDP answer.
if msg.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
leg.remote_media = Some(addr);
}
// Use the codec from the SDP answer (what the remote actually selected).
if let Some(pt) = ep.codec_pt {
leg.codec_pt = pt;
}
}
}
needs_wiring = true;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }));
if call.state != CallState::Connected {
call.state = CallState::Connected;
emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id }));
@@ -677,15 +734,19 @@ impl CallManager {
call.callee_number = Some(called_number);
call.state = CallState::Ringing;
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
let mut codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
// Provider leg — extract media from SDP.
// Provider leg — extract media and negotiated codec from SDP.
let mut provider_media: Option<SocketAddr> = None;
if invite.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&invite.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
provider_media = Some(addr);
}
// Use the codec from the provider's SDP offer (what they actually want to use).
if let Some(pt) = ep.codec_pt {
codec_pt = pt;
}
}
}
@@ -755,6 +816,16 @@ impl CallManager {
// Store the call.
self.calls.insert(call_id.clone(), call);
// Emit leg_added for both initial legs.
if let Some(call) = self.calls.get(&call_id) {
if let Some(leg) = call.legs.get(&provider_leg_id) {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
if let Some(leg) = call.legs.get(&device_leg_id) {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -842,6 +913,14 @@ impl CallManager {
.insert(sip_call_id, (call_id.clone(), leg_id));
self.calls.insert(call_id.clone(), call);
// Emit leg_added for the provider leg.
if let Some(call) = self.calls.get(&call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -866,11 +945,18 @@ impl CallManager {
let lan_port = config.proxy.lan_port;
let device_sip_call_id = invite.call_id().to_string();
// Extract just the user part from the request URI (e.g., "sip:16196000@10.0.0.1" → "16196000").
// extract_uri is for header values with angle brackets, not bare request URIs.
let dialed_number = invite
.request_uri()
.and_then(|uri| SipMessage::extract_uri(uri))
.unwrap_or(invite.request_uri().unwrap_or(""))
.to_string();
.map(|uri| {
let stripped = uri
.strip_prefix("sip:")
.or_else(|| uri.strip_prefix("sips:"))
.unwrap_or(uri);
stripped.split('@').next().unwrap_or(stripped).to_string()
})
.unwrap_or_default();
let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() {
Some(a) => a,
@@ -983,6 +1069,14 @@ impl CallManager {
.insert(provider_sip_call_id, (call_id.clone(), provider_leg_id));
self.calls.insert(call_id.clone(), call);
// Emit leg_added for both initial legs (device + provider).
if let Some(call) = self.calls.get(&call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -1050,17 +1144,11 @@ impl CallManager {
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-provider",
"state": "inviting",
"number": number,
}),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(&leg_id) {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
Some(leg_id)
}
@@ -1126,17 +1214,11 @@ impl CallManager {
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-device",
"state": "inviting",
"device_id": device_id,
}),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(&leg_id) {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
Some(leg_id)
}
@@ -1223,6 +1305,13 @@ impl CallManager {
None => return false,
};
// Emit leg_removed for source call.
emit_event(
&self.out_tx,
"leg_removed",
serde_json::json!({ "call_id": source_call_id, "leg_id": leg_id }),
);
// Update SIP index to point to the target call.
if let Some(sip_cid) = &leg_info.sip_call_id {
self.sip_index.insert(
@@ -1255,15 +1344,12 @@ impl CallManager {
let target_call = self.calls.get_mut(target_call_id).unwrap();
target_call.legs.insert(leg_id.to_string(), leg_info);
emit_event(
&self.out_tx,
"leg_transferred",
serde_json::json!({
"leg_id": leg_id,
"source_call_id": source_call_id,
"target_call_id": target_call_id,
}),
);
// Emit leg_added for target call.
if let Some(target) = self.calls.get(target_call_id) {
if let Some(leg) = target.legs.get(leg_id) {
emit_leg_added_event(&self.out_tx, target_call_id, leg);
}
}
// Check if source call has too few legs remaining.
let source_call = self.calls.get(source_call_id).unwrap();
@@ -1366,6 +1452,11 @@ impl CallManager {
}
}
leg.state = LegState::Terminated;
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg.id, "state": "terminated" }),
);
}
emit_event(
@@ -1484,6 +1575,13 @@ impl CallManager {
);
self.calls.insert(call_id.to_string(), call);
// Emit leg_added for the provider leg.
if let Some(call) = self.calls.get(call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
// Build recording path.
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)

View File

@@ -35,7 +35,8 @@ pub fn create_leg_channels() -> LegChannels {
}
/// Spawn the inbound I/O task for a SIP leg.
/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer.
/// Reads RTP from the socket, parses the variable-length header (RFC 3550),
/// and sends the payload to the mixer.
/// Returns the JoinHandle (exits when the inbound_tx channel is dropped).
pub fn spawn_sip_inbound(
rtp_socket: Arc<UdpSocket>,
@@ -51,12 +52,29 @@ pub fn spawn_sip_inbound(
}
let pt = buf[1] & 0x7F;
let marker = (buf[1] & 0x80) != 0;
let seq = u16::from_be_bytes([buf[2], buf[3]]);
let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
let payload = buf[12..n].to_vec();
// RFC 3550: header length = 12 + (CC * 4) + optional extension.
let cc = (buf[0] & 0x0F) as usize;
let has_extension = (buf[0] & 0x10) != 0;
let mut offset = 12 + cc * 4;
if has_extension {
if offset + 4 > n {
continue; // Malformed: extension header truncated.
}
let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
offset += 4 + ext_len * 4;
}
if offset >= n {
continue; // No payload after header.
}
let payload = buf[offset..n].to_vec();
if payload.is_empty() {
continue;
}
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, timestamp }).await.is_err() {
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() {
break; // Channel closed — leg removed.
}
}

View File

@@ -21,6 +21,7 @@ mod rtp;
mod sip_leg;
mod sip_transport;
mod tool_leg;
mod tts;
mod voicemail;
mod webrtc_engine;
@@ -93,6 +94,9 @@ async fn main() {
// WebRTC engine — separate lock to avoid deadlock with SIP handlers.
let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
// TTS engine — separate lock, lazy-loads model on first use.
let tts_engine = Arc::new(Mutex::new(tts::TtsEngine::new()));
// Read commands from stdin.
let stdin = tokio::io::stdin();
let reader = BufReader::new(stdin);
@@ -113,11 +117,12 @@ async fn main() {
let engine = engine.clone();
let webrtc = webrtc.clone();
let tts_engine = tts_engine.clone();
let out_tx = out_tx.clone();
// Handle commands — some are async, so we spawn.
tokio::spawn(async move {
handle_command(engine, webrtc, &out_tx, cmd).await;
handle_command(engine, webrtc, tts_engine, &out_tx, cmd).await;
});
}
}
@@ -125,6 +130,7 @@ async fn main() {
async fn handle_command(
engine: Arc<Mutex<ProxyEngine>>,
webrtc: Arc<Mutex<WebRtcEngine>>,
tts_engine: Arc<Mutex<tts::TtsEngine>>,
out_tx: &OutTx,
cmd: Command,
) {
@@ -150,6 +156,8 @@ async fn handle_command(
"add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await,
"remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await,
"set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await,
// TTS command — lock tts_engine only (no SIP/WebRTC contention).
"generate_tts" => handle_generate_tts(tts_engine, out_tx, &cmd).await,
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
}
}
@@ -669,6 +677,10 @@ async fn handle_webrtc_link(
"leg_id": session_id,
"kind": "webrtc",
"state": "connected",
"codec": "Opus",
"rtpPort": 0,
"remoteMedia": null,
"metadata": {},
}));
respond_ok(out_tx, &cmd.id, serde_json::json!({
@@ -1117,8 +1129,11 @@ async fn handle_add_tool_leg(
"call_id": call_id,
"leg_id": tool_leg_id,
"kind": "tool",
"tool_type": tool_type_str,
"state": "connected",
"codec": null,
"rtpPort": 0,
"remoteMedia": null,
"metadata": { "tool_type": tool_type_str },
}),
);
@@ -1218,3 +1233,16 @@ async fn handle_set_leg_metadata(
leg.metadata.insert(key, value);
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
}
/// Handle `generate_tts` — synthesize text to a WAV file using Kokoro TTS.
async fn handle_generate_tts(
tts_engine: Arc<Mutex<tts::TtsEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
let mut tts = tts_engine.lock().await;
match tts.generate(&cmd.params).await {
Ok(result) => respond_ok(out_tx, &cmd.id, result),
Err(e) => respond_err(out_tx, &cmd.id, &e),
}
}

View File

@@ -3,9 +3,12 @@
//! Each Call spawns one mixer task. Legs communicate with the mixer via
//! tokio mpsc channels — no shared mutable state, no lock contention.
//!
//! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame).
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
//! 2. Compute total mix (sum of all **participant** legs' PCM as i32)
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
//! 5. For each tool leg: send per-source unmerged audio batch
@@ -13,16 +16,18 @@
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, TranscodeState};
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
use nnnoiseless::DenoiseState;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample.
const MIX_RATE: u32 = 16000;
/// Mixing sample rate — 48kHz. Opus is native, G.722 needs 3× upsample, G.711 needs 6× upsample.
/// All processing (denoising, mixing) happens at this rate in f32 for maximum quality.
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
@@ -30,6 +35,8 @@ pub struct RtpPacket {
pub payload_type: u8,
/// RTP marker bit (first packet of a DTMF event, etc.).
pub marker: bool,
/// RTP sequence number for reordering.
pub seq: u16,
/// RTP timestamp from the original packet header.
pub timestamp: u32,
}
@@ -47,8 +54,8 @@ enum LegRole {
}
struct IsolationState {
/// PCM frames at MIX_RATE (320 samples each) queued for playback.
prompt_frames: VecDeque<Vec<i16>>,
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
prompt_frames: VecDeque<Vec<f32>>,
/// Digits that complete the interaction (e.g., ['1', '2']).
expected_digits: Vec<char>,
/// Ticks remaining before timeout (decremented each tick after prompt ends).
@@ -88,8 +95,8 @@ pub struct ToolAudioBatch {
/// One participant's 20ms audio frame.
pub struct ToolAudioSource {
pub leg_id: String,
/// PCM at 16kHz, MIX_FRAME_SIZE (320) samples.
pub pcm_16k: Vec<i16>,
/// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples.
pub pcm_48k: Vec<f32>,
}
/// Internal storage for a tool leg inside the mixer.
@@ -122,8 +129,8 @@ pub enum MixerCommand {
/// DTMF from the leg is checked against expected_digits.
StartInteraction {
leg_id: String,
/// PCM frames at MIX_RATE (16kHz), each 320 samples.
prompt_pcm_frames: Vec<Vec<i16>>,
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
prompt_pcm_frames: Vec<Vec<f32>>,
expected_digits: Vec<char>,
timeout_ms: u32,
result_tx: oneshot::Sender<InteractionResult>,
@@ -149,10 +156,12 @@ pub enum MixerCommand {
struct MixerLegSlot {
codec_pt: u8,
transcoder: TranscodeState,
/// Per-leg inbound denoiser (48kHz, 480-sample frames).
denoiser: Box<DenoiseState<'static>>,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
/// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus.
last_pcm_frame: Vec<i16>,
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
last_pcm_frame: Vec<f32>,
/// Number of consecutive ticks with no inbound packet.
silent_ticks: u32,
// RTP output state.
@@ -220,9 +229,10 @@ async fn mixer_loop(
MixerLegSlot {
codec_pt,
transcoder,
denoiser: new_denoiser(),
inbound_rx,
outbound_tx,
last_pcm_frame: vec![0i16; MIX_FRAME_SIZE],
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
silent_ticks: 0,
rtp_seq: 0,
rtp_ts: 0,
@@ -311,16 +321,18 @@ async fn mixer_loop(
continue;
}
// ── 2. Drain inbound packets, decode to 16kHz PCM. ─────────
// ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ────
// DTMF (PT 101) packets are collected separately.
// Audio packets are sorted by sequence number and decoded
// in order to maintain codec state (critical for G.722 ADPCM).
let leg_ids: Vec<String> = legs.keys().cloned().collect();
let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new();
for lid in &leg_ids {
let slot = legs.get_mut(lid).unwrap();
// Drain channel — collect DTMF packets separately, keep latest audio.
let mut latest_audio: Option<RtpPacket> = None;
// Drain channel — collect DTMF separately, collect ALL audio packets.
let mut audio_packets: Vec<RtpPacket> = Vec::new();
loop {
match slot.inbound_rx.try_recv() {
Ok(pkt) => {
@@ -328,33 +340,47 @@ async fn mixer_loop(
// DTMF telephone-event: collect for processing.
dtmf_forward.push((lid.clone(), pkt));
} else {
latest_audio = Some(pkt);
audio_packets.push(pkt);
}
}
Err(_) => break,
}
}
if let Some(pkt) = latest_audio {
if !audio_packets.is_empty() {
slot.silent_ticks = 0;
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to mixing rate if needed.
let pcm_mix = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE])
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = pcm_mix;
frame.resize(MIX_FRAME_SIZE, 0);
slot.last_pcm_frame = frame;
}
Err(_) => {
// Decode failed — use silence.
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
// Sort by sequence number for correct codec state progression.
// This prevents G.722 ADPCM state corruption from out-of-order packets.
audio_packets.sort_by_key(|p| p.seq);
// Decode ALL packets in order (maintains codec state),
// but only keep the last decoded frame for mixing.
for pkt in &audio_packets {
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to 48kHz mixing rate if needed.
let pcm_48k = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample_f32(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
};
// Per-leg inbound denoising at 48kHz.
// Skip for Opus/WebRTC legs — browsers already apply
// their own noise suppression via getUserMedia.
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
} else {
pcm_48k
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = processed;
frame.resize(MIX_FRAME_SIZE, 0.0);
slot.last_pcm_frame = frame;
}
Err(_) => {}
}
}
} else if dtmf_forward.iter().any(|(src, _)| src == lid) {
@@ -364,17 +390,18 @@ async fn mixer_loop(
slot.silent_ticks += 1;
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
if slot.silent_ticks > 150 {
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
}
}
}
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
// Accumulate as f64 to prevent precision loss when summing f32.
let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE];
for slot in legs.values() {
if matches!(slot.role, LegRole::Participant) {
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
total_mix[i] += s as i32;
total_mix[i] += s as f64;
}
}
}
@@ -387,27 +414,27 @@ async fn mixer_loop(
for (lid, slot) in legs.iter_mut() {
match &mut slot.role {
LegRole::Participant => {
// Mix-minus: total minus this leg's own contribution.
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample = (total_mix[i] - slot.last_pcm_frame[i] as i32)
.clamp(-32768, 32767) as i16;
mix_minus.push(sample);
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
mix_minus.push(sample.clamp(-1.0, 1.0));
}
// Resample from 16kHz to the leg's codec native rate.
// Resample from 48kHz to the leg's codec native rate.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
mix_minus
} else {
slot.transcoder
.resample(&mix_minus, MIX_RATE, target_rate)
.resample_f32(&mix_minus, MIX_RATE, target_rate)
.unwrap_or_default()
};
// Encode to the leg's codec.
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
let encoded =
match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
@@ -456,21 +483,21 @@ async fn mixer_loop(
frame
} else {
state.prompt_done = true;
vec![0i16; MIX_FRAME_SIZE]
vec![0.0f32; MIX_FRAME_SIZE]
};
// Encode prompt frame to the leg's codec (reuses existing encode path).
// Encode prompt frame to the leg's codec.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
pcm_frame
} else {
slot.transcoder
.resample(&pcm_frame, MIX_RATE, target_rate)
.resample_f32(&pcm_frame, MIX_RATE, target_rate)
.unwrap_or_default()
};
if let Ok(encoded) =
slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt)
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
{
if !encoded.is_empty() {
let header = build_rtp_header(
@@ -523,7 +550,7 @@ async fn mixer_loop(
.filter(|(_, s)| matches!(s.role, LegRole::Participant))
.map(|(lid, s)| ToolAudioSource {
leg_id: lid.clone(),
pcm_16k: s.last_pcm_frame.clone(),
pcm_48k: s.last_pcm_frame.clone(),
})
.collect();
@@ -533,7 +560,7 @@ async fn mixer_loop(
.iter()
.map(|s| ToolAudioSource {
leg_id: s.leg_id.clone(),
pcm_16k: s.pcm_16k.clone(),
pcm_48k: s.pcm_48k.clone(),
})
.collect(),
};

View File

@@ -2,7 +2,7 @@
//!
//! Tool legs are observer legs that receive individual audio streams from each
//! participant in a call. The mixer pipes `ToolAudioBatch` every 20ms containing
//! each participant's decoded PCM@16kHz tagged with source leg ID.
//! each participant's decoded PCM@48kHz f32 tagged with source leg ID.
//!
//! Consumers:
//! - **Recording**: writes per-source WAV files for speaker-separated recording.
@@ -37,20 +37,25 @@ pub fn spawn_recording_tool(
while let Some(batch) = rx.recv().await {
for source in &batch.sources {
// Skip silence-only frames (all zeros = no audio activity).
let has_audio = source.pcm_16k.iter().any(|&s| s != 0);
// Skip silence-only frames (near-zero = no audio activity).
let has_audio = source.pcm_48k.iter().any(|&s| s.abs() > 1e-6);
if !has_audio && !recorders.contains_key(&source.leg_id) {
continue; // Don't create a file for silence-only sources.
}
let recorder = recorders.entry(source.leg_id.clone()).or_insert_with(|| {
let path = format!("{}/{}-{}.wav", base_dir, call_id, source.leg_id);
Recorder::new_pcm(&path, 16000, None).unwrap_or_else(|e| {
Recorder::new_pcm(&path, 48000, None).unwrap_or_else(|e| {
panic!("failed to create recorder for {}: {e}", source.leg_id);
})
});
if !recorder.write_pcm(&source.pcm_16k) {
// Convert f32 [-1.0, 1.0] to i16 for WAV writing.
let pcm_i16: Vec<i16> = source.pcm_48k
.iter()
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect();
if !recorder.write_pcm(&pcm_i16) {
// Max duration reached — stop recording this source.
break;
}
@@ -88,7 +93,7 @@ pub fn spawn_recording_tool(
/// Spawn a transcription tool leg.
///
/// The plumbing is fully real: it receives per-source unmerged PCM@16kHz from
/// The plumbing is fully real: it receives per-source unmerged PCM@48kHz f32 from
/// the mixer every 20ms. The consumer is a stub that accumulates audio and
/// reports metadata on close. Future: will stream to a Whisper HTTP endpoint.
pub fn spawn_transcription_tool(
@@ -105,7 +110,7 @@ pub fn spawn_transcription_tool(
while let Some(batch) = rx.recv().await {
for source in &batch.sources {
*source_samples.entry(source.leg_id.clone()).or_insert(0) +=
source.pcm_16k.len() as u64;
source.pcm_48k.len() as u64;
// TODO: Future — accumulate chunks and stream to Whisper endpoint.
// For now, the audio is received and counted but not processed.
@@ -118,7 +123,7 @@ pub fn spawn_transcription_tool(
.map(|(leg_id, samples)| {
serde_json::json!({
"source_leg_id": leg_id,
"duration_ms": (samples * 1000) / 16000,
"duration_ms": (samples * 1000) / 48000,
})
})
.collect();

View File

@@ -0,0 +1,138 @@
//! Text-to-speech engine — synthesizes text to WAV files using Kokoro neural TTS.
//!
//! The model is loaded lazily on first use. If the model/voices files are not
//! present, the generate command returns an error and the TS side falls back
//! to espeak-ng.
use kokoro_tts::{KokoroTts, Voice};
use std::path::Path;
/// Wraps the Kokoro TTS engine with lazy model loading.
pub struct TtsEngine {
tts: Option<KokoroTts>,
/// Path that was used to load the current model (for cache invalidation).
loaded_model_path: String,
loaded_voices_path: String,
}
impl TtsEngine {
pub fn new() -> Self {
Self {
tts: None,
loaded_model_path: String::new(),
loaded_voices_path: String::new(),
}
}
/// Generate a WAV file from text.
///
/// Params (from IPC JSON):
/// - `model`: path to the ONNX model file
/// - `voices`: path to the voices.bin file
/// - `voice`: voice name (e.g. "af_bella")
/// - `text`: text to synthesize
/// - `output`: output WAV file path
pub async fn generate(&mut self, params: &serde_json::Value) -> Result<serde_json::Value, String> {
let model_path = params.get("model").and_then(|v| v.as_str())
.ok_or("missing 'model' param")?;
let voices_path = params.get("voices").and_then(|v| v.as_str())
.ok_or("missing 'voices' param")?;
let voice_name = params.get("voice").and_then(|v| v.as_str())
.unwrap_or("af_bella");
let text = params.get("text").and_then(|v| v.as_str())
.ok_or("missing 'text' param")?;
let output_path = params.get("output").and_then(|v| v.as_str())
.ok_or("missing 'output' param")?;
if text.is_empty() {
return Err("empty text".into());
}
// Check that model/voices files exist.
if !Path::new(model_path).exists() {
return Err(format!("model not found: {model_path}"));
}
if !Path::new(voices_path).exists() {
return Err(format!("voices not found: {voices_path}"));
}
// Lazy-load or reload if paths changed.
if self.tts.is_none()
|| self.loaded_model_path != model_path
|| self.loaded_voices_path != voices_path
{
eprintln!("[tts] loading model: {model_path}");
let tts = KokoroTts::new(model_path, voices_path)
.await
.map_err(|e| format!("model load failed: {e:?}"))?;
self.tts = Some(tts);
self.loaded_model_path = model_path.to_string();
self.loaded_voices_path = voices_path.to_string();
}
let tts = self.tts.as_ref().unwrap();
let voice = select_voice(voice_name);
eprintln!("[tts] synthesizing voice '{voice_name}': \"{text}\"");
let (samples, duration) = tts.synth(text, voice)
.await
.map_err(|e| format!("synthesis failed: {e:?}"))?;
eprintln!("[tts] synthesized {} samples in {duration:?}", samples.len());
// Write 24kHz 16-bit mono WAV.
let spec = hound::WavSpec {
channels: 1,
sample_rate: 24000,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};
let mut writer = hound::WavWriter::create(output_path, spec)
.map_err(|e| format!("WAV create failed: {e}"))?;
for &sample in &samples {
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
writer.write_sample(s16).map_err(|e| format!("WAV write: {e}"))?;
}
writer.finalize().map_err(|e| format!("WAV finalize: {e}"))?;
eprintln!("[tts] wrote {output_path}");
Ok(serde_json::json!({ "output": output_path }))
}
}
/// Map voice name string to Kokoro Voice enum variant.
fn select_voice(name: &str) -> Voice {
match name {
"af_bella" => Voice::AfBella(1.0),
"af_heart" => Voice::AfHeart(1.0),
"af_jessica" => Voice::AfJessica(1.0),
"af_nicole" => Voice::AfNicole(1.0),
"af_nova" => Voice::AfNova(1.0),
"af_sarah" => Voice::AfSarah(1.0),
"af_sky" => Voice::AfSky(1.0),
"af_river" => Voice::AfRiver(1.0),
"af_alloy" => Voice::AfAlloy(1.0),
"af_aoede" => Voice::AfAoede(1.0),
"af_kore" => Voice::AfKore(1.0),
"am_adam" => Voice::AmAdam(1.0),
"am_echo" => Voice::AmEcho(1.0),
"am_eric" => Voice::AmEric(1.0),
"am_fenrir" => Voice::AmFenrir(1.0),
"am_liam" => Voice::AmLiam(1.0),
"am_michael" => Voice::AmMichael(1.0),
"am_onyx" => Voice::AmOnyx(1.0),
"am_puck" => Voice::AmPuck(1.0),
"bf_alice" => Voice::BfAlice(1.0),
"bf_emma" => Voice::BfEmma(1.0),
"bf_isabella" => Voice::BfIsabella(1.0),
"bf_lily" => Voice::BfLily(1.0),
"bm_daniel" => Voice::BmDaniel(1.0),
"bm_fable" => Voice::BmFable(1.0),
"bm_george" => Voice::BmGeorge(1.0),
"bm_lewis" => Voice::BmLewis(1.0),
_ => {
eprintln!("[tts] unknown voice '{name}', falling back to af_bella");
Voice::AfBella(1.0)
}
}
}

View File

@@ -290,8 +290,9 @@ async fn browser_to_mixer_loop(
.send(RtpPacket {
payload: payload.to_vec(),
payload_type: PT_OPUS,
marker: false,
timestamp: 0,
marker: rtp_packet.header.marker,
seq: rtp_packet.header.sequence_number,
timestamp: rtp_packet.header.timestamp,
})
.await;
}

View File

@@ -197,10 +197,11 @@ pub fn compute_digest_auth(
use crate::Endpoint;
/// Parse the audio media port and connection address from an SDP body.
/// Parse the audio media port, connection address, and preferred codec from an SDP body.
pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
let mut addr: Option<&str> = None;
let mut port: Option<u16> = None;
let mut codec_pt: Option<u8> = None;
let normalized = sdp.replace("\r\n", "\n");
for raw in normalized.split('\n') {
@@ -208,10 +209,16 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
if let Some(rest) = line.strip_prefix("c=IN IP4 ") {
addr = Some(rest.trim());
} else if let Some(rest) = line.strip_prefix("m=audio ") {
// m=audio <port> RTP/AVP <pt1> [<pt2> ...]
let parts: Vec<&str> = rest.split_whitespace().collect();
if !parts.is_empty() {
port = parts[0].parse().ok();
}
// parts[1] is "RTP/AVP" or similar, parts[2..] are payload types.
// The first PT is the preferred codec.
if parts.len() > 2 {
codec_pt = parts[2].parse::<u8>().ok();
}
}
}
@@ -219,6 +226,7 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
(Some(a), Some(p)) => Some(Endpoint {
address: a.to_string(),
port: p,
codec_pt,
}),
_ => None,
}

View File

@@ -9,9 +9,11 @@ pub mod dialog;
pub mod helpers;
pub mod rewrite;
/// Network endpoint (address + port).
/// Network endpoint (address + port + optional negotiated codec).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Endpoint {
pub address: String,
pub port: u16,
/// First payload type from the SDP `m=audio` line (the preferred codec).
pub codec_pt: Option<u8>,
}

View File

@@ -92,7 +92,7 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option<Endpoint>
.collect();
let original = match (orig_addr, orig_port) {
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p }),
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }),
_ => None,
};

View File

@@ -1,18 +0,0 @@
[package]
name = "tts-engine"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "tts-engine"
path = "src/main.rs"
[dependencies]
kokoro-tts = { version = "0.3", default-features = false }
# Pin to rc.11 matching kokoro-tts's expectation; enable vendored TLS to avoid system libssl-dev.
ort = { version = "=2.0.0-rc.11", default-features = false, features = [
"std", "download-binaries", "copy-dylibs", "ndarray",
"tls-native-vendored"
] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
hound = "3.5"

View File

@@ -1,149 +0,0 @@
/// TTS engine CLI — synthesizes text to a WAV file using Kokoro neural TTS.
///
/// Usage:
/// echo "Hello world" | tts-engine --model kokoro-v1.0.onnx --voices voices.bin --output out.wav
/// tts-engine --model kokoro-v1.0.onnx --voices voices.bin --output out.wav --text "Hello world"
///
/// Outputs 24kHz 16-bit mono WAV.
use kokoro_tts::{KokoroTts, Voice};
use std::io::{self, Read};
fn parse_args() -> Result<(String, String, String, String, Option<String>), String> {
let args: Vec<String> = std::env::args().collect();
let mut model = String::new();
let mut voices = String::new();
let mut output = String::new();
let mut text: Option<String> = None;
let mut voice_name: Option<String> = None;
let mut i = 1;
while i < args.len() {
match args[i].as_str() {
"--model" => { i += 1; model = args.get(i).cloned().unwrap_or_default(); }
"--voices" => { i += 1; voices = args.get(i).cloned().unwrap_or_default(); }
"--output" | "--output_file" => { i += 1; output = args.get(i).cloned().unwrap_or_default(); }
"--text" => { i += 1; text = args.get(i).cloned(); }
"--voice" => { i += 1; voice_name = args.get(i).cloned(); }
_ => {}
}
i += 1;
}
if model.is_empty() { return Err("--model required".into()); }
if voices.is_empty() { return Err("--voices required".into()); }
if output.is_empty() { return Err("--output required".into()); }
let voice_str = voice_name.unwrap_or_else(|| "af_bella".into());
Ok((model, voices, output, voice_str, text))
}
fn select_voice(name: &str) -> Voice {
match name {
"af_bella" => Voice::AfBella(1.0),
"af_heart" => Voice::AfHeart(1.0),
"af_jessica" => Voice::AfJessica(1.0),
"af_nicole" => Voice::AfNicole(1.0),
"af_nova" => Voice::AfNova(1.0),
"af_sarah" => Voice::AfSarah(1.0),
"af_sky" => Voice::AfSky(1.0),
"af_river" => Voice::AfRiver(1.0),
"af_alloy" => Voice::AfAlloy(1.0),
"af_aoede" => Voice::AfAoede(1.0),
"af_kore" => Voice::AfKore(1.0),
"am_adam" => Voice::AmAdam(1.0),
"am_echo" => Voice::AmEcho(1.0),
"am_eric" => Voice::AmEric(1.0),
"am_fenrir" => Voice::AmFenrir(1.0),
"am_liam" => Voice::AmLiam(1.0),
"am_michael" => Voice::AmMichael(1.0),
"am_onyx" => Voice::AmOnyx(1.0),
"am_puck" => Voice::AmPuck(1.0),
"bf_alice" => Voice::BfAlice(1.0),
"bf_emma" => Voice::BfEmma(1.0),
"bf_isabella" => Voice::BfIsabella(1.0),
"bf_lily" => Voice::BfLily(1.0),
"bm_daniel" => Voice::BmDaniel(1.0),
"bm_fable" => Voice::BmFable(1.0),
"bm_george" => Voice::BmGeorge(1.0),
"bm_lewis" => Voice::BmLewis(1.0),
_ => {
eprintln!("[tts-engine] unknown voice '{}', falling back to af_bella", name);
Voice::AfBella(1.0)
}
}
}
#[tokio::main]
async fn main() {
let (model_path, voices_path, output_path, voice_name, text_arg) = match parse_args() {
Ok(v) => v,
Err(e) => {
eprintln!("Error: {}", e);
eprintln!("Usage: tts-engine --model <model.onnx> --voices <voices.bin> --output <output.wav> [--text <text>] [--voice <voice_name>]");
std::process::exit(1);
}
};
// Get text from --text arg or stdin.
let text = match text_arg {
Some(t) => t,
None => {
let mut buf = String::new();
io::stdin().read_to_string(&mut buf).expect("failed to read stdin");
buf.trim().to_string()
}
};
if text.is_empty() {
eprintln!("[tts-engine] no text provided");
std::process::exit(1);
}
eprintln!("[tts-engine] loading model: {}", model_path);
let tts = match KokoroTts::new(&model_path, &voices_path).await {
Ok(t) => t,
Err(e) => {
eprintln!("[tts-engine] failed to load model: {:?}", e);
std::process::exit(1);
}
};
let voice = select_voice(&voice_name);
eprintln!("[tts-engine] synthesizing with voice '{}': \"{}\"", voice_name, text);
let (samples, duration) = match tts.synth(&text, voice).await {
Ok(r) => r,
Err(e) => {
eprintln!("[tts-engine] synthesis failed: {:?}", e);
std::process::exit(1);
}
};
eprintln!("[tts-engine] synthesized {} samples in {:?}", samples.len(), duration);
// Write WAV: 24kHz, 16-bit, mono (same format announcement.ts expects).
let spec = hound::WavSpec {
channels: 1,
sample_rate: 24000,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};
let mut writer = match hound::WavWriter::create(&output_path, spec) {
Ok(w) => w,
Err(e) => {
eprintln!("[tts-engine] failed to create WAV: {}", e);
std::process::exit(1);
}
};
for &sample in &samples {
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
writer.write_sample(s16).unwrap();
}
writer.finalize().unwrap();
eprintln!("[tts-engine] wrote {}", output_path);
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.15.0',
version: '1.17.1',
description: 'undefined'
}

View File

@@ -1,59 +1,22 @@
/**
* TTS announcement module — pre-generates audio announcements using espeak-ng
* and caches them as encoded RTP packets for playback during call setup.
* TTS announcement module — generates announcement WAV files at startup.
*
* On startup, generates the announcement WAV via espeak-ng (formant-based TTS
* with highly accurate pronunciation), encodes each 20ms frame to G.722 (for
* SIP) and Opus (for WebRTC) via the Rust transcoder, and caches the packets.
* Engine priority: espeak-ng (formant TTS, fast) → Kokoro neural TTS via
* proxy-engine → disabled.
*
* Falls back to the Rust tts-engine (Kokoro neural TTS) if espeak-ng is not
* installed, and disables announcements if neither is available.
* The generated WAV is left on disk for Rust's audio_player / start_interaction
* to play during calls. No encoding or RTP playback happens in TypeScript.
*/
import { execSync } from 'node:child_process';
import fs from 'node:fs';
import path from 'node:path';
import { Buffer } from 'node:buffer';
import { encodePcm, isCodecReady } from './opusbridge.ts';
/** RTP clock increment per 20ms frame for each codec. */
function rtpClockIncrement(pt: number): number {
if (pt === 111) return 960;
if (pt === 9) return 160;
return 160;
}
/** Build a fresh RTP header. */
function buildRtpHeader(pt: number, seq: number, ts: number, ssrc: number, marker: boolean): Buffer {
const hdr = Buffer.alloc(12);
hdr[0] = 0x80;
hdr[1] = (marker ? 0x80 : 0) | (pt & 0x7f);
hdr.writeUInt16BE(seq & 0xffff, 2);
hdr.writeUInt32BE(ts >>> 0, 4);
hdr.writeUInt32BE(ssrc >>> 0, 8);
return hdr;
}
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** A pre-encoded announcement ready for RTP playback. */
export interface IAnnouncementCache {
/** G.722 encoded frames (each is a 20ms frame payload, no RTP header). */
g722Frames: Buffer[];
/** Opus encoded frames for WebRTC playback. */
opusFrames: Buffer[];
/** Total duration in milliseconds. */
durationMs: number;
}
import { sendProxyCommand, isProxyReady } from './proxybridge.ts';
// ---------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------
let cachedAnnouncement: IAnnouncementCache | null = null;
const TTS_DIR = path.join(process.cwd(), '.nogit', 'tts');
const ANNOUNCEMENT_TEXT = "Hello. I'm connecting your call now.";
const CACHE_WAV = path.join(TTS_DIR, 'announcement.wav');
@@ -64,12 +27,10 @@ const KOKORO_VOICES = 'voices.bin';
const KOKORO_VOICE = 'af_bella';
// ---------------------------------------------------------------------------
// Initialization
// TTS generators
// ---------------------------------------------------------------------------
/**
* Check if espeak-ng is available on the system.
*/
/** Check if espeak-ng is available on the system. */
function isEspeakAvailable(): boolean {
try {
execSync('which espeak-ng', { stdio: 'pipe' });
@@ -79,10 +40,7 @@ function isEspeakAvailable(): boolean {
}
}
/**
* Generate announcement WAV via espeak-ng (primary engine).
* Returns true on success.
*/
/** Generate announcement WAV via espeak-ng (primary engine). */
function generateViaEspeak(wavPath: string, text: string, log: (msg: string) => void): boolean {
log('[tts] generating announcement audio via espeak-ng...');
try {
@@ -98,11 +56,8 @@ function generateViaEspeak(wavPath: string, text: string, log: (msg: string) =>
}
}
/**
* Generate announcement WAV via Kokoro TTS (fallback engine).
* Returns true on success.
*/
function generateViaKokoro(wavPath: string, text: string, log: (msg: string) => void): boolean {
/** Generate announcement WAV via Kokoro TTS (fallback, runs inside proxy-engine). */
async function generateViaKokoro(wavPath: string, text: string, log: (msg: string) => void): Promise<boolean> {
const modelPath = path.join(TTS_DIR, KOKORO_MODEL);
const voicesPath = path.join(TTS_DIR, KOKORO_VOICES);
@@ -111,25 +66,21 @@ function generateViaKokoro(wavPath: string, text: string, log: (msg: string) =>
return false;
}
const root = process.cwd();
const ttsBinPaths = [
path.join(root, 'dist_rust', 'tts-engine'),
path.join(root, 'rust', 'target', 'release', 'tts-engine'),
path.join(root, 'rust', 'target', 'debug', 'tts-engine'),
];
const ttsBin = ttsBinPaths.find((p) => fs.existsSync(p));
if (!ttsBin) {
log('[tts] tts-engine binary not found — Kokoro fallback unavailable');
if (!isProxyReady()) {
log('[tts] proxy-engine not ready — Kokoro fallback unavailable');
return false;
}
log('[tts] generating announcement audio via Kokoro TTS (fallback)...');
try {
execSync(
`"${ttsBin}" --model "${modelPath}" --voices "${voicesPath}" --voice "${KOKORO_VOICE}" --output "${wavPath}" --text "${text}"`,
{ timeout: 120000, stdio: 'pipe' },
);
log('[tts] Kokoro WAV generated');
await sendProxyCommand('generate_tts', {
model: modelPath,
voices: voicesPath,
voice: KOKORO_VOICE,
text,
output: wavPath,
});
log('[tts] Kokoro WAV generated (via proxy-engine)');
return true;
} catch (e: any) {
log(`[tts] Kokoro failed: ${e.message}`);
@@ -137,40 +88,13 @@ function generateViaKokoro(wavPath: string, text: string, log: (msg: string) =>
}
}
/**
* Read a WAV file and detect its sample rate from the fmt chunk.
* Returns { pcm, sampleRate } or null on failure.
*/
function readWavWithRate(wavPath: string): { pcm: Buffer; sampleRate: number } | null {
const wav = fs.readFileSync(wavPath);
if (wav.length < 44) return null;
if (wav.toString('ascii', 0, 4) !== 'RIFF') return null;
if (wav.toString('ascii', 8, 12) !== 'WAVE') return null;
let sampleRate = 22050; // default
let offset = 12;
let pcm: Buffer | null = null;
while (offset < wav.length - 8) {
const chunkId = wav.toString('ascii', offset, offset + 4);
const chunkSize = wav.readUInt32LE(offset + 4);
if (chunkId === 'fmt ') {
sampleRate = wav.readUInt32LE(offset + 12);
}
if (chunkId === 'data') {
pcm = wav.subarray(offset + 8, offset + 8 + chunkSize);
}
offset += 8 + chunkSize;
if (offset % 2 !== 0) offset++;
}
if (!pcm) return null;
return { pcm, sampleRate };
}
// ---------------------------------------------------------------------------
// Initialization
// ---------------------------------------------------------------------------
/**
* Pre-generate the announcement audio and encode to G.722 + Opus frames.
* Must be called after the codec bridge is initialized.
* Pre-generate the announcement WAV file.
* Must be called after the proxy engine is initialized.
*
* Engine priority: espeak-ng → Kokoro → disabled.
*/
@@ -178,7 +102,6 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
fs.mkdirSync(TTS_DIR, { recursive: true });
try {
// Generate WAV if not cached.
if (!fs.existsSync(CACHE_WAV)) {
let generated = false;
@@ -189,9 +112,9 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
log('[tts] espeak-ng not installed — trying Kokoro fallback');
}
// Fall back to Kokoro.
// Fall back to Kokoro (via proxy-engine).
if (!generated) {
generated = generateViaKokoro(CACHE_WAV, ANNOUNCEMENT_TEXT, log);
generated = await generateViaKokoro(CACHE_WAV, ANNOUNCEMENT_TEXT, log);
}
if (!generated) {
@@ -200,49 +123,7 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
}
}
// Read WAV and extract raw PCM + sample rate.
const result = readWavWithRate(CACHE_WAV);
if (!result) {
log('[tts] failed to parse WAV file');
return false;
}
const { pcm, sampleRate } = result;
// Wait for codec bridge to be ready.
if (!isCodecReady()) {
log('[tts] codec bridge not ready — will retry');
return false;
}
// Encode in 20ms chunks. The Rust encoder resamples to each codec's native rate.
const FRAME_SAMPLES = Math.floor(sampleRate * 0.02);
const FRAME_BYTES = FRAME_SAMPLES * 2; // 16-bit = 2 bytes per sample
const totalFrames = Math.floor(pcm.length / FRAME_BYTES);
const g722Frames: Buffer[] = [];
const opusFrames: Buffer[] = [];
log(`[tts] encoding ${totalFrames} frames (${FRAME_SAMPLES} samples/frame @ ${sampleRate}Hz)...`);
for (let i = 0; i < totalFrames; i++) {
const framePcm = pcm.subarray(i * FRAME_BYTES, (i + 1) * FRAME_BYTES);
const pcmBuf = Buffer.from(framePcm);
const [g722, opus] = await Promise.all([
encodePcm(pcmBuf, sampleRate, 9), // G.722 for SIP devices
encodePcm(pcmBuf, sampleRate, 111), // Opus for WebRTC browsers
]);
if (g722) g722Frames.push(g722);
if (opus) opusFrames.push(opus);
if (!g722 && !opus && i < 3) log(`[tts] frame ${i} encode failed`);
}
cachedAnnouncement = {
g722Frames,
opusFrames,
durationMs: totalFrames * 20,
};
log(`[tts] announcement cached: ${g722Frames.length} frames (${(totalFrames * 20 / 1000).toFixed(1)}s)`);
log('[tts] announcement WAV ready');
return true;
} catch (e: any) {
log(`[tts] init error: ${e.message}`);
@@ -250,100 +131,7 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
}
}
// ---------------------------------------------------------------------------
// Playback
// ---------------------------------------------------------------------------
/**
* Play the pre-cached announcement to an RTP endpoint.
*
* @param sendPacket - function to send a raw RTP packet
* @param ssrc - SSRC to use in RTP headers
* @param onDone - called when the announcement finishes
* @returns a cancel function, or null if no announcement is cached
*/
export function playAnnouncement(
sendPacket: (pkt: Buffer) => void,
ssrc: number,
onDone?: () => void,
): (() => void) | null {
if (!cachedAnnouncement || cachedAnnouncement.g722Frames.length === 0) {
onDone?.();
return null;
}
const frames = cachedAnnouncement.g722Frames;
const PT = 9; // G.722
let frameIdx = 0;
let seq = Math.floor(Math.random() * 0xffff);
let rtpTs = Math.floor(Math.random() * 0xffffffff);
const timer = setInterval(() => {
if (frameIdx >= frames.length) {
clearInterval(timer);
onDone?.();
return;
}
const payload = frames[frameIdx];
const hdr = buildRtpHeader(PT, seq & 0xffff, rtpTs >>> 0, ssrc >>> 0, frameIdx === 0);
const pkt = Buffer.concat([hdr, payload]);
sendPacket(pkt);
seq++;
rtpTs += rtpClockIncrement(PT);
frameIdx++;
}, 20);
// Return cancel function.
return () => clearInterval(timer);
/** Get the path to the cached announcement WAV, or null if not generated. */
export function getAnnouncementWavPath(): string | null {
return fs.existsSync(CACHE_WAV) ? CACHE_WAV : null;
}
/**
* Play pre-cached Opus announcement to a WebRTC PeerConnection sender.
*
* @param sendRtpPacket - function to send a raw RTP packet via sender.sendRtp()
* @param ssrc - SSRC to use in RTP headers
* @param onDone - called when announcement finishes
* @returns cancel function, or null if no announcement cached
*/
export function playAnnouncementToWebRtc(
sendRtpPacket: (pkt: Buffer) => void,
ssrc: number,
counters: { seq: number; ts: number },
onDone?: () => void,
): (() => void) | null {
if (!cachedAnnouncement || cachedAnnouncement.opusFrames.length === 0) {
onDone?.();
return null;
}
const frames = cachedAnnouncement.opusFrames;
const PT = 111; // Opus
let frameIdx = 0;
const timer = setInterval(() => {
if (frameIdx >= frames.length) {
clearInterval(timer);
onDone?.();
return;
}
const payload = frames[frameIdx];
const hdr = buildRtpHeader(PT, counters.seq & 0xffff, counters.ts >>> 0, ssrc >>> 0, frameIdx === 0);
const pkt = Buffer.concat([hdr, payload]);
sendRtpPacket(pkt);
counters.seq++;
counters.ts += 960; // Opus at 48kHz: 960 samples per 20ms
frameIdx++;
}, 20);
return () => clearInterval(timer);
}
/** Check if an announcement is cached and ready. */
export function isAnnouncementReady(): boolean {
return cachedAnnouncement !== null && cachedAnnouncement.g722Frames.length > 0;
}

View File

@@ -1,55 +1,31 @@
/**
* PromptCache — manages multiple named audio prompts for IVR and voicemail.
* PromptCache — manages named audio prompt WAV files for IVR and voicemail.
*
* Each prompt is pre-encoded as both G.722 frames (for SIP legs) and Opus
* frames (for WebRTC legs), ready for 20ms RTP playback.
* Generates WAV files via espeak-ng (primary) or Kokoro TTS through the
* proxy-engine (fallback). Also supports loading pre-existing WAV files
* and programmatic tone generation.
*
* Supports three sources:
* 1. TTS generation via espeak-ng (primary) or Kokoro (fallback)
* 2. Loading from a pre-existing WAV file
* 3. Programmatic tone generation (beep, etc.)
*
* The existing announcement.ts system continues to work independently;
* this module provides generalized prompt management for IVR/voicemail.
* All audio playback happens in Rust (audio_player / start_interaction).
* This module only manages WAV files on disk.
*/
import { execSync } from 'node:child_process';
import fs from 'node:fs';
import path from 'node:path';
import { Buffer } from 'node:buffer';
import { encodePcm, isCodecReady } from '../opusbridge.ts';
/** RTP clock increment per 20ms frame for each codec. */
function rtpClockIncrement(pt: number): number {
if (pt === 111) return 960;
if (pt === 9) return 160;
return 160;
}
/** Build a fresh RTP header. */
function buildRtpHeader(pt: number, seq: number, ts: number, ssrc: number, marker: boolean): Buffer {
const hdr = Buffer.alloc(12);
hdr[0] = 0x80;
hdr[1] = (marker ? 0x80 : 0) | (pt & 0x7f);
hdr.writeUInt16BE(seq & 0xffff, 2);
hdr.writeUInt32BE(ts >>> 0, 4);
hdr.writeUInt32BE(ssrc >>> 0, 8);
return hdr;
}
import { sendProxyCommand, isProxyReady } from '../proxybridge.ts';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** A pre-encoded prompt ready for RTP playback. */
/** A cached prompt — just a WAV file path and metadata. */
export interface ICachedPrompt {
/** Unique prompt identifier. */
id: string;
/** G.722 encoded frames (20ms each, no RTP header). */
g722Frames: Buffer[];
/** Opus encoded frames (20ms each, no RTP header). */
opusFrames: Buffer[];
/** Total duration in milliseconds. */
/** Path to the WAV file on disk. */
wavPath: string;
/** Total duration in milliseconds (approximate, from WAV header). */
durationMs: number;
}
@@ -82,84 +58,61 @@ function generateViaEspeak(wavPath: string, text: string): boolean {
}
}
/** Generate WAV via Kokoro TTS. */
function generateViaKokoro(wavPath: string, text: string, voice: string): boolean {
/** Generate WAV via Kokoro TTS (runs inside proxy-engine). */
async function generateViaKokoro(wavPath: string, text: string, voice: string): Promise<boolean> {
const modelPath = path.join(TTS_DIR, 'kokoro-v1.0.onnx');
const voicesPath = path.join(TTS_DIR, 'voices.bin');
if (!fs.existsSync(modelPath) || !fs.existsSync(voicesPath)) return false;
const root = process.cwd();
const ttsBin = [
path.join(root, 'dist_rust', 'tts-engine'),
path.join(root, 'rust', 'target', 'release', 'tts-engine'),
path.join(root, 'rust', 'target', 'debug', 'tts-engine'),
].find((p) => fs.existsSync(p));
if (!ttsBin) return false;
if (!isProxyReady()) return false;
try {
execSync(
`"${ttsBin}" --model "${modelPath}" --voices "${voicesPath}" --voice "${voice}" --output "${wavPath}" --text "${text}"`,
{ timeout: 120000, stdio: 'pipe' },
);
await sendProxyCommand('generate_tts', {
model: modelPath,
voices: voicesPath,
voice,
text,
output: wavPath,
});
return true;
} catch {
return false;
}
}
/** Read a WAV file and return raw PCM + sample rate. */
function readWavWithRate(wavPath: string): { pcm: Buffer; sampleRate: number } | null {
const wav = fs.readFileSync(wavPath);
if (wav.length < 44) return null;
if (wav.toString('ascii', 0, 4) !== 'RIFF') return null;
if (wav.toString('ascii', 8, 12) !== 'WAVE') return null;
/** Read a WAV file's duration from its header. */
function getWavDurationMs(wavPath: string): number {
try {
const wav = fs.readFileSync(wavPath);
if (wav.length < 44) return 0;
if (wav.toString('ascii', 0, 4) !== 'RIFF') return 0;
let sampleRate = 22050;
let pcm: Buffer | null = null;
let offset = 12;
let sampleRate = 16000;
let dataSize = 0;
let bitsPerSample = 16;
let channels = 1;
let offset = 12;
while (offset < wav.length - 8) {
const chunkId = wav.toString('ascii', offset, offset + 4);
const chunkSize = wav.readUInt32LE(offset + 4);
if (chunkId === 'fmt ') {
sampleRate = wav.readUInt32LE(offset + 12);
while (offset < wav.length - 8) {
const chunkId = wav.toString('ascii', offset, offset + 4);
const chunkSize = wav.readUInt32LE(offset + 4);
if (chunkId === 'fmt ') {
channels = wav.readUInt16LE(offset + 10);
sampleRate = wav.readUInt32LE(offset + 12);
bitsPerSample = wav.readUInt16LE(offset + 22);
}
if (chunkId === 'data') {
dataSize = chunkSize;
}
offset += 8 + chunkSize;
if (offset % 2 !== 0) offset++;
}
if (chunkId === 'data') {
pcm = wav.subarray(offset + 8, offset + 8 + chunkSize);
}
offset += 8 + chunkSize;
if (offset % 2 !== 0) offset++;
const bytesPerSample = (bitsPerSample / 8) * channels;
const totalSamples = bytesPerSample > 0 ? dataSize / bytesPerSample : 0;
return sampleRate > 0 ? Math.round((totalSamples / sampleRate) * 1000) : 0;
} catch {
return 0;
}
return pcm ? { pcm, sampleRate } : null;
}
/** Encode raw PCM frames to G.722 + Opus. */
async function encodePcmFrames(
pcm: Buffer,
sampleRate: number,
log: (msg: string) => void,
): Promise<{ g722Frames: Buffer[]; opusFrames: Buffer[] } | null> {
if (!isCodecReady()) return null;
const frameSamples = Math.floor(sampleRate * 0.02); // 20ms
const frameBytes = frameSamples * 2; // 16-bit
const totalFrames = Math.floor(pcm.length / frameBytes);
const g722Frames: Buffer[] = [];
const opusFrames: Buffer[] = [];
for (let i = 0; i < totalFrames; i++) {
const framePcm = Buffer.from(pcm.subarray(i * frameBytes, (i + 1) * frameBytes));
const [g722, opus] = await Promise.all([
encodePcm(framePcm, sampleRate, 9), // G.722
encodePcm(framePcm, sampleRate, 111), // Opus
]);
if (g722) g722Frames.push(g722);
if (opus) opusFrames.push(opus);
}
return { g722Frames, opusFrames };
}
// ---------------------------------------------------------------------------
@@ -195,7 +148,7 @@ export class PromptCache {
}
/**
* Generate a TTS prompt and cache it.
* Generate a TTS prompt WAV and cache its path.
* Uses espeak-ng (primary) or Kokoro (fallback).
*/
async generatePrompt(id: string, text: string, voice = 'af_bella'): Promise<ICachedPrompt | null> {
@@ -207,14 +160,14 @@ export class PromptCache {
this.espeakAvailable = isEspeakAvailable();
}
// Generate WAV.
let generated = false;
// Generate WAV if not already on disk.
if (!fs.existsSync(wavPath)) {
let generated = false;
if (this.espeakAvailable) {
generated = generateViaEspeak(wavPath, text);
}
if (!generated) {
generated = generateViaKokoro(wavPath, text, voice);
generated = await generateViaKokoro(wavPath, text, voice);
}
if (!generated) {
this.log(`[prompt-cache] failed to generate TTS for "${id}"`);
@@ -223,49 +176,22 @@ export class PromptCache {
this.log(`[prompt-cache] generated WAV for "${id}"`);
}
return this.loadWavPrompt(id, wavPath);
return this.registerWav(id, wavPath);
}
/**
* Load a WAV file as a prompt and cache it.
* Load a pre-existing WAV file as a prompt.
*/
async loadWavPrompt(id: string, wavPath: string): Promise<ICachedPrompt | null> {
if (!fs.existsSync(wavPath)) {
this.log(`[prompt-cache] WAV not found: ${wavPath}`);
return null;
}
const result = readWavWithRate(wavPath);
if (!result) {
this.log(`[prompt-cache] failed to parse WAV: ${wavPath}`);
return null;
}
const encoded = await encodePcmFrames(result.pcm, result.sampleRate, this.log);
if (!encoded) {
this.log(`[prompt-cache] encoding failed for "${id}" (codec bridge not ready?)`);
return null;
}
const durationMs = encoded.g722Frames.length * 20;
const prompt: ICachedPrompt = {
id,
g722Frames: encoded.g722Frames,
opusFrames: encoded.opusFrames,
durationMs,
};
this.prompts.set(id, prompt);
this.log(`[prompt-cache] cached "${id}": ${encoded.g722Frames.length} frames (${(durationMs / 1000).toFixed(1)}s)`);
return prompt;
return this.registerWav(id, wavPath);
}
/**
* Generate a beep tone prompt (sine wave).
* @param id - prompt ID
* @param freqHz - tone frequency (default 1000 Hz)
* @param durationMs - tone duration (default 500ms)
* @param amplitude - 16-bit amplitude (default 8000)
* Generate a beep tone WAV and cache it.
*/
async generateBeep(
id: string,
@@ -273,149 +199,77 @@ export class PromptCache {
durationMs = 500,
amplitude = 8000,
): Promise<ICachedPrompt | null> {
// Generate at 16kHz for decent quality.
const sampleRate = 16000;
const totalSamples = Math.floor((sampleRate * durationMs) / 1000);
const pcm = Buffer.alloc(totalSamples * 2);
fs.mkdirSync(TTS_DIR, { recursive: true });
const wavPath = path.join(TTS_DIR, `prompt-${id}.wav`);
for (let i = 0; i < totalSamples; i++) {
const t = i / sampleRate;
// Apply a short fade-in/fade-out to avoid click artifacts.
const fadeLen = Math.floor(sampleRate * 0.01); // 10ms fade
let envelope = 1.0;
if (i < fadeLen) envelope = i / fadeLen;
else if (i > totalSamples - fadeLen) envelope = (totalSamples - i) / fadeLen;
if (!fs.existsSync(wavPath)) {
// Generate 16kHz 16-bit mono sine wave WAV.
const sampleRate = 16000;
const totalSamples = Math.floor((sampleRate * durationMs) / 1000);
const pcm = Buffer.alloc(totalSamples * 2);
const sample = Math.round(Math.sin(2 * Math.PI * freqHz * t) * amplitude * envelope);
pcm.writeInt16LE(Math.max(-32768, Math.min(32767, sample)), i * 2);
for (let i = 0; i < totalSamples; i++) {
const t = i / sampleRate;
const fadeLen = Math.floor(sampleRate * 0.01); // 10ms fade
let envelope = 1.0;
if (i < fadeLen) envelope = i / fadeLen;
else if (i > totalSamples - fadeLen) envelope = (totalSamples - i) / fadeLen;
const sample = Math.round(Math.sin(2 * Math.PI * freqHz * t) * amplitude * envelope);
pcm.writeInt16LE(Math.max(-32768, Math.min(32767, sample)), i * 2);
}
// Write WAV file.
const headerSize = 44;
const dataSize = pcm.length;
const wav = Buffer.alloc(headerSize + dataSize);
// RIFF header
wav.write('RIFF', 0);
wav.writeUInt32LE(36 + dataSize, 4);
wav.write('WAVE', 8);
// fmt chunk
wav.write('fmt ', 12);
wav.writeUInt32LE(16, 16); // chunk size
wav.writeUInt16LE(1, 20); // PCM format
wav.writeUInt16LE(1, 22); // mono
wav.writeUInt32LE(sampleRate, 24);
wav.writeUInt32LE(sampleRate * 2, 28); // byte rate
wav.writeUInt16LE(2, 32); // block align
wav.writeUInt16LE(16, 34); // bits per sample
// data chunk
wav.write('data', 36);
wav.writeUInt32LE(dataSize, 40);
pcm.copy(wav, 44);
fs.writeFileSync(wavPath, wav);
this.log(`[prompt-cache] beep WAV generated for "${id}"`);
}
const encoded = await encodePcmFrames(pcm, sampleRate, this.log);
if (!encoded) {
this.log(`[prompt-cache] beep encoding failed for "${id}"`);
return null;
}
const actualDuration = encoded.g722Frames.length * 20;
const prompt: ICachedPrompt = {
id,
g722Frames: encoded.g722Frames,
opusFrames: encoded.opusFrames,
durationMs: actualDuration,
};
this.prompts.set(id, prompt);
this.log(`[prompt-cache] beep "${id}" cached: ${actualDuration}ms @ ${freqHz}Hz`);
return prompt;
return this.registerWav(id, wavPath);
}
/**
* Remove a prompt from the cache.
*/
/** Remove a prompt from the cache. */
remove(id: string): void {
this.prompts.delete(id);
}
/**
* Clear all cached prompts.
*/
/** Clear all cached prompts. */
clear(): void {
this.prompts.clear();
}
}
// ---------------------------------------------------------------------------
// Standalone playback helpers (for use by SystemLeg)
// ---------------------------------------------------------------------------
// -------------------------------------------------------------------------
// Internal
// -------------------------------------------------------------------------
/**
* Play a cached prompt's G.722 frames as RTP packets at 20ms intervals.
*
* @param prompt - the cached prompt to play
* @param sendPacket - function to send a raw RTP packet (12-byte header + payload)
* @param ssrc - SSRC for RTP headers
* @param onDone - called when playback finishes
* @returns cancel function, or null if prompt has no G.722 frames
*/
export function playPromptG722(
prompt: ICachedPrompt,
sendPacket: (pkt: Buffer) => void,
ssrc: number,
onDone?: () => void,
): (() => void) | null {
if (prompt.g722Frames.length === 0) {
onDone?.();
return null;
private registerWav(id: string, wavPath: string): ICachedPrompt {
const durationMs = getWavDurationMs(wavPath);
const prompt: ICachedPrompt = { id, wavPath, durationMs };
this.prompts.set(id, prompt);
this.log(`[prompt-cache] cached "${id}": ${wavPath} (${(durationMs / 1000).toFixed(1)}s)`);
return prompt;
}
const frames = prompt.g722Frames;
const PT = 9;
let frameIdx = 0;
let seq = Math.floor(Math.random() * 0xffff);
let rtpTs = Math.floor(Math.random() * 0xffffffff);
const timer = setInterval(() => {
if (frameIdx >= frames.length) {
clearInterval(timer);
onDone?.();
return;
}
const payload = frames[frameIdx];
const hdr = buildRtpHeader(PT, seq & 0xffff, rtpTs >>> 0, ssrc >>> 0, frameIdx === 0);
const pkt = Buffer.concat([hdr, payload]);
sendPacket(pkt);
seq++;
rtpTs += rtpClockIncrement(PT);
frameIdx++;
}, 20);
return () => clearInterval(timer);
}
/**
* Play a cached prompt's Opus frames as RTP packets at 20ms intervals.
*
* @param prompt - the cached prompt to play
* @param sendPacket - function to send a raw RTP packet
* @param ssrc - SSRC for RTP headers
* @param counters - shared seq/ts counters (mutated in place for seamless transitions)
* @param onDone - called when playback finishes
* @returns cancel function, or null if prompt has no Opus frames
*/
export function playPromptOpus(
prompt: ICachedPrompt,
sendPacket: (pkt: Buffer) => void,
ssrc: number,
counters: { seq: number; ts: number },
onDone?: () => void,
): (() => void) | null {
if (prompt.opusFrames.length === 0) {
onDone?.();
return null;
}
const frames = prompt.opusFrames;
const PT = 111;
let frameIdx = 0;
const timer = setInterval(() => {
if (frameIdx >= frames.length) {
clearInterval(timer);
onDone?.();
return;
}
const payload = frames[frameIdx];
const hdr = buildRtpHeader(PT, counters.seq & 0xffff, counters.ts >>> 0, ssrc >>> 0, frameIdx === 0);
const pkt = Buffer.concat([hdr, payload]);
sendPacket(pkt);
counters.seq++;
counters.ts += 960; // Opus 48kHz: 960 samples per 20ms
frameIdx++;
}, 20);
return () => clearInterval(timer);
}

View File

@@ -1,199 +0,0 @@
/**
* Audio transcoding bridge — uses smartrust to communicate with the Rust
* opus-codec binary, which handles Opus ↔ G.722 ↔ PCMU/PCMA transcoding.
*
* All codec conversion happens in Rust (libopus + SpanDSP G.722 port).
* The TypeScript side just passes raw payloads back and forth.
*/
import path from 'node:path';
import { RustBridge } from '@push.rocks/smartrust';
// ---------------------------------------------------------------------------
// Command type map for smartrust
// ---------------------------------------------------------------------------
type TCodecCommands = {
init: {
params: Record<string, never>;
result: Record<string, never>;
};
create_session: {
params: { session_id: string };
result: Record<string, never>;
};
destroy_session: {
params: { session_id: string };
result: Record<string, never>;
};
transcode: {
params: { data_b64: string; from_pt: number; to_pt: number; session_id?: string; direction?: string };
result: { data_b64: string };
};
encode_pcm: {
params: { data_b64: string; sample_rate: number; to_pt: number; session_id?: string };
result: { data_b64: string };
};
};
// ---------------------------------------------------------------------------
// Bridge singleton
// ---------------------------------------------------------------------------
let bridge: RustBridge<TCodecCommands> | null = null;
let initialized = false;
function buildLocalPaths(): string[] {
const root = process.cwd();
return [
path.join(root, 'dist_rust', 'opus-codec'),
path.join(root, 'rust', 'target', 'release', 'opus-codec'),
path.join(root, 'rust', 'target', 'debug', 'opus-codec'),
];
}
let logFn: ((msg: string) => void) | undefined;
/**
* Initialize the audio transcoding bridge. Spawns the Rust binary.
*/
export async function initCodecBridge(log?: (msg: string) => void): Promise<boolean> {
if (initialized && bridge) return true;
logFn = log;
try {
bridge = new RustBridge<TCodecCommands>({
binaryName: 'opus-codec',
localPaths: buildLocalPaths(),
});
const spawned = await bridge.spawn();
if (!spawned) {
log?.('[codec] failed to spawn opus-codec binary');
bridge = null;
return false;
}
// Auto-restart: reset state when the Rust process exits so the next
// transcode attempt triggers re-initialization instead of silent failure.
bridge.on('exit', () => {
logFn?.('[codec] Rust audio transcoder process exited — will re-init on next use');
bridge = null;
initialized = false;
});
await bridge.sendCommand('init', {} as any);
initialized = true;
log?.('[codec] Rust audio transcoder initialized (Opus + G.722 + PCMU/PCMA)');
return true;
} catch (e: any) {
log?.(`[codec] init error: ${e.message}`);
bridge = null;
return false;
}
}
// ---------------------------------------------------------------------------
// Session management — per-call codec isolation
// ---------------------------------------------------------------------------
/**
* Create an isolated codec session. Each session gets its own Opus/G.722
* encoder/decoder state, preventing concurrent calls from corrupting each
* other's stateful codec predictions.
*/
export async function createSession(sessionId: string): Promise<boolean> {
if (!bridge || !initialized) {
// Attempt auto-reinit if bridge died.
const ok = await initCodecBridge(logFn);
if (!ok) return false;
}
try {
await bridge!.sendCommand('create_session', { session_id: sessionId });
return true;
} catch (e: any) {
logFn?.(`[codec] create_session error: ${e?.message || e}`);
return false;
}
}
/**
* Destroy a codec session, freeing its encoder/decoder state.
*/
export async function destroySession(sessionId: string): Promise<void> {
if (!bridge || !initialized) return;
try {
await bridge.sendCommand('destroy_session', { session_id: sessionId });
} catch {
// Best-effort cleanup.
}
}
// ---------------------------------------------------------------------------
// Transcoding
// ---------------------------------------------------------------------------
/**
* Transcode an RTP payload between two codecs.
* All codec work (Opus, G.722, PCMU, PCMA) + resampling happens in Rust.
*
* @param data - raw RTP payload (no header)
* @param fromPT - source payload type (0=PCMU, 8=PCMA, 9=G.722, 111=Opus)
* @param toPT - target payload type
* @param sessionId - optional session for isolated codec state
* @returns transcoded payload, or null on failure
*/
export async function transcode(data: Buffer, fromPT: number, toPT: number, sessionId?: string, direction?: string): Promise<Buffer | null> {
if (!bridge || !initialized) return null;
try {
const params: any = {
data_b64: data.toString('base64'),
from_pt: fromPT,
to_pt: toPT,
};
if (sessionId) params.session_id = sessionId;
if (direction) params.direction = direction;
const result = await bridge.sendCommand('transcode', params);
return Buffer.from(result.data_b64, 'base64');
} catch {
return null;
}
}
/**
* Encode raw 16-bit PCM to a target codec.
* @param pcmData - raw 16-bit LE PCM bytes
* @param sampleRate - input sample rate (e.g. 22050 for Piper TTS)
* @param toPT - target payload type (9=G.722, 111=Opus, 0=PCMU, 8=PCMA)
* @param sessionId - optional session for isolated codec state
*/
export async function encodePcm(pcmData: Buffer, sampleRate: number, toPT: number, sessionId?: string): Promise<Buffer | null> {
if (!bridge || !initialized) return null;
try {
const params: any = {
data_b64: pcmData.toString('base64'),
sample_rate: sampleRate,
to_pt: toPT,
};
if (sessionId) params.session_id = sessionId;
const result = await bridge.sendCommand('encode_pcm', params);
return Buffer.from(result.data_b64, 'base64');
} catch (e: any) {
console.error('[encodePcm] error:', e?.message || e);
return null;
}
}
/** Check if the codec bridge is ready. */
export function isCodecReady(): boolean {
return initialized && bridge !== null;
}
/** Shut down the codec bridge. */
export function shutdownCodecBridge(): void {
if (bridge) {
try { bridge.kill(); } catch { /* ignore */ }
bridge = null;
initialized = false;
}
}

View File

@@ -79,6 +79,10 @@ type TProxyCommands = {
params: { call_id: string; leg_id: string; key: string; value: unknown };
result: Record<string, never>;
};
generate_tts: {
params: { model: string; voices: string; voice: string; text: string; output: string };
result: { output: string };
};
};
// ---------------------------------------------------------------------------
@@ -493,6 +497,15 @@ export function isProxyReady(): boolean {
return initialized && bridge !== null;
}
/** Send an arbitrary command to the proxy engine bridge. */
export async function sendProxyCommand<K extends keyof TProxyCommands>(
method: K,
params: TProxyCommands[K]['params'],
): Promise<TProxyCommands[K]['result']> {
if (!bridge || !initialized) throw new Error('proxy engine not initialized');
return bridge.sendCommand(method as string, params as any) as any;
}
/** Shut down the proxy engine. */
export function shutdownProxyEngine(): void {
if (bridge) {

View File

@@ -24,7 +24,6 @@ import {
getAllBrowserDeviceIds,
getBrowserDeviceWs,
} from './webrtcbridge.ts';
import { initCodecBridge } from './opusbridge.ts';
import { initAnnouncement } from './announcement.ts';
import { PromptCache } from './call/prompt-cache.ts';
import { VoiceboxManager } from './voicebox.ts';
@@ -426,9 +425,9 @@ async function startProxyEngine(): Promise<void> {
id: data.leg_id,
type: data.kind,
state: data.state,
codec: null,
rtpPort: null,
remoteMedia: null,
codec: data.codec ?? null,
rtpPort: data.rtpPort ?? null,
remoteMedia: data.remoteMedia ?? null,
metadata: data.metadata || {},
});
}
@@ -523,9 +522,8 @@ async function startProxyEngine(): Promise<void> {
const deviceList = appConfig.devices.map((d) => d.displayName).join(', ');
log(`proxy engine started | LAN ${appConfig.proxy.lanIp}:${appConfig.proxy.lanPort} | providers: ${providerList} | devices: ${deviceList}`);
// Initialize audio codec bridge (still needed for WebRTC transcoding).
// Generate TTS audio (WAV files on disk, played by Rust audio_player).
try {
await initCodecBridge(log);
await initAnnouncement(log);
// Pre-generate prompts.
@@ -547,7 +545,7 @@ async function startProxyEngine(): Promise<void> {
}
log(`[startup] prompts cached: ${promptCache.listIds().join(', ') || 'none'}`);
} catch (e) {
log(`[codec] init failed: ${e}`);
log(`[tts] init failed: ${e}`);
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.15.0',
version: '1.17.1',
description: 'undefined'
}