8 Commits

34 changed files with 1022 additions and 1335 deletions

View File

@@ -1,5 +1,36 @@
# Changelog
## 2026-04-10 - 1.17.2 - fix(proxy-engine)
use negotiated SDP payload types when wiring SIP legs and enable default nnnoiseless features for telephony denoising
- Select the negotiated codec payload type from SDP answers instead of always using the first offered codec
- Preserve the device leg's preferred payload type from its own INVITE SDP when attaching it to the mixer
- Enable default nnnoiseless features in codec-lib and proxy-engine dependencies
## 2026-04-10 - 1.17.1 - fix(proxy-engine,codec-lib,sip-proto,ts)
preserve negotiated media details and improve RTP audio handling across call legs
- Use native Opus float encode/decode to avoid unnecessary i16 quantization in the f32 audio path.
- Parse full RTP headers including extensions and sequence numbers, then sort inbound packets before decoding to keep codec state stable for out-of-order audio.
- Capture negotiated codec payload types from SDP offers and answers and include codec, RTP port, remote media, and metadata in leg_added events.
- Emit leg_state_changed and leg_removed events more consistently so the dashboard reflects leg lifecycle updates accurately.
## 2026-04-10 - 1.17.0 - feat(proxy-engine)
upgrade the internal audio bus to 48kHz f32 with per-leg denoising and improve SIP leg routing
- switch mixer, prompt playback, and tool leg audio handling from 16kHz i16 to 48kHz f32 for higher-quality internal processing
- add f32 decode/encode and resampling support plus standalone RNNoise denoiser creation in codec-lib
- apply per-leg inbound noise suppression in the mixer before mix-minus generation
- fix passthrough call routing by matching the actual leg from the signaling source address when Call-IDs are shared
- correct dialed number extraction from bare SIP request URIs by parsing the user part directly
## 2026-04-10 - 1.16.0 - feat(proxy-engine)
integrate Kokoro TTS generation into proxy-engine and simplify TypeScript prompt handling to use cached WAV files
- adds a generate_tts command to proxy-engine with lazy-loaded Kokoro model support and WAV output generation
- removes standalone opus-codec and tts-engine workspace binaries by consolidating TTS generation into proxy-engine
- updates announcement and prompt cache flows to generate and cache WAV files on disk instead of pre-encoding RTP frames in TypeScript
## 2026-04-10 - 1.15.0 - feat(proxy-engine)
add device leg, leg transfer, and leg replacement call controls

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -1,6 +1,6 @@
{
"name": "siprouter",
"version": "1.15.0",
"version": "1.17.2",
"private": true,
"type": "module",
"scripts": {

rust/Cargo.lock (generated): 249 changed lines
View File

@@ -237,6 +237,17 @@ version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "audiopus"
version = "0.3.0-rc.0"
@@ -487,6 +498,31 @@ dependencies = [
"inout",
]
[[package]]
name = "clap"
version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
dependencies = [
"atty",
"bitflags 1.3.2",
"clap_lex",
"indexmap 1.9.3",
"once_cell",
"strsim",
"termcolor",
"textwrap",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "cmake"
version = "0.1.58"
@@ -700,6 +736,125 @@ version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06d2e3287df1c007e74221c49ca10a95d557349e54b3a75dc2fb14712c751f04"
[[package]]
name = "dasp"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7381b67da416b639690ac77c73b86a7b5e64a29e31d1f75fb3b1102301ef355a"
dependencies = [
"dasp_envelope",
"dasp_frame",
"dasp_interpolate",
"dasp_peak",
"dasp_ring_buffer",
"dasp_rms",
"dasp_sample",
"dasp_signal",
"dasp_slice",
"dasp_window",
]
[[package]]
name = "dasp_envelope"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ec617ce7016f101a87fe85ed44180839744265fae73bb4aa43e7ece1b7668b6"
dependencies = [
"dasp_frame",
"dasp_peak",
"dasp_ring_buffer",
"dasp_rms",
"dasp_sample",
]
[[package]]
name = "dasp_frame"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2a3937f5fe2135702897535c8d4a5553f8b116f76c1529088797f2eee7c5cd6"
dependencies = [
"dasp_sample",
]
[[package]]
name = "dasp_interpolate"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc975a6563bb7ca7ec0a6c784ead49983a21c24835b0bc96eea11ee407c7486"
dependencies = [
"dasp_frame",
"dasp_ring_buffer",
"dasp_sample",
]
[[package]]
name = "dasp_peak"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cf88559d79c21f3d8523d91250c397f9a15b5fc72fbb3f87fdb0a37b79915bf"
dependencies = [
"dasp_frame",
"dasp_sample",
]
[[package]]
name = "dasp_ring_buffer"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07d79e19b89618a543c4adec9c5a347fe378a19041699b3278e616e387511ea1"
[[package]]
name = "dasp_rms"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6c5dcb30b7e5014486e2822537ea2beae50b19722ffe2ed7549ab03774575aa"
dependencies = [
"dasp_frame",
"dasp_ring_buffer",
"dasp_sample",
]
[[package]]
name = "dasp_sample"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c87e182de0887fd5361989c677c4e8f5000cd9491d6d563161a8f3a5519fc7f"
[[package]]
name = "dasp_signal"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa1ab7d01689c6ed4eae3d38fe1cea08cba761573fbd2d592528d55b421077e7"
dependencies = [
"dasp_envelope",
"dasp_frame",
"dasp_interpolate",
"dasp_peak",
"dasp_ring_buffer",
"dasp_rms",
"dasp_sample",
"dasp_window",
]
[[package]]
name = "dasp_slice"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e1c7335d58e7baedafa516cb361360ff38d6f4d3f9d9d5ee2a2fc8e27178fa1"
dependencies = [
"dasp_frame",
"dasp_sample",
]
[[package]]
name = "dasp_window"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99ded7b88821d2ce4e8b842c9f1c86ac911891ab89443cc1de750cae764c5076"
dependencies = [
"dasp_sample",
]
[[package]]
name = "data-encoding"
version = "2.10.0"
@@ -1214,6 +1369,12 @@ dependencies = [
"subtle",
]
[[package]]
name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.15.5"
@@ -1246,6 +1407,15 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.3"
@@ -1446,6 +1616,16 @@ dependencies = [
"zstd",
]
[[package]]
name = "indexmap"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
dependencies = [
"autocfg",
"hashbrown 0.12.3",
]
[[package]]
name = "indexmap"
version = "2.14.0"
@@ -1739,7 +1919,13 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "805d5964d1e7a0006a7fdced7dae75084d66d18b35f1dfe81bd76929b1f8da0c"
dependencies = [
"anyhow",
"clap",
"dasp",
"dasp_interpolate",
"dasp_ring_buffer",
"easyfft",
"hound",
"once_cell",
]
@@ -1881,16 +2067,6 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "opus-codec"
version = "0.2.0"
dependencies = [
"base64 0.22.1",
"codec-lib",
"serde",
"serde_json",
]
[[package]]
name = "ort"
version = "2.0.0-rc.11"
@@ -1915,6 +2091,12 @@ dependencies = [
"ureq",
]
[[package]]
name = "os_str_bytes"
version = "6.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1"
[[package]]
name = "p256"
version = "0.11.1"
@@ -2188,6 +2370,9 @@ dependencies = [
"base64 0.22.1",
"codec-lib",
"hound",
"kokoro-tts",
"nnnoiseless",
"ort",
"rand 0.8.5",
"regex-lite",
"serde",
@@ -2890,6 +3075,21 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "termcolor"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755"
dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.16.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c13547615a44dc9c452a8a534638acdf07120d4b6847c8178705da06306a3057"
[[package]]
name = "thiserror"
version = "1.0.69"
@@ -3008,16 +3208,6 @@ dependencies = [
"strength_reduce",
]
[[package]]
name = "tts-engine"
version = "0.1.0"
dependencies = [
"hound",
"kokoro-tts",
"ort",
"tokio",
]
[[package]]
name = "turn"
version = "0.6.1"
@@ -3261,7 +3451,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909"
dependencies = [
"anyhow",
"indexmap",
"indexmap 2.14.0",
"wasm-encoder",
"wasmparser",
]
@@ -3274,7 +3464,7 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe"
dependencies = [
"bitflags 2.11.0",
"hashbrown 0.15.5",
"indexmap",
"indexmap 2.14.0",
"semver",
]
@@ -3532,6 +3722,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
@@ -3581,7 +3780,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21"
dependencies = [
"anyhow",
"heck",
"indexmap",
"indexmap 2.14.0",
"prettyplease",
"syn 2.0.117",
"wasm-metadata",
@@ -3612,7 +3811,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2"
dependencies = [
"anyhow",
"bitflags 2.11.0",
"indexmap",
"indexmap 2.14.0",
"log",
"serde",
"serde_derive",
@@ -3631,7 +3830,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736"
dependencies = [
"anyhow",
"id-arena",
"indexmap",
"indexmap 2.14.0",
"log",
"semver",
"serde",

View File

@@ -1,8 +1,6 @@
[workspace]
members = [
"crates/codec-lib",
"crates/opus-codec",
"crates/tts-engine",
"crates/sip-proto",
"crates/proxy-engine",
]

View File

@@ -7,4 +7,4 @@ edition = "2021"
audiopus = "0.3.0-rc.0"
ezk-g722 = "0.1"
rubato = "0.14"
nnnoiseless = { version = "0.5", default-features = false }
nnnoiseless = "0.5"

View File

@@ -1,7 +1,7 @@
//! Audio codec library for the SIP router.
//!
//! Handles Opus ↔ G.722 ↔ PCMU/PCMA transcoding with ML noise suppression.
//! Used by both the standalone `opus-codec` CLI and the `proxy-engine` binary.
//! Used by the `proxy-engine` binary for all audio transcoding.
use audiopus::coder::{Decoder as OpusDecoder, Encoder as OpusEncoder};
use audiopus::packet::Packet as OpusPacket;
@@ -104,6 +104,8 @@ pub struct TranscodeState {
g722_dec: libg722::decoder::Decoder,
/// Cached FFT resamplers keyed by (from_rate, to_rate, chunk_size).
resamplers: HashMap<(u32, u32, usize), FftFixedIn<f64>>,
/// Cached f32 FFT resamplers keyed by (from_rate, to_rate, chunk_size).
resamplers_f32: HashMap<(u32, u32, usize), FftFixedIn<f32>>,
/// ML noise suppression for the SIP-bound direction.
denoiser_to_sip: Box<DenoiseState<'static>>,
/// ML noise suppression for the browser-bound direction.
@@ -133,6 +135,7 @@ impl TranscodeState {
g722_enc,
g722_dec,
resamplers: HashMap::new(),
resamplers_f32: HashMap::new(),
denoiser_to_sip: DenoiseState::new(),
denoiser_to_browser: DenoiseState::new(),
})
@@ -293,6 +296,126 @@ impl TranscodeState {
_ => Err(format!("unsupported target PT {pt}")),
}
}
// ---- f32 API for high-quality internal bus ----------------------------
/// Decode an encoded audio payload to f32 PCM samples in [-1.0, 1.0].
/// Returns (samples, sample_rate).
///
/// For Opus, uses native float decode (no i16 quantization).
/// For G.722/G.711, decodes to i16 then converts (codec is natively i16).
pub fn decode_to_f32(&mut self, data: &[u8], pt: u8) -> Result<(Vec<f32>, u32), String> {
match pt {
PT_OPUS => {
let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz
let packet =
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
let out =
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
let n: usize = self
.opus_dec
.decode_float(Some(packet), out, false)
.map_err(|e| format!("opus decode_float: {e}"))?
.into();
pcm.truncate(n);
Ok((pcm, 48000))
}
_ => {
// G.722, PCMU, PCMA: natively i16 codecs — decode then convert.
let (pcm_i16, rate) = self.decode_to_pcm(data, pt)?;
let pcm_f32 = pcm_i16.iter().map(|&s| s as f32 / 32768.0).collect();
Ok((pcm_f32, rate))
}
}
}
/// Encode f32 PCM samples ([-1.0, 1.0]) to an audio codec.
///
/// For Opus, uses native float encode (no i16 quantization).
/// For G.722/G.711, converts to i16 then encodes (codec is natively i16).
pub fn encode_from_f32(&mut self, pcm: &[f32], pt: u8) -> Result<Vec<u8>, String> {
match pt {
PT_OPUS => {
let mut buf = vec![0u8; 4000];
let n: usize = self
.opus_enc
.encode_float(pcm, &mut buf)
.map_err(|e| format!("opus encode_float: {e}"))?
.into();
buf.truncate(n);
Ok(buf)
}
_ => {
// G.722, PCMU, PCMA: natively i16 codecs.
let pcm_i16: Vec<i16> = pcm
.iter()
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect();
self.encode_from_pcm(&pcm_i16, pt)
}
}
}
/// High-quality sample rate conversion for f32 PCM using rubato FFT resampler.
/// Uses a separate cache from the i16 resampler.
pub fn resample_f32(
&mut self,
pcm: &[f32],
from_rate: u32,
to_rate: u32,
) -> Result<Vec<f32>, String> {
if from_rate == to_rate || pcm.is_empty() {
return Ok(pcm.to_vec());
}
let chunk = pcm.len();
let key = (from_rate, to_rate, chunk);
if !self.resamplers_f32.contains_key(&key) {
let r =
FftFixedIn::<f32>::new(from_rate as usize, to_rate as usize, chunk, 1, 1)
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
self.resamplers_f32.insert(key, r);
}
let resampler = self.resamplers_f32.get_mut(&key).unwrap();
let input = vec![pcm.to_vec()];
let result = resampler
.process(&input, None)
.map_err(|e| format!("resample f32 {from_rate}->{to_rate}: {e}"))?;
Ok(result[0].clone())
}
/// Apply RNNoise ML noise suppression to 48kHz f32 PCM audio.
/// Processes in 480-sample (10ms) frames. State persists across calls.
/// Operates natively in f32 — no i16 conversion overhead.
pub fn denoise_f32(denoiser: &mut DenoiseState, pcm: &[f32]) -> Vec<f32> {
let frame_size = DenoiseState::FRAME_SIZE; // 480
let total = pcm.len();
let whole = (total / frame_size) * frame_size;
let mut output = Vec::with_capacity(total);
let mut out_buf = [0.0f32; 480];
// nnnoiseless expects f32 samples scaled as i16 range (-32768..32767).
for offset in (0..whole).step_by(frame_size) {
let input: Vec<f32> = pcm[offset..offset + frame_size]
.iter()
.map(|&s| s * 32768.0)
.collect();
denoiser.process_frame(&mut out_buf, &input);
output.extend(out_buf.iter().map(|&s| s / 32768.0));
}
if whole < total {
output.extend_from_slice(&pcm[whole..]);
}
output
}
}
/// Create a new standalone denoiser for per-leg inbound processing.
pub fn new_denoiser() -> Box<DenoiseState<'static>> {
DenoiseState::new()
}
#[cfg(test)]

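The four new f32 methods compose into the per-leg path the mixer now runs on every tick. A minimal sketch of that chain under stated assumptions: the helper name `process_leg_frame` is hypothetical, and the real mixer interleaves mixing between the denoise and encode steps.

```rust
use codec_lib::{codec_sample_rate, TranscodeState, PT_OPUS};
use nnnoiseless::DenoiseState;

/// One inbound frame through the f32 path (illustrative only; see mixer.rs for the real flow).
fn process_leg_frame(
    st: &mut TranscodeState,
    denoiser: &mut DenoiseState<'_>,
    payload: &[u8],
    in_pt: u8,
    out_pt: u8,
) -> Result<Vec<u8>, String> {
    // 1. Decode the leg's codec to f32 PCM at its native rate.
    let (pcm, rate) = st.decode_to_f32(payload, in_pt)?;
    // 2. Bring it onto the 48kHz internal bus.
    let pcm_48k = st.resample_f32(&pcm, rate, 48_000)?;
    // 3. Per-leg noise suppression (the mixer skips this for Opus/WebRTC legs).
    let clean = if in_pt != PT_OPUS {
        TranscodeState::denoise_f32(denoiser, &pcm_48k)
    } else {
        pcm_48k
    };
    // ...mix-minus would happen here in the mixer...
    // 4. Resample to the outbound codec's native rate and encode.
    let out = st.resample_f32(&clean, 48_000, codec_sample_rate(out_pt))?;
    st.encode_from_f32(&out, out_pt)
}
```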
View File

@@ -1,14 +0,0 @@
[package]
name = "opus-codec"
version = "0.2.0"
edition = "2021"
[[bin]]
name = "opus-codec"
path = "src/main.rs"
[dependencies]
codec-lib = { path = "../codec-lib" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
base64 = "0.22"

View File

@@ -1,286 +0,0 @@
/// Audio transcoding bridge for smartrust.
///
/// Thin CLI wrapper around `codec-lib`. Handles Opus ↔ G.722 ↔ PCMU transcoding.
///
/// Protocol:
/// -> {"id":"1","method":"init","params":{}}
/// <- {"id":"1","success":true,"result":{}}
/// -> {"id":"2","method":"create_session","params":{"session_id":"call-abc"}}
/// <- {"id":"2","success":true,"result":{}}
/// -> {"id":"3","method":"transcode","params":{"session_id":"call-abc","data_b64":"...","from_pt":111,"to_pt":9}}
/// <- {"id":"3","success":true,"result":{"data_b64":"..."}}
/// -> {"id":"4","method":"destroy_session","params":{"session_id":"call-abc"}}
/// <- {"id":"4","success":true,"result":{}}
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine as _;
use codec_lib::{codec_sample_rate, TranscodeState};
use serde::Deserialize;
use std::collections::HashMap;
use std::io::{self, BufRead, Write};
#[derive(Deserialize)]
struct Request {
id: String,
method: String,
#[serde(default)]
params: serde_json::Value,
}
fn respond(
out: &mut impl Write,
id: &str,
success: bool,
result: Option<serde_json::Value>,
error: Option<&str>,
) {
let mut resp = serde_json::json!({ "id": id, "success": success });
if let Some(r) = result {
resp["result"] = r;
}
if let Some(e) = error {
resp["error"] = serde_json::Value::String(e.to_string());
}
let _ = writeln!(out, "{}", resp);
let _ = out.flush();
}
/// Resolve a session: if session_id is provided, look it up in the sessions map;
/// otherwise fall back to the default state (backward compat with `init`).
fn get_session<'a>(
sessions: &'a mut HashMap<String, TranscodeState>,
default: &'a mut Option<TranscodeState>,
params: &serde_json::Value,
) -> Option<&'a mut TranscodeState> {
if let Some(sid) = params.get("session_id").and_then(|v| v.as_str()) {
sessions.get_mut(sid)
} else {
default.as_mut()
}
}
fn main() {
let stdin = io::stdin();
let stdout = io::stdout();
let mut out = io::BufWriter::new(stdout.lock());
let _ = writeln!(out, r#"{{"event":"ready","data":{{}}}}"#);
let _ = out.flush();
let mut default_state: Option<TranscodeState> = None;
let mut sessions: HashMap<String, TranscodeState> = HashMap::new();
for line in stdin.lock().lines() {
let line = match line {
Ok(l) if !l.trim().is_empty() => l,
Ok(_) => continue,
Err(_) => break,
};
let req: Request = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
respond(&mut out, "", false, None, Some(&format!("parse: {e}")));
continue;
}
};
match req.method.as_str() {
"init" => match TranscodeState::new() {
Ok(s) => {
default_state = Some(s);
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
}
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
},
"create_session" => {
let session_id = match req.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s.to_string(),
None => {
respond(&mut out, &req.id, false, None, Some("missing session_id"));
continue;
}
};
if sessions.contains_key(&session_id) {
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
continue;
}
match TranscodeState::new() {
Ok(s) => {
sessions.insert(session_id, s);
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
}
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
}
}
"destroy_session" => {
let session_id = match req.params.get("session_id").and_then(|v| v.as_str()) {
Some(s) => s,
None => {
respond(&mut out, &req.id, false, None, Some("missing session_id"));
continue;
}
};
sessions.remove(session_id);
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
}
"transcode" => {
let st = match get_session(&mut sessions, &mut default_state, &req.params) {
Some(s) => s,
None => {
respond(
&mut out,
&req.id,
false,
None,
Some("not initialized (no session or default state)"),
);
continue;
}
};
let data_b64 = match req.params.get("data_b64").and_then(|v| v.as_str()) {
Some(s) => s,
None => {
respond(&mut out, &req.id, false, None, Some("missing data_b64"));
continue;
}
};
let from_pt =
req.params.get("from_pt").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
let to_pt = req.params.get("to_pt").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
let direction = req.params.get("direction").and_then(|v| v.as_str());
let data = match B64.decode(data_b64) {
Ok(b) => b,
Err(e) => {
respond(
&mut out,
&req.id,
false,
None,
Some(&format!("b64: {e}")),
);
continue;
}
};
match st.transcode(&data, from_pt, to_pt, direction) {
Ok(result) => {
respond(
&mut out,
&req.id,
true,
Some(serde_json::json!({ "data_b64": B64.encode(&result) })),
None,
);
}
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
}
}
"encode_pcm" => {
let st = match get_session(&mut sessions, &mut default_state, &req.params) {
Some(s) => s,
None => {
respond(
&mut out,
&req.id,
false,
None,
Some("not initialized (no session or default state)"),
);
continue;
}
};
let data_b64 = match req.params.get("data_b64").and_then(|v| v.as_str()) {
Some(s) => s,
None => {
respond(&mut out, &req.id, false, None, Some("missing data_b64"));
continue;
}
};
let sample_rate = req
.params
.get("sample_rate")
.and_then(|v| v.as_u64())
.unwrap_or(22050) as u32;
let to_pt = req.params.get("to_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8;
let data = match B64.decode(data_b64) {
Ok(b) => b,
Err(e) => {
respond(
&mut out,
&req.id,
false,
None,
Some(&format!("b64: {e}")),
);
continue;
}
};
if data.len() % 2 != 0 {
respond(
&mut out,
&req.id,
false,
None,
Some("PCM data has odd byte count (expected 16-bit LE samples)"),
);
continue;
}
let pcm: Vec<i16> = data
.chunks_exact(2)
.map(|c| i16::from_le_bytes([c[0], c[1]]))
.collect();
let target_rate = codec_sample_rate(to_pt);
let resampled = match st.resample(&pcm, sample_rate, target_rate) {
Ok(r) => r,
Err(e) => {
respond(&mut out, &req.id, false, None, Some(&e));
continue;
}
};
match st.encode_from_pcm(&resampled, to_pt) {
Ok(encoded) => {
respond(
&mut out,
&req.id,
true,
Some(serde_json::json!({ "data_b64": B64.encode(&encoded) })),
None,
);
}
Err(e) => {
respond(&mut out, &req.id, false, None, Some(&e));
}
}
}
"encode" | "decode" => {
respond(
&mut out,
&req.id,
false,
None,
Some("use 'transcode' command instead"),
);
}
_ => respond(
&mut out,
&req.id,
false,
None,
Some(&format!("unknown: {}", req.method)),
),
}
}
}

View File

@@ -10,6 +10,7 @@ path = "src/main.rs"
[dependencies]
codec-lib = { path = "../codec-lib" }
sip-proto = { path = "../sip-proto" }
nnnoiseless = "0.5"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
@@ -18,3 +19,8 @@ regex-lite = "0.1"
webrtc = "0.8"
rand = "0.8"
hound = "3.5"
kokoro-tts = { version = "0.3", default-features = false }
ort = { version = "=2.0.0-rc.11", default-features = false, features = [
"std", "download-binaries", "copy-dylibs", "ndarray",
"tls-native-vendored"
] }

View File

@@ -10,9 +10,9 @@ use tokio::net::UdpSocket;
use tokio::time::{self, Duration};
/// Mixing sample rate used by the mixer (must stay in sync with mixer::MIX_RATE).
const MIX_RATE: u32 = 16000;
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320;
const MIX_FRAME_SIZE: usize = 960;
/// Play a WAV file as RTP to a destination.
/// Returns when playback is complete.
@@ -178,9 +178,9 @@ pub async fn play_beep(
Ok((seq, ts))
}
/// Load a WAV file and split it into 20ms PCM frames at 16kHz.
/// Load a WAV file and split it into 20ms f32 PCM frames at 48kHz.
/// Used by the leg interaction system to prepare prompt audio for the mixer.
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
let path = Path::new(wav_path);
if !path.exists() {
return Err(format!("WAV file not found: {wav_path}"));
@@ -191,17 +191,17 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
let spec = reader.spec();
let wav_rate = spec.sample_rate;
// Read all samples as i16.
let samples: Vec<i16> = if spec.bits_per_sample == 16 {
// Read all samples as f32 in [-1.0, 1.0].
let samples: Vec<f32> = if spec.bits_per_sample == 16 {
reader
.samples::<i16>()
.filter_map(|s| s.ok())
.map(|s| s as f32 / 32768.0)
.collect()
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
reader
.samples::<f32>()
.filter_map(|s| s.ok())
.map(|s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect()
} else {
return Err(format!(
@@ -214,24 +214,24 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
return Ok(vec![]);
}
// Resample to MIX_RATE (16kHz) if needed.
// Resample to MIX_RATE (48kHz) if needed.
let resampled = if wav_rate != MIX_RATE {
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
transcoder
.resample(&samples, wav_rate, MIX_RATE)
.resample_f32(&samples, wav_rate, MIX_RATE)
.map_err(|e| format!("resample: {e}"))?
} else {
samples
};
// Split into MIX_FRAME_SIZE (320) sample frames.
// Split into MIX_FRAME_SIZE (960) sample frames.
let mut frames = Vec::new();
let mut offset = 0;
while offset < resampled.len() {
let end = (offset + MIX_FRAME_SIZE).min(resampled.len());
let mut frame = resampled[offset..end].to_vec();
// Pad short final frame with silence.
frame.resize(MIX_FRAME_SIZE, 0);
frame.resize(MIX_FRAME_SIZE, 0.0);
frames.push(frame);
offset += MIX_FRAME_SIZE;
}

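Framing arithmetic for the new format: at 48kHz a 20ms frame is 960 samples, so a 1.5-second prompt becomes 72,000 samples and 75 frames, the last one zero-padded. A small usage sketch; the WAV path is a placeholder.

```rust
// Placeholder path; frames come back as 48kHz f32, 960 samples (20ms) each.
let frames = load_prompt_pcm_frames("/tmp/prompts/announce.wav").expect("prompt WAV");
assert!(frames.iter().all(|f| f.len() == 960)); // MIX_FRAME_SIZE
println!("{} frames, roughly {} ms of prompt audio", frames.len(), frames.len() * 20);
```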
View File

@@ -20,6 +20,35 @@ use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::UdpSocket;
/// Emit a `leg_added` event with full leg information.
/// Free function (not a method) to avoid `&self` borrow conflicts when `self.calls` is borrowed.
fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
let metadata: serde_json::Value = if leg.metadata.is_empty() {
serde_json::json!({})
} else {
serde_json::Value::Object(
leg.metadata
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
)
};
emit_event(
tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg.id,
"kind": leg.kind.as_str(),
"state": leg.state.as_str(),
"codec": sip_proto::helpers::codec_name(leg.codec_pt),
"rtpPort": leg.rtp_port,
"remoteMedia": leg.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
"metadata": metadata,
}),
);
}
pub struct CallManager {
/// All active calls, keyed by internal call ID.
pub calls: HashMap<String, Call>,
@@ -120,7 +149,19 @@ impl CallManager {
}
// Passthrough-style routing for inbound/outbound device↔provider calls.
self.route_passthrough_message(&call_id, &leg_id, msg, from_addr, socket, config)
// The sip_index only stores one leg for shared Call-IDs, so we need to
// determine which leg the message actually belongs to by comparing from_addr.
let actual_leg_id = self
.calls
.get(&call_id)
.and_then(|call| {
call.legs
.values()
.find(|l| l.signaling_addr == Some(from_addr))
.map(|l| l.id.clone())
})
.unwrap_or(leg_id);
self.route_passthrough_message(&call_id, &actual_leg_id, msg, from_addr, socket, config)
.await
}
@@ -155,7 +196,17 @@ impl CallManager {
};
// Mutable borrow on call/leg is now released.
let sip_pt = codecs.first().copied().unwrap_or(9);
let mut sip_pt = codecs.first().copied().unwrap_or(9);
// If the message has SDP (e.g., 200 OK answer), use the negotiated codec
// instead of the offered one.
if msg.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
if let Some(pt) = ep.codec_pt {
sip_pt = pt;
}
}
}
match action {
SipLegAction::None => {}
@@ -253,14 +304,27 @@ impl CallManager {
dev_leg.state = LegState::Connected;
}
}
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": dev_leg_id, "state": "connected" }),
);
// Wire device leg to mixer.
// Use the device's preferred codec from its INVITE SDP,
// not the provider's negotiated codec.
let dev_pt = device_invite
.has_sdp_body()
.then(|| parse_sdp_endpoint(&device_invite.body))
.flatten()
.and_then(|ep| ep.codec_pt)
.unwrap_or(sip_pt);
if let Some(dev_remote_addr) = dev_remote {
let dev_channels = create_leg_channels();
spawn_sip_inbound(dev_rtp_socket.clone(), dev_channels.inbound_tx);
spawn_sip_outbound(dev_rtp_socket, dev_remote_addr, dev_channels.outbound_rx);
if let Some(call) = self.calls.get(call_id) {
call.add_leg_to_mixer(&dev_leg_id, sip_pt, dev_channels.inbound_rx, dev_channels.outbound_tx)
call.add_leg_to_mixer(&dev_leg_id, dev_pt, dev_channels.inbound_rx, dev_channels.outbound_tx)
.await;
}
}
@@ -312,6 +376,8 @@ impl CallManager {
leg.state = LegState::Terminated;
}
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }));
emit_event(&self.out_tx, "call_ended",
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
self.terminate_call(call_id).await;
@@ -517,21 +583,30 @@ impl CallManager {
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Ringing;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }));
} else if code >= 200 && code < 300 {
let mut needs_wiring = false;
if let Some(leg) = call.legs.get_mut(this_leg_id) {
leg.state = LegState::Connected;
// Learn remote media from SDP.
// Learn remote media and negotiated codec from SDP answer.
if msg.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
leg.remote_media = Some(addr);
}
// Use the codec from the SDP answer (what the remote actually selected).
if let Some(pt) = ep.codec_pt {
leg.codec_pt = pt;
}
}
}
needs_wiring = true;
}
emit_event(&self.out_tx, "leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }));
if call.state != CallState::Connected {
call.state = CallState::Connected;
emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id }));
@@ -677,15 +752,19 @@ impl CallManager {
call.callee_number = Some(called_number);
call.state = CallState::Ringing;
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
let mut codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
// Provider leg — extract media from SDP.
// Provider leg — extract media and negotiated codec from SDP.
let mut provider_media: Option<SocketAddr> = None;
if invite.has_sdp_body() {
if let Some(ep) = parse_sdp_endpoint(&invite.body) {
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
provider_media = Some(addr);
}
// Use the codec from the provider's SDP offer (what they actually want to use).
if let Some(pt) = ep.codec_pt {
codec_pt = pt;
}
}
}
@@ -755,6 +834,16 @@ impl CallManager {
// Store the call.
self.calls.insert(call_id.clone(), call);
// Emit leg_added for both initial legs.
if let Some(call) = self.calls.get(&call_id) {
if let Some(leg) = call.legs.get(&provider_leg_id) {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
if let Some(leg) = call.legs.get(&device_leg_id) {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -842,6 +931,14 @@ impl CallManager {
.insert(sip_call_id, (call_id.clone(), leg_id));
self.calls.insert(call_id.clone(), call);
// Emit leg_added for the provider leg.
if let Some(call) = self.calls.get(&call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -866,11 +963,18 @@ impl CallManager {
let lan_port = config.proxy.lan_port;
let device_sip_call_id = invite.call_id().to_string();
// Extract just the user part from the request URI (e.g., "sip:16196000@10.0.0.1" → "16196000").
// extract_uri is for header values with angle brackets, not bare request URIs.
let dialed_number = invite
.request_uri()
.and_then(|uri| SipMessage::extract_uri(uri))
.unwrap_or(invite.request_uri().unwrap_or(""))
.to_string();
.map(|uri| {
let stripped = uri
.strip_prefix("sip:")
.or_else(|| uri.strip_prefix("sips:"))
.unwrap_or(uri);
stripped.split('@').next().unwrap_or(stripped).to_string()
})
.unwrap_or_default();
let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() {
Some(a) => a,
@@ -983,6 +1087,14 @@ impl CallManager {
.insert(provider_sip_call_id, (call_id.clone(), provider_leg_id));
self.calls.insert(call_id.clone(), call);
// Emit leg_added for both initial legs (device + provider).
if let Some(call) = self.calls.get(&call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, &call_id, leg);
}
}
Some(call_id)
}
@@ -1050,17 +1162,11 @@ impl CallManager {
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-provider",
"state": "inviting",
"number": number,
}),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(&leg_id) {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
Some(leg_id)
}
@@ -1126,17 +1232,11 @@ impl CallManager {
let call = self.calls.get_mut(call_id).unwrap();
call.legs.insert(leg_id.clone(), leg_info);
emit_event(
&self.out_tx,
"leg_added",
serde_json::json!({
"call_id": call_id,
"leg_id": leg_id,
"kind": "sip-device",
"state": "inviting",
"device_id": device_id,
}),
);
if let Some(call) = self.calls.get(call_id) {
if let Some(leg) = call.legs.get(&leg_id) {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
Some(leg_id)
}
@@ -1223,6 +1323,13 @@ impl CallManager {
None => return false,
};
// Emit leg_removed for source call.
emit_event(
&self.out_tx,
"leg_removed",
serde_json::json!({ "call_id": source_call_id, "leg_id": leg_id }),
);
// Update SIP index to point to the target call.
if let Some(sip_cid) = &leg_info.sip_call_id {
self.sip_index.insert(
@@ -1255,15 +1362,12 @@ impl CallManager {
let target_call = self.calls.get_mut(target_call_id).unwrap();
target_call.legs.insert(leg_id.to_string(), leg_info);
emit_event(
&self.out_tx,
"leg_transferred",
serde_json::json!({
"leg_id": leg_id,
"source_call_id": source_call_id,
"target_call_id": target_call_id,
}),
);
// Emit leg_added for target call.
if let Some(target) = self.calls.get(target_call_id) {
if let Some(leg) = target.legs.get(leg_id) {
emit_leg_added_event(&self.out_tx, target_call_id, leg);
}
}
// Check if source call has too few legs remaining.
let source_call = self.calls.get(source_call_id).unwrap();
@@ -1366,6 +1470,11 @@ impl CallManager {
}
}
leg.state = LegState::Terminated;
emit_event(
&self.out_tx,
"leg_state_changed",
serde_json::json!({ "call_id": call_id, "leg_id": leg.id, "state": "terminated" }),
);
}
emit_event(
@@ -1484,6 +1593,13 @@ impl CallManager {
);
self.calls.insert(call_id.to_string(), call);
// Emit leg_added for the provider leg.
if let Some(call) = self.calls.get(call_id) {
for leg in call.legs.values() {
emit_leg_added_event(&self.out_tx, call_id, leg);
}
}
// Build recording path.
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)

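With `emit_leg_added_event` centralized, every leg reaches the dashboard with the same payload shape. Roughly what a freshly created provider leg would carry; all values are illustrative, and the exact codec string is whatever `sip_proto::helpers::codec_name` returns for the negotiated payload type.

```rust
// The JSON payload passed to emit_event for "leg_added"; values illustrative.
let payload = serde_json::json!({
    "call_id": "call-1f3a",
    "leg_id": "leg-provider-1",
    "kind": "sip-provider",              // leg.kind.as_str()
    "state": "inviting",                 // leg.state.as_str()
    "codec": "G722",                     // codec_name(leg.codec_pt)
    "rtpPort": 40000,
    "remoteMedia": "203.0.113.10:20002", // or null until the SDP answer arrives
    "metadata": {}
});
```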
View File

@@ -35,7 +35,8 @@ pub fn create_leg_channels() -> LegChannels {
}
/// Spawn the inbound I/O task for a SIP leg.
/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer.
/// Reads RTP from the socket, parses the variable-length header (RFC 3550),
/// and sends the payload to the mixer.
/// Returns the JoinHandle (exits when the inbound_tx channel is dropped).
pub fn spawn_sip_inbound(
rtp_socket: Arc<UdpSocket>,
@@ -51,12 +52,29 @@ pub fn spawn_sip_inbound(
}
let pt = buf[1] & 0x7F;
let marker = (buf[1] & 0x80) != 0;
let seq = u16::from_be_bytes([buf[2], buf[3]]);
let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
let payload = buf[12..n].to_vec();
// RFC 3550: header length = 12 + (CC * 4) + optional extension.
let cc = (buf[0] & 0x0F) as usize;
let has_extension = (buf[0] & 0x10) != 0;
let mut offset = 12 + cc * 4;
if has_extension {
if offset + 4 > n {
continue; // Malformed: extension header truncated.
}
let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
offset += 4 + ext_len * 4;
}
if offset >= n {
continue; // No payload after header.
}
let payload = buf[offset..n].to_vec();
if payload.is_empty() {
continue;
}
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, timestamp }).await.is_err() {
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() {
break; // Channel closed — leg removed.
}
}

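The offset arithmetic follows RFC 3550 §5.1: 12 fixed bytes, 4 bytes per CSRC, and, when the X bit is set, a 4-byte extension header plus `length * 4` bytes of extension payload. A worked example with made-up header fields:

```rust
// Hypothetical packet: V=2, X=1 (extension present), CC=1 (one CSRC),
// and an extension whose 16-bit length field reads 2 (two 32-bit words).
let cc = 1usize;
let has_extension = true;
let ext_len_words = 2usize;

let mut offset = 12 + cc * 4;            // 16: fixed header + CSRC list
if has_extension {
    offset += 4 + ext_len_words * 4;     // 16 + 4 + 8 = 28
}
assert_eq!(offset, 28);                  // payload starts at byte 28
```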
View File

@@ -21,6 +21,7 @@ mod rtp;
mod sip_leg;
mod sip_transport;
mod tool_leg;
mod tts;
mod voicemail;
mod webrtc_engine;
@@ -93,6 +94,9 @@ async fn main() {
// WebRTC engine — separate lock to avoid deadlock with SIP handlers.
let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
// TTS engine — separate lock, lazy-loads model on first use.
let tts_engine = Arc::new(Mutex::new(tts::TtsEngine::new()));
// Read commands from stdin.
let stdin = tokio::io::stdin();
let reader = BufReader::new(stdin);
@@ -113,11 +117,12 @@ async fn main() {
let engine = engine.clone();
let webrtc = webrtc.clone();
let tts_engine = tts_engine.clone();
let out_tx = out_tx.clone();
// Handle commands — some are async, so we spawn.
tokio::spawn(async move {
handle_command(engine, webrtc, &out_tx, cmd).await;
handle_command(engine, webrtc, tts_engine, &out_tx, cmd).await;
});
}
}
@@ -125,6 +130,7 @@ async fn main() {
async fn handle_command(
engine: Arc<Mutex<ProxyEngine>>,
webrtc: Arc<Mutex<WebRtcEngine>>,
tts_engine: Arc<Mutex<tts::TtsEngine>>,
out_tx: &OutTx,
cmd: Command,
) {
@@ -150,6 +156,8 @@ async fn handle_command(
"add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await,
"remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await,
"set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await,
// TTS command — lock tts_engine only (no SIP/WebRTC contention).
"generate_tts" => handle_generate_tts(tts_engine, out_tx, &cmd).await,
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
}
}
@@ -669,6 +677,10 @@ async fn handle_webrtc_link(
"leg_id": session_id,
"kind": "webrtc",
"state": "connected",
"codec": "Opus",
"rtpPort": 0,
"remoteMedia": null,
"metadata": {},
}));
respond_ok(out_tx, &cmd.id, serde_json::json!({
@@ -1117,8 +1129,11 @@ async fn handle_add_tool_leg(
"call_id": call_id,
"leg_id": tool_leg_id,
"kind": "tool",
"tool_type": tool_type_str,
"state": "connected",
"codec": null,
"rtpPort": 0,
"remoteMedia": null,
"metadata": { "tool_type": tool_type_str },
}),
);
@@ -1218,3 +1233,16 @@ async fn handle_set_leg_metadata(
leg.metadata.insert(key, value);
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
}
/// Handle `generate_tts` — synthesize text to a WAV file using Kokoro TTS.
async fn handle_generate_tts(
tts_engine: Arc<Mutex<tts::TtsEngine>>,
out_tx: &OutTx,
cmd: &Command,
) {
let mut tts = tts_engine.lock().await;
match tts.generate(&cmd.params).await {
Ok(result) => respond_ok(out_tx, &cmd.id, result),
Err(e) => respond_err(out_tx, &cmd.id, &e),
}
}

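The new command uses the same stdin JSON command channel as the rest of proxy-engine. A hedged example request built from the parameter names documented in `tts.rs`; the id and file paths are placeholders, and on success the result carries the output path back to the TypeScript side.

```rust
// Illustrative generate_tts request; field names match tts.rs, values are placeholders.
let request = serde_json::json!({
    "id": "42",
    "method": "generate_tts",
    "params": {
        "model":  "/opt/models/kokoro-v1.0.onnx",  // placeholder path
        "voices": "/opt/models/voices.bin",        // placeholder path
        "voice":  "af_bella",
        "text":   "Your call is being transferred.",
        "output": "/tmp/prompt-cache/transfer.wav" // placeholder path
    }
});
```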
View File

@@ -3,9 +3,12 @@
//! Each Call spawns one mixer task. Legs communicate with the mixer via
//! tokio mpsc channels — no shared mutable state, no lock contention.
//!
//! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame).
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
//!
//! The mixer runs a 20ms tick loop:
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
//! 2. Compute total mix (sum of all **participant** legs' PCM as i32)
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
//! 5. For each tool leg: send per-source unmerged audio batch
@@ -13,16 +16,18 @@
use crate::ipc::{emit_event, OutTx};
use crate::rtp::{build_rtp_header, rtp_clock_increment};
use codec_lib::{codec_sample_rate, TranscodeState};
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
use nnnoiseless::DenoiseState;
use std::collections::{HashMap, VecDeque};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::{self, Duration, MissedTickBehavior};
/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample.
const MIX_RATE: u32 = 16000;
/// Mixing sample rate — 48kHz. Opus is native, G.722 needs 3× upsample, G.711 needs 6× upsample.
/// All processing (denoising, mixing) happens at this rate in f32 for maximum quality.
const MIX_RATE: u32 = 48000;
/// Samples per 20ms frame at the mixing rate.
const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
/// A raw RTP payload received from a leg (no RTP header).
pub struct RtpPacket {
@@ -30,6 +35,8 @@ pub struct RtpPacket {
pub payload_type: u8,
/// RTP marker bit (first packet of a DTMF event, etc.).
pub marker: bool,
/// RTP sequence number for reordering.
pub seq: u16,
/// RTP timestamp from the original packet header.
pub timestamp: u32,
}
@@ -47,8 +54,8 @@ enum LegRole {
}
struct IsolationState {
/// PCM frames at MIX_RATE (320 samples each) queued for playback.
prompt_frames: VecDeque<Vec<i16>>,
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
prompt_frames: VecDeque<Vec<f32>>,
/// Digits that complete the interaction (e.g., ['1', '2']).
expected_digits: Vec<char>,
/// Ticks remaining before timeout (decremented each tick after prompt ends).
@@ -88,8 +95,8 @@ pub struct ToolAudioBatch {
/// One participant's 20ms audio frame.
pub struct ToolAudioSource {
pub leg_id: String,
/// PCM at 16kHz, MIX_FRAME_SIZE (320) samples.
pub pcm_16k: Vec<i16>,
/// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples.
pub pcm_48k: Vec<f32>,
}
/// Internal storage for a tool leg inside the mixer.
@@ -122,8 +129,8 @@ pub enum MixerCommand {
/// DTMF from the leg is checked against expected_digits.
StartInteraction {
leg_id: String,
/// PCM frames at MIX_RATE (16kHz), each 320 samples.
prompt_pcm_frames: Vec<Vec<i16>>,
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
prompt_pcm_frames: Vec<Vec<f32>>,
expected_digits: Vec<char>,
timeout_ms: u32,
result_tx: oneshot::Sender<InteractionResult>,
@@ -149,10 +156,12 @@ pub enum MixerCommand {
struct MixerLegSlot {
codec_pt: u8,
transcoder: TranscodeState,
/// Per-leg inbound denoiser (48kHz, 480-sample frames).
denoiser: Box<DenoiseState<'static>>,
inbound_rx: mpsc::Receiver<RtpPacket>,
outbound_tx: mpsc::Sender<Vec<u8>>,
/// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus.
last_pcm_frame: Vec<i16>,
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
last_pcm_frame: Vec<f32>,
/// Number of consecutive ticks with no inbound packet.
silent_ticks: u32,
// RTP output state.
@@ -220,9 +229,10 @@ async fn mixer_loop(
MixerLegSlot {
codec_pt,
transcoder,
denoiser: new_denoiser(),
inbound_rx,
outbound_tx,
last_pcm_frame: vec![0i16; MIX_FRAME_SIZE],
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
silent_ticks: 0,
rtp_seq: 0,
rtp_ts: 0,
@@ -311,16 +321,18 @@ async fn mixer_loop(
continue;
}
// ── 2. Drain inbound packets, decode to 16kHz PCM. ─────────
// ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ────
// DTMF (PT 101) packets are collected separately.
// Audio packets are sorted by sequence number and decoded
// in order to maintain codec state (critical for G.722 ADPCM).
let leg_ids: Vec<String> = legs.keys().cloned().collect();
let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new();
for lid in &leg_ids {
let slot = legs.get_mut(lid).unwrap();
// Drain channel — collect DTMF packets separately, keep latest audio.
let mut latest_audio: Option<RtpPacket> = None;
// Drain channel — collect DTMF separately, collect ALL audio packets.
let mut audio_packets: Vec<RtpPacket> = Vec::new();
loop {
match slot.inbound_rx.try_recv() {
Ok(pkt) => {
@@ -328,33 +340,47 @@ async fn mixer_loop(
// DTMF telephone-event: collect for processing.
dtmf_forward.push((lid.clone(), pkt));
} else {
latest_audio = Some(pkt);
audio_packets.push(pkt);
}
}
Err(_) => break,
}
}
if let Some(pkt) = latest_audio {
if !audio_packets.is_empty() {
slot.silent_ticks = 0;
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to mixing rate if needed.
let pcm_mix = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE])
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = pcm_mix;
frame.resize(MIX_FRAME_SIZE, 0);
slot.last_pcm_frame = frame;
}
Err(_) => {
// Decode failed — use silence.
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
// Sort by sequence number for correct codec state progression.
// This prevents G.722 ADPCM state corruption from out-of-order packets.
audio_packets.sort_by_key(|p| p.seq);
// Decode ALL packets in order (maintains codec state),
// but only keep the last decoded frame for mixing.
for pkt in &audio_packets {
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
Ok((pcm, rate)) => {
// Resample to 48kHz mixing rate if needed.
let pcm_48k = if rate == MIX_RATE {
pcm
} else {
slot.transcoder
.resample_f32(&pcm, rate, MIX_RATE)
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
};
// Per-leg inbound denoising at 48kHz.
// Only for SIP telephony legs — WebRTC browsers
// already apply noise suppression via getUserMedia.
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
} else {
pcm_48k
};
// Pad or truncate to exactly MIX_FRAME_SIZE.
let mut frame = processed;
frame.resize(MIX_FRAME_SIZE, 0.0);
slot.last_pcm_frame = frame;
}
Err(_) => {}
}
}
} else if dtmf_forward.iter().any(|(src, _)| src == lid) {
@@ -364,17 +390,18 @@ async fn mixer_loop(
slot.silent_ticks += 1;
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
if slot.silent_ticks > 150 {
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
}
}
}
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
// Accumulate as f64 to prevent precision loss when summing f32.
let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE];
for slot in legs.values() {
if matches!(slot.role, LegRole::Participant) {
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
total_mix[i] += s as i32;
total_mix[i] += s as f64;
}
}
}
@@ -387,27 +414,27 @@ async fn mixer_loop(
for (lid, slot) in legs.iter_mut() {
match &mut slot.role {
LegRole::Participant => {
// Mix-minus: total minus this leg's own contribution.
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
for i in 0..MIX_FRAME_SIZE {
let sample = (total_mix[i] - slot.last_pcm_frame[i] as i32)
.clamp(-32768, 32767) as i16;
mix_minus.push(sample);
let sample =
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
mix_minus.push(sample.clamp(-1.0, 1.0));
}
// Resample from 16kHz to the leg's codec native rate.
// Resample from 48kHz to the leg's codec native rate.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
mix_minus
} else {
slot.transcoder
.resample(&mix_minus, MIX_RATE, target_rate)
.resample_f32(&mix_minus, MIX_RATE, target_rate)
.unwrap_or_default()
};
// Encode to the leg's codec.
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
let encoded =
match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
Ok(e) if !e.is_empty() => e,
_ => continue,
};
@@ -456,21 +483,21 @@ async fn mixer_loop(
frame
} else {
state.prompt_done = true;
vec![0i16; MIX_FRAME_SIZE]
vec![0.0f32; MIX_FRAME_SIZE]
};
// Encode prompt frame to the leg's codec (reuses existing encode path).
// Encode prompt frame to the leg's codec.
let target_rate = codec_sample_rate(slot.codec_pt);
let resampled = if target_rate == MIX_RATE {
pcm_frame
} else {
slot.transcoder
.resample(&pcm_frame, MIX_RATE, target_rate)
.resample_f32(&pcm_frame, MIX_RATE, target_rate)
.unwrap_or_default()
};
if let Ok(encoded) =
slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt)
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
{
if !encoded.is_empty() {
let header = build_rtp_header(
@@ -523,7 +550,7 @@ async fn mixer_loop(
.filter(|(_, s)| matches!(s.role, LegRole::Participant))
.map(|(lid, s)| ToolAudioSource {
leg_id: lid.clone(),
pcm_16k: s.last_pcm_frame.clone(),
pcm_48k: s.last_pcm_frame.clone(),
})
.collect();
@@ -533,7 +560,7 @@ async fn mixer_loop(
.iter()
.map(|s| ToolAudioSource {
leg_id: s.leg_id.clone(),
pcm_16k: s.pcm_16k.clone(),
pcm_48k: s.pcm_48k.clone(),
})
.collect(),
};

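The mix-minus arithmetic is easiest to see with numbers. For one sample position and three participant legs, the sketch below mirrors steps 2 and 3 of the tick loop; the values are illustrative.

```rust
// One sample position, three participant legs A, B, C (illustrative values).
let (a, b, c) = (0.4f32, 0.5f32, 0.3f32);

// Total mix accumulates in f64 so repeated f32 additions don't lose precision.
let total: f64 = a as f64 + b as f64 + c as f64;            // ~1.2

// Each leg hears everyone except itself, clamped to the f32 bus range.
let to_a = ((total - a as f64) as f32).clamp(-1.0, 1.0);    // ~0.8 = b + c
let to_b = ((total - b as f64) as f32).clamp(-1.0, 1.0);    // ~0.7 = a + c
let to_c = ((total - c as f64) as f32).clamp(-1.0, 1.0);    // ~0.9 = a + b
```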
View File

@@ -2,7 +2,7 @@
//!
//! Tool legs are observer legs that receive individual audio streams from each
//! participant in a call. The mixer pipes `ToolAudioBatch` every 20ms containing
//! each participant's decoded PCM@16kHz tagged with source leg ID.
//! each participant's decoded PCM@48kHz f32 tagged with source leg ID.
//!
//! Consumers:
//! - **Recording**: writes per-source WAV files for speaker-separated recording.
@@ -37,20 +37,25 @@ pub fn spawn_recording_tool(
while let Some(batch) = rx.recv().await {
for source in &batch.sources {
// Skip silence-only frames (all zeros = no audio activity).
let has_audio = source.pcm_16k.iter().any(|&s| s != 0);
// Skip silence-only frames (near-zero = no audio activity).
let has_audio = source.pcm_48k.iter().any(|&s| s.abs() > 1e-6);
if !has_audio && !recorders.contains_key(&source.leg_id) {
continue; // Don't create a file for silence-only sources.
}
let recorder = recorders.entry(source.leg_id.clone()).or_insert_with(|| {
let path = format!("{}/{}-{}.wav", base_dir, call_id, source.leg_id);
Recorder::new_pcm(&path, 16000, None).unwrap_or_else(|e| {
Recorder::new_pcm(&path, 48000, None).unwrap_or_else(|e| {
panic!("failed to create recorder for {}: {e}", source.leg_id);
})
});
if !recorder.write_pcm(&source.pcm_16k) {
// Convert f32 [-1.0, 1.0] to i16 for WAV writing.
let pcm_i16: Vec<i16> = source.pcm_48k
.iter()
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
.collect();
if !recorder.write_pcm(&pcm_i16) {
// Max duration reached — stop recording this source.
break;
}
@@ -88,7 +93,7 @@ pub fn spawn_recording_tool(
/// Spawn a transcription tool leg.
///
/// The plumbing is fully real: it receives per-source unmerged PCM@16kHz from
/// The plumbing is fully real: it receives per-source unmerged PCM@48kHz f32 from
/// the mixer every 20ms. The consumer is a stub that accumulates audio and
/// reports metadata on close. Future: will stream to a Whisper HTTP endpoint.
pub fn spawn_transcription_tool(
@@ -105,7 +110,7 @@ pub fn spawn_transcription_tool(
while let Some(batch) = rx.recv().await {
for source in &batch.sources {
*source_samples.entry(source.leg_id.clone()).or_insert(0) +=
source.pcm_16k.len() as u64;
source.pcm_48k.len() as u64;
// TODO: Future — accumulate chunks and stream to Whisper endpoint.
// For now, the audio is received and counted but not processed.
@@ -118,7 +123,7 @@ pub fn spawn_transcription_tool(
.map(|(leg_id, samples)| {
serde_json::json!({
"source_leg_id": leg_id,
"duration_ms": (samples * 1000) / 16000,
"duration_ms": (samples * 1000) / 48000,
})
})
.collect();

View File

@@ -0,0 +1,138 @@
//! Text-to-speech engine — synthesizes text to WAV files using Kokoro neural TTS.
//!
//! The model is loaded lazily on first use. If the model/voices files are not
//! present, the generate command returns an error and the TS side falls back
//! to espeak-ng.
use kokoro_tts::{KokoroTts, Voice};
use std::path::Path;
/// Wraps the Kokoro TTS engine with lazy model loading.
pub struct TtsEngine {
tts: Option<KokoroTts>,
/// Path that was used to load the current model (for cache invalidation).
loaded_model_path: String,
loaded_voices_path: String,
}
impl TtsEngine {
pub fn new() -> Self {
Self {
tts: None,
loaded_model_path: String::new(),
loaded_voices_path: String::new(),
}
}
/// Generate a WAV file from text.
///
/// Params (from IPC JSON):
/// - `model`: path to the ONNX model file
/// - `voices`: path to the voices.bin file
/// - `voice`: voice name (e.g. "af_bella")
/// - `text`: text to synthesize
/// - `output`: output WAV file path
pub async fn generate(&mut self, params: &serde_json::Value) -> Result<serde_json::Value, String> {
let model_path = params.get("model").and_then(|v| v.as_str())
.ok_or("missing 'model' param")?;
let voices_path = params.get("voices").and_then(|v| v.as_str())
.ok_or("missing 'voices' param")?;
let voice_name = params.get("voice").and_then(|v| v.as_str())
.unwrap_or("af_bella");
let text = params.get("text").and_then(|v| v.as_str())
.ok_or("missing 'text' param")?;
let output_path = params.get("output").and_then(|v| v.as_str())
.ok_or("missing 'output' param")?;
if text.is_empty() {
return Err("empty text".into());
}
// Check that model/voices files exist.
if !Path::new(model_path).exists() {
return Err(format!("model not found: {model_path}"));
}
if !Path::new(voices_path).exists() {
return Err(format!("voices not found: {voices_path}"));
}
// Lazy-load or reload if paths changed.
if self.tts.is_none()
|| self.loaded_model_path != model_path
|| self.loaded_voices_path != voices_path
{
eprintln!("[tts] loading model: {model_path}");
let tts = KokoroTts::new(model_path, voices_path)
.await
.map_err(|e| format!("model load failed: {e:?}"))?;
self.tts = Some(tts);
self.loaded_model_path = model_path.to_string();
self.loaded_voices_path = voices_path.to_string();
}
let tts = self.tts.as_ref().unwrap();
let voice = select_voice(voice_name);
eprintln!("[tts] synthesizing voice '{voice_name}': \"{text}\"");
let (samples, duration) = tts.synth(text, voice)
.await
.map_err(|e| format!("synthesis failed: {e:?}"))?;
eprintln!("[tts] synthesized {} samples in {duration:?}", samples.len());
// Write 24kHz 16-bit mono WAV.
let spec = hound::WavSpec {
channels: 1,
sample_rate: 24000,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};
let mut writer = hound::WavWriter::create(output_path, spec)
.map_err(|e| format!("WAV create failed: {e}"))?;
for &sample in &samples {
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
writer.write_sample(s16).map_err(|e| format!("WAV write: {e}"))?;
}
writer.finalize().map_err(|e| format!("WAV finalize: {e}"))?;
eprintln!("[tts] wrote {output_path}");
Ok(serde_json::json!({ "output": output_path }))
}
}
/// Map voice name string to Kokoro Voice enum variant.
fn select_voice(name: &str) -> Voice {
match name {
"af_bella" => Voice::AfBella(1.0),
"af_heart" => Voice::AfHeart(1.0),
"af_jessica" => Voice::AfJessica(1.0),
"af_nicole" => Voice::AfNicole(1.0),
"af_nova" => Voice::AfNova(1.0),
"af_sarah" => Voice::AfSarah(1.0),
"af_sky" => Voice::AfSky(1.0),
"af_river" => Voice::AfRiver(1.0),
"af_alloy" => Voice::AfAlloy(1.0),
"af_aoede" => Voice::AfAoede(1.0),
"af_kore" => Voice::AfKore(1.0),
"am_adam" => Voice::AmAdam(1.0),
"am_echo" => Voice::AmEcho(1.0),
"am_eric" => Voice::AmEric(1.0),
"am_fenrir" => Voice::AmFenrir(1.0),
"am_liam" => Voice::AmLiam(1.0),
"am_michael" => Voice::AmMichael(1.0),
"am_onyx" => Voice::AmOnyx(1.0),
"am_puck" => Voice::AmPuck(1.0),
"bf_alice" => Voice::BfAlice(1.0),
"bf_emma" => Voice::BfEmma(1.0),
"bf_isabella" => Voice::BfIsabella(1.0),
"bf_lily" => Voice::BfLily(1.0),
"bm_daniel" => Voice::BmDaniel(1.0),
"bm_fable" => Voice::BmFable(1.0),
"bm_george" => Voice::BmGeorge(1.0),
"bm_lewis" => Voice::BmLewis(1.0),
_ => {
eprintln!("[tts] unknown voice '{name}', falling back to af_bella");
Voice::AfBella(1.0)
}
}
}

View File

@@ -290,8 +290,9 @@ async fn browser_to_mixer_loop(
.send(RtpPacket {
payload: payload.to_vec(),
payload_type: PT_OPUS,
marker: false,
timestamp: 0,
marker: rtp_packet.header.marker,
seq: rtp_packet.header.sequence_number,
timestamp: rtp_packet.header.timestamp,
})
.await;
}

View File

@@ -197,10 +197,11 @@ pub fn compute_digest_auth(
use crate::Endpoint;
/// Parse the audio media port and connection address from an SDP body.
/// Parse the audio media port, connection address, and preferred codec from an SDP body.
pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
let mut addr: Option<&str> = None;
let mut port: Option<u16> = None;
let mut codec_pt: Option<u8> = None;
let normalized = sdp.replace("\r\n", "\n");
for raw in normalized.split('\n') {
@@ -208,10 +209,16 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
if let Some(rest) = line.strip_prefix("c=IN IP4 ") {
addr = Some(rest.trim());
} else if let Some(rest) = line.strip_prefix("m=audio ") {
// m=audio <port> RTP/AVP <pt1> [<pt2> ...]
let parts: Vec<&str> = rest.split_whitespace().collect();
if !parts.is_empty() {
port = parts[0].parse().ok();
}
// parts[1] is "RTP/AVP" or similar; parts[2..] are payload types.
// The first PT is the preferred codec.
if parts.len() > 2 {
codec_pt = parts[2].parse::<u8>().ok();
}
}
}
@@ -219,6 +226,7 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
(Some(a), Some(p)) => Some(Endpoint {
address: a.to_string(),
port: p,
codec_pt,
}),
_ => None,
}

View File

@@ -9,9 +9,11 @@ pub mod dialog;
pub mod helpers;
pub mod rewrite;
/// Network endpoint (address + port).
/// Network endpoint (address + port + optional negotiated codec).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Endpoint {
pub address: String,
pub port: u16,
/// First payload type from the SDP `m=audio` line (the preferred codec).
pub codec_pt: Option<u8>,
}

View File

@@ -92,7 +92,7 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option<Endpoint>
.collect();
let original = match (orig_addr, orig_port) {
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p }),
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }),
_ => None,
};

View File

@@ -1,18 +0,0 @@
[package]
name = "tts-engine"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "tts-engine"
path = "src/main.rs"
[dependencies]
kokoro-tts = { version = "0.3", default-features = false }
# Pin to rc.11 matching kokoro-tts's expectation; enable vendored TLS to avoid system libssl-dev.
ort = { version = "=2.0.0-rc.11", default-features = false, features = [
"std", "download-binaries", "copy-dylibs", "ndarray",
"tls-native-vendored"
] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
hound = "3.5"

View File

@@ -1,149 +0,0 @@
/// TTS engine CLI — synthesizes text to a WAV file using Kokoro neural TTS.
///
/// Usage:
/// echo "Hello world" | tts-engine --model kokoro-v1.0.onnx --voices voices.bin --output out.wav
/// tts-engine --model kokoro-v1.0.onnx --voices voices.bin --output out.wav --text "Hello world"
///
/// Outputs 24kHz 16-bit mono WAV.
use kokoro_tts::{KokoroTts, Voice};
use std::io::{self, Read};
fn parse_args() -> Result<(String, String, String, String, Option<String>), String> {
let args: Vec<String> = std::env::args().collect();
let mut model = String::new();
let mut voices = String::new();
let mut output = String::new();
let mut text: Option<String> = None;
let mut voice_name: Option<String> = None;
let mut i = 1;
while i < args.len() {
match args[i].as_str() {
"--model" => { i += 1; model = args.get(i).cloned().unwrap_or_default(); }
"--voices" => { i += 1; voices = args.get(i).cloned().unwrap_or_default(); }
"--output" | "--output_file" => { i += 1; output = args.get(i).cloned().unwrap_or_default(); }
"--text" => { i += 1; text = args.get(i).cloned(); }
"--voice" => { i += 1; voice_name = args.get(i).cloned(); }
_ => {}
}
i += 1;
}
if model.is_empty() { return Err("--model required".into()); }
if voices.is_empty() { return Err("--voices required".into()); }
if output.is_empty() { return Err("--output required".into()); }
let voice_str = voice_name.unwrap_or_else(|| "af_bella".into());
Ok((model, voices, output, voice_str, text))
}
fn select_voice(name: &str) -> Voice {
match name {
"af_bella" => Voice::AfBella(1.0),
"af_heart" => Voice::AfHeart(1.0),
"af_jessica" => Voice::AfJessica(1.0),
"af_nicole" => Voice::AfNicole(1.0),
"af_nova" => Voice::AfNova(1.0),
"af_sarah" => Voice::AfSarah(1.0),
"af_sky" => Voice::AfSky(1.0),
"af_river" => Voice::AfRiver(1.0),
"af_alloy" => Voice::AfAlloy(1.0),
"af_aoede" => Voice::AfAoede(1.0),
"af_kore" => Voice::AfKore(1.0),
"am_adam" => Voice::AmAdam(1.0),
"am_echo" => Voice::AmEcho(1.0),
"am_eric" => Voice::AmEric(1.0),
"am_fenrir" => Voice::AmFenrir(1.0),
"am_liam" => Voice::AmLiam(1.0),
"am_michael" => Voice::AmMichael(1.0),
"am_onyx" => Voice::AmOnyx(1.0),
"am_puck" => Voice::AmPuck(1.0),
"bf_alice" => Voice::BfAlice(1.0),
"bf_emma" => Voice::BfEmma(1.0),
"bf_isabella" => Voice::BfIsabella(1.0),
"bf_lily" => Voice::BfLily(1.0),
"bm_daniel" => Voice::BmDaniel(1.0),
"bm_fable" => Voice::BmFable(1.0),
"bm_george" => Voice::BmGeorge(1.0),
"bm_lewis" => Voice::BmLewis(1.0),
_ => {
eprintln!("[tts-engine] unknown voice '{}', falling back to af_bella", name);
Voice::AfBella(1.0)
}
}
}
#[tokio::main]
async fn main() {
let (model_path, voices_path, output_path, voice_name, text_arg) = match parse_args() {
Ok(v) => v,
Err(e) => {
eprintln!("Error: {}", e);
eprintln!("Usage: tts-engine --model <model.onnx> --voices <voices.bin> --output <output.wav> [--text <text>] [--voice <voice_name>]");
std::process::exit(1);
}
};
// Get text from --text arg or stdin.
let text = match text_arg {
Some(t) => t,
None => {
let mut buf = String::new();
io::stdin().read_to_string(&mut buf).expect("failed to read stdin");
buf.trim().to_string()
}
};
if text.is_empty() {
eprintln!("[tts-engine] no text provided");
std::process::exit(1);
}
eprintln!("[tts-engine] loading model: {}", model_path);
let tts = match KokoroTts::new(&model_path, &voices_path).await {
Ok(t) => t,
Err(e) => {
eprintln!("[tts-engine] failed to load model: {:?}", e);
std::process::exit(1);
}
};
let voice = select_voice(&voice_name);
eprintln!("[tts-engine] synthesizing with voice '{}': \"{}\"", voice_name, text);
let (samples, duration) = match tts.synth(&text, voice).await {
Ok(r) => r,
Err(e) => {
eprintln!("[tts-engine] synthesis failed: {:?}", e);
std::process::exit(1);
}
};
eprintln!("[tts-engine] synthesized {} samples in {:?}", samples.len(), duration);
// Write WAV: 24kHz, 16-bit, mono (same format announcement.ts expects).
let spec = hound::WavSpec {
channels: 1,
sample_rate: 24000,
bits_per_sample: 16,
sample_format: hound::SampleFormat::Int,
};
let mut writer = match hound::WavWriter::create(&output_path, spec) {
Ok(w) => w,
Err(e) => {
eprintln!("[tts-engine] failed to create WAV: {}", e);
std::process::exit(1);
}
};
for &sample in &samples {
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
writer.write_sample(s16).unwrap();
}
writer.finalize().unwrap();
eprintln!("[tts-engine] wrote {}", output_path);
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.15.0',
version: '1.17.2',
description: 'undefined'
}

View File

@@ -1,59 +1,22 @@
/**
* TTS announcement module — pre-generates audio announcements using espeak-ng
* and caches them as encoded RTP packets for playback during call setup.
* TTS announcement module — generates announcement WAV files at startup.
*
* On startup, generates the announcement WAV via espeak-ng (formant-based TTS
* with highly accurate pronunciation), encodes each 20ms frame to G.722 (for
* SIP) and Opus (for WebRTC) via the Rust transcoder, and caches the packets.
* Engine priority: espeak-ng (formant TTS, fast) → Kokoro neural TTS via
* proxy-engine → disabled.
*
* Falls back to the Rust tts-engine (Kokoro neural TTS) if espeak-ng is not
* installed, and disables announcements if neither is available.
* The generated WAV is left on disk for Rust's audio_player / start_interaction
* to play during calls. No encoding or RTP playback happens in TypeScript.
*/
import { execSync } from 'node:child_process';
import fs from 'node:fs';
import path from 'node:path';
import { Buffer } from 'node:buffer';
import { encodePcm, isCodecReady } from './opusbridge.ts';
/** RTP clock increment per 20ms frame for each codec. */
function rtpClockIncrement(pt: number): number {
if (pt === 111) return 960;
if (pt === 9) return 160;
return 160;
}
/** Build a fresh RTP header. */
function buildRtpHeader(pt: number, seq: number, ts: number, ssrc: number, marker: boolean): Buffer {
const hdr = Buffer.alloc(12);
hdr[0] = 0x80;
hdr[1] = (marker ? 0x80 : 0) | (pt & 0x7f);
hdr.writeUInt16BE(seq & 0xffff, 2);
hdr.writeUInt32BE(ts >>> 0, 4);
hdr.writeUInt32BE(ssrc >>> 0, 8);
return hdr;
}
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** A pre-encoded announcement ready for RTP playback. */
export interface IAnnouncementCache {
/** G.722 encoded frames (each is a 20ms frame payload, no RTP header). */
g722Frames: Buffer[];
/** Opus encoded frames for WebRTC playback. */
opusFrames: Buffer[];
/** Total duration in milliseconds. */
durationMs: number;
}
import { sendProxyCommand, isProxyReady } from './proxybridge.ts';
// ---------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------
let cachedAnnouncement: IAnnouncementCache | null = null;
const TTS_DIR = path.join(process.cwd(), '.nogit', 'tts');
const ANNOUNCEMENT_TEXT = "Hello. I'm connecting your call now.";
const CACHE_WAV = path.join(TTS_DIR, 'announcement.wav');
@@ -64,12 +27,10 @@ const KOKORO_VOICES = 'voices.bin';
const KOKORO_VOICE = 'af_bella';
// ---------------------------------------------------------------------------
// Initialization
// TTS generators
// ---------------------------------------------------------------------------
/**
* Check if espeak-ng is available on the system.
*/
/** Check if espeak-ng is available on the system. */
function isEspeakAvailable(): boolean {
try {
execSync('which espeak-ng', { stdio: 'pipe' });
@@ -79,10 +40,7 @@ function isEspeakAvailable(): boolean {
}
}
/**
* Generate announcement WAV via espeak-ng (primary engine).
* Returns true on success.
*/
/** Generate announcement WAV via espeak-ng (primary engine). */
function generateViaEspeak(wavPath: string, text: string, log: (msg: string) => void): boolean {
log('[tts] generating announcement audio via espeak-ng...');
try {
@@ -98,11 +56,8 @@ function generateViaEspeak(wavPath: string, text: string, log: (msg: string) =>
}
}
/**
* Generate announcement WAV via Kokoro TTS (fallback engine).
* Returns true on success.
*/
function generateViaKokoro(wavPath: string, text: string, log: (msg: string) => void): boolean {
/** Generate announcement WAV via Kokoro TTS (fallback, runs inside proxy-engine). */
async function generateViaKokoro(wavPath: string, text: string, log: (msg: string) => void): Promise<boolean> {
const modelPath = path.join(TTS_DIR, KOKORO_MODEL);
const voicesPath = path.join(TTS_DIR, KOKORO_VOICES);
@@ -111,25 +66,21 @@ function generateViaKokoro(wavPath: string, text: string, log: (msg: string) =>
return false;
}
const root = process.cwd();
const ttsBinPaths = [
path.join(root, 'dist_rust', 'tts-engine'),
path.join(root, 'rust', 'target', 'release', 'tts-engine'),
path.join(root, 'rust', 'target', 'debug', 'tts-engine'),
];
const ttsBin = ttsBinPaths.find((p) => fs.existsSync(p));
if (!ttsBin) {
log('[tts] tts-engine binary not found — Kokoro fallback unavailable');
if (!isProxyReady()) {
log('[tts] proxy-engine not ready — Kokoro fallback unavailable');
return false;
}
log('[tts] generating announcement audio via Kokoro TTS (fallback)...');
try {
execSync(
`"${ttsBin}" --model "${modelPath}" --voices "${voicesPath}" --voice "${KOKORO_VOICE}" --output "${wavPath}" --text "${text}"`,
{ timeout: 120000, stdio: 'pipe' },
);
log('[tts] Kokoro WAV generated');
await sendProxyCommand('generate_tts', {
model: modelPath,
voices: voicesPath,
voice: KOKORO_VOICE,
text,
output: wavPath,
});
log('[tts] Kokoro WAV generated (via proxy-engine)');
return true;
} catch (e: any) {
log(`[tts] Kokoro failed: ${e.message}`);
@@ -137,40 +88,13 @@ function generateViaKokoro(wavPath: string, text: string, log: (msg: string) =>
}
}
/**
* Read a WAV file and detect its sample rate from the fmt chunk.
* Returns { pcm, sampleRate } or null on failure.
*/
function readWavWithRate(wavPath: string): { pcm: Buffer; sampleRate: number } | null {
const wav = fs.readFileSync(wavPath);
if (wav.length < 44) return null;
if (wav.toString('ascii', 0, 4) !== 'RIFF') return null;
if (wav.toString('ascii', 8, 12) !== 'WAVE') return null;
let sampleRate = 22050; // default
let offset = 12;
let pcm: Buffer | null = null;
while (offset < wav.length - 8) {
const chunkId = wav.toString('ascii', offset, offset + 4);
const chunkSize = wav.readUInt32LE(offset + 4);
if (chunkId === 'fmt ') {
sampleRate = wav.readUInt32LE(offset + 12);
}
if (chunkId === 'data') {
pcm = wav.subarray(offset + 8, offset + 8 + chunkSize);
}
offset += 8 + chunkSize;
if (offset % 2 !== 0) offset++;
}
if (!pcm) return null;
return { pcm, sampleRate };
}
// ---------------------------------------------------------------------------
// Initialization
// ---------------------------------------------------------------------------
/**
* Pre-generate the announcement audio and encode to G.722 + Opus frames.
* Must be called after the codec bridge is initialized.
* Pre-generate the announcement WAV file.
* Must be called after the proxy engine is initialized.
*
* Engine priority: espeak-ng → Kokoro → disabled.
*/
@@ -178,7 +102,6 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
fs.mkdirSync(TTS_DIR, { recursive: true });
try {
// Generate WAV if not cached.
if (!fs.existsSync(CACHE_WAV)) {
let generated = false;
@@ -189,9 +112,9 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
log('[tts] espeak-ng not installed — trying Kokoro fallback');
}
// Fall back to Kokoro.
// Fall back to Kokoro (via proxy-engine).
if (!generated) {
generated = generateViaKokoro(CACHE_WAV, ANNOUNCEMENT_TEXT, log);
generated = await generateViaKokoro(CACHE_WAV, ANNOUNCEMENT_TEXT, log);
}
if (!generated) {
@@ -200,49 +123,7 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
}
}
// Read WAV and extract raw PCM + sample rate.
const result = readWavWithRate(CACHE_WAV);
if (!result) {
log('[tts] failed to parse WAV file');
return false;
}
const { pcm, sampleRate } = result;
// Wait for codec bridge to be ready.
if (!isCodecReady()) {
log('[tts] codec bridge not ready — will retry');
return false;
}
// Encode in 20ms chunks. The Rust encoder resamples to each codec's native rate.
const FRAME_SAMPLES = Math.floor(sampleRate * 0.02);
const FRAME_BYTES = FRAME_SAMPLES * 2; // 16-bit = 2 bytes per sample
const totalFrames = Math.floor(pcm.length / FRAME_BYTES);
const g722Frames: Buffer[] = [];
const opusFrames: Buffer[] = [];
log(`[tts] encoding ${totalFrames} frames (${FRAME_SAMPLES} samples/frame @ ${sampleRate}Hz)...`);
for (let i = 0; i < totalFrames; i++) {
const framePcm = pcm.subarray(i * FRAME_BYTES, (i + 1) * FRAME_BYTES);
const pcmBuf = Buffer.from(framePcm);
const [g722, opus] = await Promise.all([
encodePcm(pcmBuf, sampleRate, 9), // G.722 for SIP devices
encodePcm(pcmBuf, sampleRate, 111), // Opus for WebRTC browsers
]);
if (g722) g722Frames.push(g722);
if (opus) opusFrames.push(opus);
if (!g722 && !opus && i < 3) log(`[tts] frame ${i} encode failed`);
}
cachedAnnouncement = {
g722Frames,
opusFrames,
durationMs: totalFrames * 20,
};
log(`[tts] announcement cached: ${g722Frames.length} frames (${(totalFrames * 20 / 1000).toFixed(1)}s)`);
log('[tts] announcement WAV ready');
return true;
} catch (e: any) {
log(`[tts] init error: ${e.message}`);
@@ -250,100 +131,7 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
}
}
// ---------------------------------------------------------------------------
// Playback
// ---------------------------------------------------------------------------
/**
* Play the pre-cached announcement to an RTP endpoint.
*
* @param sendPacket - function to send a raw RTP packet
* @param ssrc - SSRC to use in RTP headers
* @param onDone - called when the announcement finishes
* @returns a cancel function, or null if no announcement is cached
*/
export function playAnnouncement(
sendPacket: (pkt: Buffer) => void,
ssrc: number,
onDone?: () => void,
): (() => void) | null {
if (!cachedAnnouncement || cachedAnnouncement.g722Frames.length === 0) {
onDone?.();
return null;
}
const frames = cachedAnnouncement.g722Frames;
const PT = 9; // G.722
let frameIdx = 0;
let seq = Math.floor(Math.random() * 0xffff);
let rtpTs = Math.floor(Math.random() * 0xffffffff);
const timer = setInterval(() => {
if (frameIdx >= frames.length) {
clearInterval(timer);
onDone?.();
return;
}
const payload = frames[frameIdx];
const hdr = buildRtpHeader(PT, seq & 0xffff, rtpTs >>> 0, ssrc >>> 0, frameIdx === 0);
const pkt = Buffer.concat([hdr, payload]);
sendPacket(pkt);
seq++;
rtpTs += rtpClockIncrement(PT);
frameIdx++;
}, 20);
// Return cancel function.
return () => clearInterval(timer);
/** Get the path to the cached announcement WAV, or null if not generated. */
export function getAnnouncementWavPath(): string | null {
return fs.existsSync(CACHE_WAV) ? CACHE_WAV : null;
}
/**
* Play pre-cached Opus announcement to a WebRTC PeerConnection sender.
*
* @param sendRtpPacket - function to send a raw RTP packet via sender.sendRtp()
* @param ssrc - SSRC to use in RTP headers
* @param onDone - called when announcement finishes
* @returns cancel function, or null if no announcement cached
*/
export function playAnnouncementToWebRtc(
sendRtpPacket: (pkt: Buffer) => void,
ssrc: number,
counters: { seq: number; ts: number },
onDone?: () => void,
): (() => void) | null {
if (!cachedAnnouncement || cachedAnnouncement.opusFrames.length === 0) {
onDone?.();
return null;
}
const frames = cachedAnnouncement.opusFrames;
const PT = 111; // Opus
let frameIdx = 0;
const timer = setInterval(() => {
if (frameIdx >= frames.length) {
clearInterval(timer);
onDone?.();
return;
}
const payload = frames[frameIdx];
const hdr = buildRtpHeader(PT, counters.seq & 0xffff, counters.ts >>> 0, ssrc >>> 0, frameIdx === 0);
const pkt = Buffer.concat([hdr, payload]);
sendRtpPacket(pkt);
counters.seq++;
counters.ts += 960; // Opus at 48kHz: 960 samples per 20ms
frameIdx++;
}, 20);
return () => clearInterval(timer);
}
/** Check if an announcement is cached and ready. */
export function isAnnouncementReady(): boolean {
return cachedAnnouncement !== null && cachedAnnouncement.g722Frames.length > 0;
}
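
A minimal TypeScript sketch of how the reworked announcement flow could be exercised at startup, assuming the initAnnouncement and getAnnouncementWavPath exports shown above; the import path and logger are illustrative, and how the WAV path is ultimately handed to proxy-engine's audio_player is not shown in this diff:

import { initAnnouncement, getAnnouncementWavPath } from './announcement.ts';

// Generate (or reuse) the announcement WAV, then look up its on-disk path.
const ready = await initAnnouncement((msg) => console.log(msg));
const wavPath = ready ? getAnnouncementWavPath() : null;
if (wavPath) {
  // Playback happens in Rust; TypeScript only passes this path along.
  console.log(`[tts] announcement WAV at ${wavPath}`);
}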

View File

@@ -1,55 +1,31 @@
/**
* PromptCache — manages multiple named audio prompts for IVR and voicemail.
* PromptCache — manages named audio prompt WAV files for IVR and voicemail.
*
* Each prompt is pre-encoded as both G.722 frames (for SIP legs) and Opus
* frames (for WebRTC legs), ready for 20ms RTP playback.
* Generates WAV files via espeak-ng (primary) or Kokoro TTS through the
* proxy-engine (fallback). Also supports loading pre-existing WAV files
* and programmatic tone generation.
*
* Supports three sources:
* 1. TTS generation via espeak-ng (primary) or Kokoro (fallback)
* 2. Loading from a pre-existing WAV file
* 3. Programmatic tone generation (beep, etc.)
*
* The existing announcement.ts system continues to work independently;
* this module provides generalized prompt management for IVR/voicemail.
* All audio playback happens in Rust (audio_player / start_interaction).
* This module only manages WAV files on disk.
*/
import { execSync } from 'node:child_process';
import fs from 'node:fs';
import path from 'node:path';
import { Buffer } from 'node:buffer';
import { encodePcm, isCodecReady } from '../opusbridge.ts';
/** RTP clock increment per 20ms frame for each codec. */
function rtpClockIncrement(pt: number): number {
if (pt === 111) return 960;
if (pt === 9) return 160;
return 160;
}
/** Build a fresh RTP header. */
function buildRtpHeader(pt: number, seq: number, ts: number, ssrc: number, marker: boolean): Buffer {
const hdr = Buffer.alloc(12);
hdr[0] = 0x80;
hdr[1] = (marker ? 0x80 : 0) | (pt & 0x7f);
hdr.writeUInt16BE(seq & 0xffff, 2);
hdr.writeUInt32BE(ts >>> 0, 4);
hdr.writeUInt32BE(ssrc >>> 0, 8);
return hdr;
}
import { sendProxyCommand, isProxyReady } from '../proxybridge.ts';
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
/** A pre-encoded prompt ready for RTP playback. */
/** A cached prompt — just a WAV file path and metadata. */
export interface ICachedPrompt {
/** Unique prompt identifier. */
id: string;
/** G.722 encoded frames (20ms each, no RTP header). */
g722Frames: Buffer[];
/** Opus encoded frames (20ms each, no RTP header). */
opusFrames: Buffer[];
/** Total duration in milliseconds. */
/** Path to the WAV file on disk. */
wavPath: string;
/** Total duration in milliseconds (approximate, from WAV header). */
durationMs: number;
}
@@ -82,84 +58,61 @@ function generateViaEspeak(wavPath: string, text: string): boolean {
}
}
/** Generate WAV via Kokoro TTS. */
function generateViaKokoro(wavPath: string, text: string, voice: string): boolean {
/** Generate WAV via Kokoro TTS (runs inside proxy-engine). */
async function generateViaKokoro(wavPath: string, text: string, voice: string): Promise<boolean> {
const modelPath = path.join(TTS_DIR, 'kokoro-v1.0.onnx');
const voicesPath = path.join(TTS_DIR, 'voices.bin');
if (!fs.existsSync(modelPath) || !fs.existsSync(voicesPath)) return false;
const root = process.cwd();
const ttsBin = [
path.join(root, 'dist_rust', 'tts-engine'),
path.join(root, 'rust', 'target', 'release', 'tts-engine'),
path.join(root, 'rust', 'target', 'debug', 'tts-engine'),
].find((p) => fs.existsSync(p));
if (!ttsBin) return false;
if (!isProxyReady()) return false;
try {
execSync(
`"${ttsBin}" --model "${modelPath}" --voices "${voicesPath}" --voice "${voice}" --output "${wavPath}" --text "${text}"`,
{ timeout: 120000, stdio: 'pipe' },
);
await sendProxyCommand('generate_tts', {
model: modelPath,
voices: voicesPath,
voice,
text,
output: wavPath,
});
return true;
} catch {
return false;
}
}
/** Read a WAV file and return raw PCM + sample rate. */
function readWavWithRate(wavPath: string): { pcm: Buffer; sampleRate: number } | null {
const wav = fs.readFileSync(wavPath);
if (wav.length < 44) return null;
if (wav.toString('ascii', 0, 4) !== 'RIFF') return null;
if (wav.toString('ascii', 8, 12) !== 'WAVE') return null;
/** Read a WAV file's duration from its header. */
function getWavDurationMs(wavPath: string): number {
try {
const wav = fs.readFileSync(wavPath);
if (wav.length < 44) return 0;
if (wav.toString('ascii', 0, 4) !== 'RIFF') return 0;
let sampleRate = 22050;
let pcm: Buffer | null = null;
let offset = 12;
let sampleRate = 16000;
let dataSize = 0;
let bitsPerSample = 16;
let channels = 1;
let offset = 12;
while (offset < wav.length - 8) {
const chunkId = wav.toString('ascii', offset, offset + 4);
const chunkSize = wav.readUInt32LE(offset + 4);
if (chunkId === 'fmt ') {
sampleRate = wav.readUInt32LE(offset + 12);
while (offset < wav.length - 8) {
const chunkId = wav.toString('ascii', offset, offset + 4);
const chunkSize = wav.readUInt32LE(offset + 4);
if (chunkId === 'fmt ') {
channels = wav.readUInt16LE(offset + 10);
sampleRate = wav.readUInt32LE(offset + 12);
bitsPerSample = wav.readUInt16LE(offset + 22);
}
if (chunkId === 'data') {
dataSize = chunkSize;
}
offset += 8 + chunkSize;
if (offset % 2 !== 0) offset++;
}
if (chunkId === 'data') {
pcm = wav.subarray(offset + 8, offset + 8 + chunkSize);
}
offset += 8 + chunkSize;
if (offset % 2 !== 0) offset++;
const bytesPerSample = (bitsPerSample / 8) * channels;
const totalSamples = bytesPerSample > 0 ? dataSize / bytesPerSample : 0;
return sampleRate > 0 ? Math.round((totalSamples / sampleRate) * 1000) : 0;
} catch {
return 0;
}
return pcm ? { pcm, sampleRate } : null;
}
/** Encode raw PCM frames to G.722 + Opus. */
async function encodePcmFrames(
pcm: Buffer,
sampleRate: number,
log: (msg: string) => void,
): Promise<{ g722Frames: Buffer[]; opusFrames: Buffer[] } | null> {
if (!isCodecReady()) return null;
const frameSamples = Math.floor(sampleRate * 0.02); // 20ms
const frameBytes = frameSamples * 2; // 16-bit
const totalFrames = Math.floor(pcm.length / frameBytes);
const g722Frames: Buffer[] = [];
const opusFrames: Buffer[] = [];
for (let i = 0; i < totalFrames; i++) {
const framePcm = Buffer.from(pcm.subarray(i * frameBytes, (i + 1) * frameBytes));
const [g722, opus] = await Promise.all([
encodePcm(framePcm, sampleRate, 9), // G.722
encodePcm(framePcm, sampleRate, 111), // Opus
]);
if (g722) g722Frames.push(g722);
if (opus) opusFrames.push(opus);
}
return { g722Frames, opusFrames };
}
// ---------------------------------------------------------------------------
@@ -195,7 +148,7 @@ export class PromptCache {
}
/**
* Generate a TTS prompt and cache it.
* Generate a TTS prompt WAV and cache its path.
* Uses espeak-ng (primary) or Kokoro (fallback).
*/
async generatePrompt(id: string, text: string, voice = 'af_bella'): Promise<ICachedPrompt | null> {
@@ -207,14 +160,14 @@ export class PromptCache {
this.espeakAvailable = isEspeakAvailable();
}
// Generate WAV.
let generated = false;
// Generate WAV if not already on disk.
if (!fs.existsSync(wavPath)) {
let generated = false;
if (this.espeakAvailable) {
generated = generateViaEspeak(wavPath, text);
}
if (!generated) {
generated = generateViaKokoro(wavPath, text, voice);
generated = await generateViaKokoro(wavPath, text, voice);
}
if (!generated) {
this.log(`[prompt-cache] failed to generate TTS for "${id}"`);
@@ -223,49 +176,22 @@ export class PromptCache {
this.log(`[prompt-cache] generated WAV for "${id}"`);
}
return this.loadWavPrompt(id, wavPath);
return this.registerWav(id, wavPath);
}
/**
* Load a WAV file as a prompt and cache it.
* Load a pre-existing WAV file as a prompt.
*/
async loadWavPrompt(id: string, wavPath: string): Promise<ICachedPrompt | null> {
if (!fs.existsSync(wavPath)) {
this.log(`[prompt-cache] WAV not found: ${wavPath}`);
return null;
}
const result = readWavWithRate(wavPath);
if (!result) {
this.log(`[prompt-cache] failed to parse WAV: ${wavPath}`);
return null;
}
const encoded = await encodePcmFrames(result.pcm, result.sampleRate, this.log);
if (!encoded) {
this.log(`[prompt-cache] encoding failed for "${id}" (codec bridge not ready?)`);
return null;
}
const durationMs = encoded.g722Frames.length * 20;
const prompt: ICachedPrompt = {
id,
g722Frames: encoded.g722Frames,
opusFrames: encoded.opusFrames,
durationMs,
};
this.prompts.set(id, prompt);
this.log(`[prompt-cache] cached "${id}": ${encoded.g722Frames.length} frames (${(durationMs / 1000).toFixed(1)}s)`);
return prompt;
return this.registerWav(id, wavPath);
}
/**
* Generate a beep tone prompt (sine wave).
* @param id - prompt ID
* @param freqHz - tone frequency (default 1000 Hz)
* @param durationMs - tone duration (default 500ms)
* @param amplitude - 16-bit amplitude (default 8000)
* Generate a beep tone WAV and cache it.
*/
async generateBeep(
id: string,
@@ -273,149 +199,77 @@ export class PromptCache {
durationMs = 500,
amplitude = 8000,
): Promise<ICachedPrompt | null> {
// Generate at 16kHz for decent quality.
const sampleRate = 16000;
const totalSamples = Math.floor((sampleRate * durationMs) / 1000);
const pcm = Buffer.alloc(totalSamples * 2);
fs.mkdirSync(TTS_DIR, { recursive: true });
const wavPath = path.join(TTS_DIR, `prompt-${id}.wav`);
for (let i = 0; i < totalSamples; i++) {
const t = i / sampleRate;
// Apply a short fade-in/fade-out to avoid click artifacts.
const fadeLen = Math.floor(sampleRate * 0.01); // 10ms fade
let envelope = 1.0;
if (i < fadeLen) envelope = i / fadeLen;
else if (i > totalSamples - fadeLen) envelope = (totalSamples - i) / fadeLen;
if (!fs.existsSync(wavPath)) {
// Generate 16kHz 16-bit mono sine wave WAV.
const sampleRate = 16000;
const totalSamples = Math.floor((sampleRate * durationMs) / 1000);
const pcm = Buffer.alloc(totalSamples * 2);
const sample = Math.round(Math.sin(2 * Math.PI * freqHz * t) * amplitude * envelope);
pcm.writeInt16LE(Math.max(-32768, Math.min(32767, sample)), i * 2);
for (let i = 0; i < totalSamples; i++) {
const t = i / sampleRate;
const fadeLen = Math.floor(sampleRate * 0.01); // 10ms fade
let envelope = 1.0;
if (i < fadeLen) envelope = i / fadeLen;
else if (i > totalSamples - fadeLen) envelope = (totalSamples - i) / fadeLen;
const sample = Math.round(Math.sin(2 * Math.PI * freqHz * t) * amplitude * envelope);
pcm.writeInt16LE(Math.max(-32768, Math.min(32767, sample)), i * 2);
}
// Write WAV file.
const headerSize = 44;
const dataSize = pcm.length;
const wav = Buffer.alloc(headerSize + dataSize);
// RIFF header
wav.write('RIFF', 0);
wav.writeUInt32LE(36 + dataSize, 4);
wav.write('WAVE', 8);
// fmt chunk
wav.write('fmt ', 12);
wav.writeUInt32LE(16, 16); // chunk size
wav.writeUInt16LE(1, 20); // PCM format
wav.writeUInt16LE(1, 22); // mono
wav.writeUInt32LE(sampleRate, 24);
wav.writeUInt32LE(sampleRate * 2, 28); // byte rate
wav.writeUInt16LE(2, 32); // block align
wav.writeUInt16LE(16, 34); // bits per sample
// data chunk
wav.write('data', 36);
wav.writeUInt32LE(dataSize, 40);
pcm.copy(wav, 44);
fs.writeFileSync(wavPath, wav);
this.log(`[prompt-cache] beep WAV generated for "${id}"`);
}
const encoded = await encodePcmFrames(pcm, sampleRate, this.log);
if (!encoded) {
this.log(`[prompt-cache] beep encoding failed for "${id}"`);
return null;
}
const actualDuration = encoded.g722Frames.length * 20;
const prompt: ICachedPrompt = {
id,
g722Frames: encoded.g722Frames,
opusFrames: encoded.opusFrames,
durationMs: actualDuration,
};
this.prompts.set(id, prompt);
this.log(`[prompt-cache] beep "${id}" cached: ${actualDuration}ms @ ${freqHz}Hz`);
return prompt;
return this.registerWav(id, wavPath);
}
/**
* Remove a prompt from the cache.
*/
/** Remove a prompt from the cache. */
remove(id: string): void {
this.prompts.delete(id);
}
/**
* Clear all cached prompts.
*/
/** Clear all cached prompts. */
clear(): void {
this.prompts.clear();
}
}
// ---------------------------------------------------------------------------
// Standalone playback helpers (for use by SystemLeg)
// ---------------------------------------------------------------------------
// -------------------------------------------------------------------------
// Internal
// -------------------------------------------------------------------------
/**
* Play a cached prompt's G.722 frames as RTP packets at 20ms intervals.
*
* @param prompt - the cached prompt to play
* @param sendPacket - function to send a raw RTP packet (12-byte header + payload)
* @param ssrc - SSRC for RTP headers
* @param onDone - called when playback finishes
* @returns cancel function, or null if prompt has no G.722 frames
*/
export function playPromptG722(
prompt: ICachedPrompt,
sendPacket: (pkt: Buffer) => void,
ssrc: number,
onDone?: () => void,
): (() => void) | null {
if (prompt.g722Frames.length === 0) {
onDone?.();
return null;
private registerWav(id: string, wavPath: string): ICachedPrompt {
const durationMs = getWavDurationMs(wavPath);
const prompt: ICachedPrompt = { id, wavPath, durationMs };
this.prompts.set(id, prompt);
this.log(`[prompt-cache] cached "${id}": ${wavPath} (${(durationMs / 1000).toFixed(1)}s)`);
return prompt;
}
const frames = prompt.g722Frames;
const PT = 9;
let frameIdx = 0;
let seq = Math.floor(Math.random() * 0xffff);
let rtpTs = Math.floor(Math.random() * 0xffffffff);
const timer = setInterval(() => {
if (frameIdx >= frames.length) {
clearInterval(timer);
onDone?.();
return;
}
const payload = frames[frameIdx];
const hdr = buildRtpHeader(PT, seq & 0xffff, rtpTs >>> 0, ssrc >>> 0, frameIdx === 0);
const pkt = Buffer.concat([hdr, payload]);
sendPacket(pkt);
seq++;
rtpTs += rtpClockIncrement(PT);
frameIdx++;
}, 20);
return () => clearInterval(timer);
}
/**
* Play a cached prompt's Opus frames as RTP packets at 20ms intervals.
*
* @param prompt - the cached prompt to play
* @param sendPacket - function to send a raw RTP packet
* @param ssrc - SSRC for RTP headers
* @param counters - shared seq/ts counters (mutated in place for seamless transitions)
* @param onDone - called when playback finishes
* @returns cancel function, or null if prompt has no Opus frames
*/
export function playPromptOpus(
prompt: ICachedPrompt,
sendPacket: (pkt: Buffer) => void,
ssrc: number,
counters: { seq: number; ts: number },
onDone?: () => void,
): (() => void) | null {
if (prompt.opusFrames.length === 0) {
onDone?.();
return null;
}
const frames = prompt.opusFrames;
const PT = 111;
let frameIdx = 0;
const timer = setInterval(() => {
if (frameIdx >= frames.length) {
clearInterval(timer);
onDone?.();
return;
}
const payload = frames[frameIdx];
const hdr = buildRtpHeader(PT, counters.seq & 0xffff, counters.ts >>> 0, ssrc >>> 0, frameIdx === 0);
const pkt = Buffer.concat([hdr, payload]);
sendPacket(pkt);
counters.seq++;
counters.ts += 960; // Opus 48kHz: 960 samples per 20ms
frameIdx++;
}, 20);
return () => clearInterval(timer);
}
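
A short usage sketch for the slimmed-down PromptCache, assuming an already-constructed instance named promptCache (its constructor arguments are not shown in this diff), the generatePrompt/generateBeep signatures above, and the default tone parameters; the prompt IDs and text are placeholders:

// Each helper now resolves to a WAV path plus an approximate duration;
// all playback happens in Rust via audio_player / start_interaction.
const greeting = await promptCache.generatePrompt('vm-greeting', 'Please leave a message after the tone.');
const beep = await promptCache.generateBeep('vm-beep'); // default 500ms tone
if (greeting && beep) {
  console.log(`${greeting.id}: ${greeting.wavPath} (${greeting.durationMs} ms)`);
}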

View File

@@ -1,199 +0,0 @@
/**
* Audio transcoding bridge — uses smartrust to communicate with the Rust
* opus-codec binary, which handles Opus ↔ G.722 ↔ PCMU/PCMA transcoding.
*
* All codec conversion happens in Rust (libopus + SpanDSP G.722 port).
* The TypeScript side just passes raw payloads back and forth.
*/
import path from 'node:path';
import { RustBridge } from '@push.rocks/smartrust';
// ---------------------------------------------------------------------------
// Command type map for smartrust
// ---------------------------------------------------------------------------
type TCodecCommands = {
init: {
params: Record<string, never>;
result: Record<string, never>;
};
create_session: {
params: { session_id: string };
result: Record<string, never>;
};
destroy_session: {
params: { session_id: string };
result: Record<string, never>;
};
transcode: {
params: { data_b64: string; from_pt: number; to_pt: number; session_id?: string; direction?: string };
result: { data_b64: string };
};
encode_pcm: {
params: { data_b64: string; sample_rate: number; to_pt: number; session_id?: string };
result: { data_b64: string };
};
};
// ---------------------------------------------------------------------------
// Bridge singleton
// ---------------------------------------------------------------------------
let bridge: RustBridge<TCodecCommands> | null = null;
let initialized = false;
function buildLocalPaths(): string[] {
const root = process.cwd();
return [
path.join(root, 'dist_rust', 'opus-codec'),
path.join(root, 'rust', 'target', 'release', 'opus-codec'),
path.join(root, 'rust', 'target', 'debug', 'opus-codec'),
];
}
let logFn: ((msg: string) => void) | undefined;
/**
* Initialize the audio transcoding bridge. Spawns the Rust binary.
*/
export async function initCodecBridge(log?: (msg: string) => void): Promise<boolean> {
if (initialized && bridge) return true;
logFn = log;
try {
bridge = new RustBridge<TCodecCommands>({
binaryName: 'opus-codec',
localPaths: buildLocalPaths(),
});
const spawned = await bridge.spawn();
if (!spawned) {
log?.('[codec] failed to spawn opus-codec binary');
bridge = null;
return false;
}
// Auto-restart: reset state when the Rust process exits so the next
// transcode attempt triggers re-initialization instead of silent failure.
bridge.on('exit', () => {
logFn?.('[codec] Rust audio transcoder process exited — will re-init on next use');
bridge = null;
initialized = false;
});
await bridge.sendCommand('init', {} as any);
initialized = true;
log?.('[codec] Rust audio transcoder initialized (Opus + G.722 + PCMU/PCMA)');
return true;
} catch (e: any) {
log?.(`[codec] init error: ${e.message}`);
bridge = null;
return false;
}
}
// ---------------------------------------------------------------------------
// Session management — per-call codec isolation
// ---------------------------------------------------------------------------
/**
* Create an isolated codec session. Each session gets its own Opus/G.722
* encoder/decoder state, preventing concurrent calls from corrupting each
* other's codec predictor state.
*/
export async function createSession(sessionId: string): Promise<boolean> {
if (!bridge || !initialized) {
// Attempt auto-reinit if bridge died.
const ok = await initCodecBridge(logFn);
if (!ok) return false;
}
try {
await bridge!.sendCommand('create_session', { session_id: sessionId });
return true;
} catch (e: any) {
logFn?.(`[codec] create_session error: ${e?.message || e}`);
return false;
}
}
/**
* Destroy a codec session, freeing its encoder/decoder state.
*/
export async function destroySession(sessionId: string): Promise<void> {
if (!bridge || !initialized) return;
try {
await bridge.sendCommand('destroy_session', { session_id: sessionId });
} catch {
// Best-effort cleanup.
}
}
// ---------------------------------------------------------------------------
// Transcoding
// ---------------------------------------------------------------------------
/**
* Transcode an RTP payload between two codecs.
* All codec work (Opus, G.722, PCMU, PCMA) + resampling happens in Rust.
*
* @param data - raw RTP payload (no header)
* @param fromPT - source payload type (0=PCMU, 8=PCMA, 9=G.722, 111=Opus)
* @param toPT - target payload type
* @param sessionId - optional session for isolated codec state
* @returns transcoded payload, or null on failure
*/
export async function transcode(data: Buffer, fromPT: number, toPT: number, sessionId?: string, direction?: string): Promise<Buffer | null> {
if (!bridge || !initialized) return null;
try {
const params: any = {
data_b64: data.toString('base64'),
from_pt: fromPT,
to_pt: toPT,
};
if (sessionId) params.session_id = sessionId;
if (direction) params.direction = direction;
const result = await bridge.sendCommand('transcode', params);
return Buffer.from(result.data_b64, 'base64');
} catch {
return null;
}
}
/**
* Encode raw 16-bit PCM to a target codec.
* @param pcmData - raw 16-bit LE PCM bytes
* @param sampleRate - input sample rate (e.g. 22050 for Piper TTS)
* @param toPT - target payload type (9=G.722, 111=Opus, 0=PCMU, 8=PCMA)
* @param sessionId - optional session for isolated codec state
*/
export async function encodePcm(pcmData: Buffer, sampleRate: number, toPT: number, sessionId?: string): Promise<Buffer | null> {
if (!bridge || !initialized) return null;
try {
const params: any = {
data_b64: pcmData.toString('base64'),
sample_rate: sampleRate,
to_pt: toPT,
};
if (sessionId) params.session_id = sessionId;
const result = await bridge.sendCommand('encode_pcm', params);
return Buffer.from(result.data_b64, 'base64');
} catch (e: any) {
console.error('[encodePcm] error:', e?.message || e);
return null;
}
}
/** Check if the codec bridge is ready. */
export function isCodecReady(): boolean {
return initialized && bridge !== null;
}
/** Shut down the codec bridge. */
export function shutdownCodecBridge(): void {
if (bridge) {
try { bridge.kill(); } catch { /* ignore */ }
bridge = null;
initialized = false;
}
}

View File

@@ -79,6 +79,10 @@ type TProxyCommands = {
params: { call_id: string; leg_id: string; key: string; value: unknown };
result: Record<string, never>;
};
generate_tts: {
params: { model: string; voices: string; voice: string; text: string; output: string };
result: { output: string };
};
};
// ---------------------------------------------------------------------------
@@ -493,6 +497,15 @@ export function isProxyReady(): boolean {
return initialized && bridge !== null;
}
/** Send an arbitrary command to the proxy engine bridge. */
export async function sendProxyCommand<K extends keyof TProxyCommands>(
method: K,
params: TProxyCommands[K]['params'],
): Promise<TProxyCommands[K]['result']> {
if (!bridge || !initialized) throw new Error('proxy engine not initialized');
return bridge.sendCommand(method as string, params as any) as any;
}
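A brief sketch of driving the new generate_tts command through this helper; the file paths below are placeholders that mirror the .nogit/tts layout used by announcement.ts, and the text is illustrative:

// Ask proxy-engine to synthesize a WAV with Kokoro and report where it wrote it.
const { output } = await sendProxyCommand('generate_tts', {
  model: '.nogit/tts/kokoro-v1.0.onnx',
  voices: '.nogit/tts/voices.bin',
  voice: 'af_bella',
  text: 'Your call is being connected.',
  output: '.nogit/tts/prompt-connecting.wav',
});
console.log(`[tts] WAV written to ${output}`);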
/** Shut down the proxy engine. */
export function shutdownProxyEngine(): void {
if (bridge) {

View File

@@ -24,7 +24,6 @@ import {
getAllBrowserDeviceIds,
getBrowserDeviceWs,
} from './webrtcbridge.ts';
import { initCodecBridge } from './opusbridge.ts';
import { initAnnouncement } from './announcement.ts';
import { PromptCache } from './call/prompt-cache.ts';
import { VoiceboxManager } from './voicebox.ts';
@@ -426,9 +425,9 @@ async function startProxyEngine(): Promise<void> {
id: data.leg_id,
type: data.kind,
state: data.state,
codec: null,
rtpPort: null,
remoteMedia: null,
codec: data.codec ?? null,
rtpPort: data.rtpPort ?? null,
remoteMedia: data.remoteMedia ?? null,
metadata: data.metadata || {},
});
}
@@ -523,9 +522,8 @@ async function startProxyEngine(): Promise<void> {
const deviceList = appConfig.devices.map((d) => d.displayName).join(', ');
log(`proxy engine started | LAN ${appConfig.proxy.lanIp}:${appConfig.proxy.lanPort} | providers: ${providerList} | devices: ${deviceList}`);
// Initialize audio codec bridge (still needed for WebRTC transcoding).
// Generate TTS audio (WAV files on disk, played by Rust audio_player).
try {
await initCodecBridge(log);
await initAnnouncement(log);
// Pre-generate prompts.
@@ -547,7 +545,7 @@ async function startProxyEngine(): Promise<void> {
}
log(`[startup] prompts cached: ${promptCache.listIds().join(', ') || 'none'}`);
} catch (e) {
log(`[codec] init failed: ${e}`);
log(`[tts] init failed: ${e}`);
}
}

View File

@@ -3,6 +3,6 @@
*/
export const commitinfo = {
name: 'siprouter',
version: '1.15.0',
version: '1.17.2',
description: 'undefined'
}