Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a02146633b | |||
| f78639dd19 | |||
| 2aca5f1510 | |||
| 73b28f5f57 | |||
| 10ad432a4c | |||
| 66112091a2 |
24
changelog.md
24
changelog.md
@@ -1,5 +1,29 @@
|
||||
# Changelog
|
||||
|
||||
## 2026-04-10 - 1.17.1 - fix(proxy-engine,codec-lib,sip-proto,ts)
|
||||
preserve negotiated media details and improve RTP audio handling across call legs
|
||||
|
||||
- Use native Opus float encode/decode to avoid unnecessary i16 quantization in the f32 audio path.
|
||||
- Parse full RTP headers including extensions and sequence numbers, then sort inbound packets before decoding to keep codec state stable for out-of-order audio.
|
||||
- Capture negotiated codec payload types from SDP offers and answers and include codec, RTP port, remote media, and metadata in leg_added events.
|
||||
- Emit leg_state_changed and leg_removed events more consistently so the dashboard reflects leg lifecycle updates accurately.
|
||||
|
||||
## 2026-04-10 - 1.17.0 - feat(proxy-engine)
|
||||
upgrade the internal audio bus to 48kHz f32 with per-leg denoising and improve SIP leg routing
|
||||
|
||||
- switch mixer, prompt playback, and tool leg audio handling from 16kHz i16 to 48kHz f32 for higher-quality internal processing
|
||||
- add f32 decode/encode and resampling support plus standalone RNNoise denoiser creation in codec-lib
|
||||
- apply per-leg inbound noise suppression in the mixer before mix-minus generation
|
||||
- fix passthrough call routing by matching the actual leg from the signaling source address when Call-IDs are shared
|
||||
- correct dialed number extraction from bare SIP request URIs by parsing the user part directly
|
||||
|
||||
## 2026-04-10 - 1.16.0 - feat(proxy-engine)
|
||||
integrate Kokoro TTS generation into proxy-engine and simplify TypeScript prompt handling to use cached WAV files
|
||||
|
||||
- adds a generate_tts command to proxy-engine with lazy-loaded Kokoro model support and WAV output generation
|
||||
- removes standalone opus-codec and tts-engine workspace binaries by consolidating TTS generation into proxy-engine
|
||||
- updates announcement and prompt cache flows to generate and cache WAV files on disk instead of pre-encoding RTP frames in TypeScript
|
||||
|
||||
## 2026-04-10 - 1.15.0 - feat(proxy-engine)
|
||||
add device leg, leg transfer, and leg replacement call controls
|
||||
|
||||
|
||||
BIN
nogit/voicemail/default/msg-1775840000387.wav
Normal file
BIN
nogit/voicemail/default/msg-1775840000387.wav
Normal file
Binary file not shown.
BIN
nogit/voicemail/default/msg-1775840014276.wav
Normal file
BIN
nogit/voicemail/default/msg-1775840014276.wav
Normal file
Binary file not shown.
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "siprouter",
|
||||
"version": "1.15.0",
|
||||
"version": "1.17.1",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
|
||||
23
rust/Cargo.lock
generated
23
rust/Cargo.lock
generated
@@ -1881,16 +1881,6 @@ dependencies = [
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "opus-codec"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"codec-lib",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ort"
|
||||
version = "2.0.0-rc.11"
|
||||
@@ -2188,6 +2178,9 @@ dependencies = [
|
||||
"base64 0.22.1",
|
||||
"codec-lib",
|
||||
"hound",
|
||||
"kokoro-tts",
|
||||
"nnnoiseless",
|
||||
"ort",
|
||||
"rand 0.8.5",
|
||||
"regex-lite",
|
||||
"serde",
|
||||
@@ -3008,16 +3001,6 @@ dependencies = [
|
||||
"strength_reduce",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tts-engine"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"hound",
|
||||
"kokoro-tts",
|
||||
"ort",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "turn"
|
||||
version = "0.6.1"
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
[workspace]
|
||||
members = [
|
||||
"crates/codec-lib",
|
||||
"crates/opus-codec",
|
||||
"crates/tts-engine",
|
||||
"crates/sip-proto",
|
||||
"crates/proxy-engine",
|
||||
]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
//! Audio codec library for the SIP router.
|
||||
//!
|
||||
//! Handles Opus ↔ G.722 ↔ PCMU/PCMA transcoding with ML noise suppression.
|
||||
//! Used by both the standalone `opus-codec` CLI and the `proxy-engine` binary.
|
||||
//! Used by the `proxy-engine` binary for all audio transcoding.
|
||||
|
||||
use audiopus::coder::{Decoder as OpusDecoder, Encoder as OpusEncoder};
|
||||
use audiopus::packet::Packet as OpusPacket;
|
||||
@@ -104,6 +104,8 @@ pub struct TranscodeState {
|
||||
g722_dec: libg722::decoder::Decoder,
|
||||
/// Cached FFT resamplers keyed by (from_rate, to_rate, chunk_size).
|
||||
resamplers: HashMap<(u32, u32, usize), FftFixedIn<f64>>,
|
||||
/// Cached f32 FFT resamplers keyed by (from_rate, to_rate, chunk_size).
|
||||
resamplers_f32: HashMap<(u32, u32, usize), FftFixedIn<f32>>,
|
||||
/// ML noise suppression for the SIP-bound direction.
|
||||
denoiser_to_sip: Box<DenoiseState<'static>>,
|
||||
/// ML noise suppression for the browser-bound direction.
|
||||
@@ -133,6 +135,7 @@ impl TranscodeState {
|
||||
g722_enc,
|
||||
g722_dec,
|
||||
resamplers: HashMap::new(),
|
||||
resamplers_f32: HashMap::new(),
|
||||
denoiser_to_sip: DenoiseState::new(),
|
||||
denoiser_to_browser: DenoiseState::new(),
|
||||
})
|
||||
@@ -293,6 +296,126 @@ impl TranscodeState {
|
||||
_ => Err(format!("unsupported target PT {pt}")),
|
||||
}
|
||||
}
|
||||
|
||||
// ---- f32 API for high-quality internal bus ----------------------------
|
||||
|
||||
/// Decode an encoded audio payload to f32 PCM samples in [-1.0, 1.0].
|
||||
/// Returns (samples, sample_rate).
|
||||
///
|
||||
/// For Opus, uses native float decode (no i16 quantization).
|
||||
/// For G.722/G.711, decodes to i16 then converts (codec is natively i16).
|
||||
pub fn decode_to_f32(&mut self, data: &[u8], pt: u8) -> Result<(Vec<f32>, u32), String> {
|
||||
match pt {
|
||||
PT_OPUS => {
|
||||
let mut pcm = vec![0.0f32; 5760]; // up to 120ms at 48kHz
|
||||
let packet =
|
||||
OpusPacket::try_from(data).map_err(|e| format!("opus packet: {e}"))?;
|
||||
let out =
|
||||
MutSignals::try_from(&mut pcm[..]).map_err(|e| format!("opus signals: {e}"))?;
|
||||
let n: usize = self
|
||||
.opus_dec
|
||||
.decode_float(Some(packet), out, false)
|
||||
.map_err(|e| format!("opus decode_float: {e}"))?
|
||||
.into();
|
||||
pcm.truncate(n);
|
||||
Ok((pcm, 48000))
|
||||
}
|
||||
_ => {
|
||||
// G.722, PCMU, PCMA: natively i16 codecs — decode then convert.
|
||||
let (pcm_i16, rate) = self.decode_to_pcm(data, pt)?;
|
||||
let pcm_f32 = pcm_i16.iter().map(|&s| s as f32 / 32768.0).collect();
|
||||
Ok((pcm_f32, rate))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Encode f32 PCM samples ([-1.0, 1.0]) to an audio codec.
|
||||
///
|
||||
/// For Opus, uses native float encode (no i16 quantization).
|
||||
/// For G.722/G.711, converts to i16 then encodes (codec is natively i16).
|
||||
pub fn encode_from_f32(&mut self, pcm: &[f32], pt: u8) -> Result<Vec<u8>, String> {
|
||||
match pt {
|
||||
PT_OPUS => {
|
||||
let mut buf = vec![0u8; 4000];
|
||||
let n: usize = self
|
||||
.opus_enc
|
||||
.encode_float(pcm, &mut buf)
|
||||
.map_err(|e| format!("opus encode_float: {e}"))?
|
||||
.into();
|
||||
buf.truncate(n);
|
||||
Ok(buf)
|
||||
}
|
||||
_ => {
|
||||
// G.722, PCMU, PCMA: natively i16 codecs.
|
||||
let pcm_i16: Vec<i16> = pcm
|
||||
.iter()
|
||||
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
|
||||
.collect();
|
||||
self.encode_from_pcm(&pcm_i16, pt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// High-quality sample rate conversion for f32 PCM using rubato FFT resampler.
|
||||
/// Uses a separate cache from the i16 resampler.
|
||||
pub fn resample_f32(
|
||||
&mut self,
|
||||
pcm: &[f32],
|
||||
from_rate: u32,
|
||||
to_rate: u32,
|
||||
) -> Result<Vec<f32>, String> {
|
||||
if from_rate == to_rate || pcm.is_empty() {
|
||||
return Ok(pcm.to_vec());
|
||||
}
|
||||
|
||||
let chunk = pcm.len();
|
||||
let key = (from_rate, to_rate, chunk);
|
||||
|
||||
if !self.resamplers_f32.contains_key(&key) {
|
||||
let r =
|
||||
FftFixedIn::<f32>::new(from_rate as usize, to_rate as usize, chunk, 1, 1)
|
||||
.map_err(|e| format!("resampler f32 {from_rate}->{to_rate}: {e}"))?;
|
||||
self.resamplers_f32.insert(key, r);
|
||||
}
|
||||
let resampler = self.resamplers_f32.get_mut(&key).unwrap();
|
||||
|
||||
let input = vec![pcm.to_vec()];
|
||||
let result = resampler
|
||||
.process(&input, None)
|
||||
.map_err(|e| format!("resample f32 {from_rate}->{to_rate}: {e}"))?;
|
||||
|
||||
Ok(result[0].clone())
|
||||
}
|
||||
|
||||
/// Apply RNNoise ML noise suppression to 48kHz f32 PCM audio.
|
||||
/// Processes in 480-sample (10ms) frames. State persists across calls.
|
||||
/// Operates natively in f32 — no i16 conversion overhead.
|
||||
pub fn denoise_f32(denoiser: &mut DenoiseState, pcm: &[f32]) -> Vec<f32> {
|
||||
let frame_size = DenoiseState::FRAME_SIZE; // 480
|
||||
let total = pcm.len();
|
||||
let whole = (total / frame_size) * frame_size;
|
||||
let mut output = Vec::with_capacity(total);
|
||||
let mut out_buf = [0.0f32; 480];
|
||||
|
||||
// nnnoiseless expects f32 samples scaled as i16 range (-32768..32767).
|
||||
for offset in (0..whole).step_by(frame_size) {
|
||||
let input: Vec<f32> = pcm[offset..offset + frame_size]
|
||||
.iter()
|
||||
.map(|&s| s * 32768.0)
|
||||
.collect();
|
||||
denoiser.process_frame(&mut out_buf, &input);
|
||||
output.extend(out_buf.iter().map(|&s| s / 32768.0));
|
||||
}
|
||||
if whole < total {
|
||||
output.extend_from_slice(&pcm[whole..]);
|
||||
}
|
||||
output
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new standalone denoiser for per-leg inbound processing.
|
||||
pub fn new_denoiser() -> Box<DenoiseState<'static>> {
|
||||
DenoiseState::new()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
[package]
|
||||
name = "opus-codec"
|
||||
version = "0.2.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
name = "opus-codec"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
codec-lib = { path = "../codec-lib" }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
base64 = "0.22"
|
||||
@@ -1,286 +0,0 @@
|
||||
/// Audio transcoding bridge for smartrust.
|
||||
///
|
||||
/// Thin CLI wrapper around `codec-lib`. Handles Opus ↔ G.722 ↔ PCMU transcoding.
|
||||
///
|
||||
/// Protocol:
|
||||
/// -> {"id":"1","method":"init","params":{}}
|
||||
/// <- {"id":"1","success":true,"result":{}}
|
||||
/// -> {"id":"2","method":"create_session","params":{"session_id":"call-abc"}}
|
||||
/// <- {"id":"2","success":true,"result":{}}
|
||||
/// -> {"id":"3","method":"transcode","params":{"session_id":"call-abc","data_b64":"...","from_pt":111,"to_pt":9}}
|
||||
/// <- {"id":"3","success":true,"result":{"data_b64":"..."}}
|
||||
/// -> {"id":"4","method":"destroy_session","params":{"session_id":"call-abc"}}
|
||||
/// <- {"id":"4","success":true,"result":{}}
|
||||
|
||||
use base64::engine::general_purpose::STANDARD as B64;
|
||||
use base64::Engine as _;
|
||||
use codec_lib::{codec_sample_rate, TranscodeState};
|
||||
use serde::Deserialize;
|
||||
use std::collections::HashMap;
|
||||
use std::io::{self, BufRead, Write};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct Request {
|
||||
id: String,
|
||||
method: String,
|
||||
#[serde(default)]
|
||||
params: serde_json::Value,
|
||||
}
|
||||
|
||||
fn respond(
|
||||
out: &mut impl Write,
|
||||
id: &str,
|
||||
success: bool,
|
||||
result: Option<serde_json::Value>,
|
||||
error: Option<&str>,
|
||||
) {
|
||||
let mut resp = serde_json::json!({ "id": id, "success": success });
|
||||
if let Some(r) = result {
|
||||
resp["result"] = r;
|
||||
}
|
||||
if let Some(e) = error {
|
||||
resp["error"] = serde_json::Value::String(e.to_string());
|
||||
}
|
||||
let _ = writeln!(out, "{}", resp);
|
||||
let _ = out.flush();
|
||||
}
|
||||
|
||||
/// Resolve a session: if session_id is provided, look it up in the sessions map;
|
||||
/// otherwise fall back to the default state (backward compat with `init`).
|
||||
fn get_session<'a>(
|
||||
sessions: &'a mut HashMap<String, TranscodeState>,
|
||||
default: &'a mut Option<TranscodeState>,
|
||||
params: &serde_json::Value,
|
||||
) -> Option<&'a mut TranscodeState> {
|
||||
if let Some(sid) = params.get("session_id").and_then(|v| v.as_str()) {
|
||||
sessions.get_mut(sid)
|
||||
} else {
|
||||
default.as_mut()
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let stdin = io::stdin();
|
||||
let stdout = io::stdout();
|
||||
let mut out = io::BufWriter::new(stdout.lock());
|
||||
|
||||
let _ = writeln!(out, r#"{{"event":"ready","data":{{}}}}"#);
|
||||
let _ = out.flush();
|
||||
|
||||
let mut default_state: Option<TranscodeState> = None;
|
||||
let mut sessions: HashMap<String, TranscodeState> = HashMap::new();
|
||||
|
||||
for line in stdin.lock().lines() {
|
||||
let line = match line {
|
||||
Ok(l) if !l.trim().is_empty() => l,
|
||||
Ok(_) => continue,
|
||||
Err(_) => break,
|
||||
};
|
||||
|
||||
let req: Request = match serde_json::from_str(&line) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
respond(&mut out, "", false, None, Some(&format!("parse: {e}")));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match req.method.as_str() {
|
||||
"init" => match TranscodeState::new() {
|
||||
Ok(s) => {
|
||||
default_state = Some(s);
|
||||
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
|
||||
}
|
||||
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
|
||||
},
|
||||
|
||||
"create_session" => {
|
||||
let session_id = match req.params.get("session_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s.to_string(),
|
||||
None => {
|
||||
respond(&mut out, &req.id, false, None, Some("missing session_id"));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if sessions.contains_key(&session_id) {
|
||||
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
|
||||
continue;
|
||||
}
|
||||
match TranscodeState::new() {
|
||||
Ok(s) => {
|
||||
sessions.insert(session_id, s);
|
||||
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
|
||||
}
|
||||
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
|
||||
}
|
||||
}
|
||||
|
||||
"destroy_session" => {
|
||||
let session_id = match req.params.get("session_id").and_then(|v| v.as_str()) {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
respond(&mut out, &req.id, false, None, Some("missing session_id"));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
sessions.remove(session_id);
|
||||
respond(&mut out, &req.id, true, Some(serde_json::json!({})), None);
|
||||
}
|
||||
|
||||
"transcode" => {
|
||||
let st = match get_session(&mut sessions, &mut default_state, &req.params) {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
false,
|
||||
None,
|
||||
Some("not initialized (no session or default state)"),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let data_b64 = match req.params.get("data_b64").and_then(|v| v.as_str()) {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
respond(&mut out, &req.id, false, None, Some("missing data_b64"));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let from_pt =
|
||||
req.params.get("from_pt").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
|
||||
let to_pt = req.params.get("to_pt").and_then(|v| v.as_u64()).unwrap_or(0) as u8;
|
||||
let direction = req.params.get("direction").and_then(|v| v.as_str());
|
||||
|
||||
let data = match B64.decode(data_b64) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
false,
|
||||
None,
|
||||
Some(&format!("b64: {e}")),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match st.transcode(&data, from_pt, to_pt, direction) {
|
||||
Ok(result) => {
|
||||
respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
true,
|
||||
Some(serde_json::json!({ "data_b64": B64.encode(&result) })),
|
||||
None,
|
||||
);
|
||||
}
|
||||
Err(e) => respond(&mut out, &req.id, false, None, Some(&e)),
|
||||
}
|
||||
}
|
||||
|
||||
"encode_pcm" => {
|
||||
let st = match get_session(&mut sessions, &mut default_state, &req.params) {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
false,
|
||||
None,
|
||||
Some("not initialized (no session or default state)"),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let data_b64 = match req.params.get("data_b64").and_then(|v| v.as_str()) {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
respond(&mut out, &req.id, false, None, Some("missing data_b64"));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let sample_rate = req
|
||||
.params
|
||||
.get("sample_rate")
|
||||
.and_then(|v| v.as_u64())
|
||||
.unwrap_or(22050) as u32;
|
||||
let to_pt = req.params.get("to_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8;
|
||||
|
||||
let data = match B64.decode(data_b64) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
false,
|
||||
None,
|
||||
Some(&format!("b64: {e}")),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if data.len() % 2 != 0 {
|
||||
respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
false,
|
||||
None,
|
||||
Some("PCM data has odd byte count (expected 16-bit LE samples)"),
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
let pcm: Vec<i16> = data
|
||||
.chunks_exact(2)
|
||||
.map(|c| i16::from_le_bytes([c[0], c[1]]))
|
||||
.collect();
|
||||
|
||||
let target_rate = codec_sample_rate(to_pt);
|
||||
let resampled = match st.resample(&pcm, sample_rate, target_rate) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
respond(&mut out, &req.id, false, None, Some(&e));
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match st.encode_from_pcm(&resampled, to_pt) {
|
||||
Ok(encoded) => {
|
||||
respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
true,
|
||||
Some(serde_json::json!({ "data_b64": B64.encode(&encoded) })),
|
||||
None,
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
respond(&mut out, &req.id, false, None, Some(&e));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"encode" | "decode" => {
|
||||
respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
false,
|
||||
None,
|
||||
Some("use 'transcode' command instead"),
|
||||
);
|
||||
}
|
||||
|
||||
_ => respond(
|
||||
&mut out,
|
||||
&req.id,
|
||||
false,
|
||||
None,
|
||||
Some(&format!("unknown: {}", req.method)),
|
||||
),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,7 @@ path = "src/main.rs"
|
||||
[dependencies]
|
||||
codec-lib = { path = "../codec-lib" }
|
||||
sip-proto = { path = "../sip-proto" }
|
||||
nnnoiseless = { version = "0.5", default-features = false }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde_json = "1"
|
||||
@@ -18,3 +19,8 @@ regex-lite = "0.1"
|
||||
webrtc = "0.8"
|
||||
rand = "0.8"
|
||||
hound = "3.5"
|
||||
kokoro-tts = { version = "0.3", default-features = false }
|
||||
ort = { version = "=2.0.0-rc.11", default-features = false, features = [
|
||||
"std", "download-binaries", "copy-dylibs", "ndarray",
|
||||
"tls-native-vendored"
|
||||
] }
|
||||
|
||||
@@ -10,9 +10,9 @@ use tokio::net::UdpSocket;
|
||||
use tokio::time::{self, Duration};
|
||||
|
||||
/// Mixing sample rate used by the mixer (must stay in sync with mixer::MIX_RATE).
|
||||
const MIX_RATE: u32 = 16000;
|
||||
const MIX_RATE: u32 = 48000;
|
||||
/// Samples per 20ms frame at the mixing rate.
|
||||
const MIX_FRAME_SIZE: usize = 320;
|
||||
const MIX_FRAME_SIZE: usize = 960;
|
||||
|
||||
/// Play a WAV file as RTP to a destination.
|
||||
/// Returns when playback is complete.
|
||||
@@ -178,9 +178,9 @@ pub async fn play_beep(
|
||||
Ok((seq, ts))
|
||||
}
|
||||
|
||||
/// Load a WAV file and split it into 20ms PCM frames at 16kHz.
|
||||
/// Load a WAV file and split it into 20ms f32 PCM frames at 48kHz.
|
||||
/// Used by the leg interaction system to prepare prompt audio for the mixer.
|
||||
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
|
||||
pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<f32>>, String> {
|
||||
let path = Path::new(wav_path);
|
||||
if !path.exists() {
|
||||
return Err(format!("WAV file not found: {wav_path}"));
|
||||
@@ -191,17 +191,17 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
|
||||
let spec = reader.spec();
|
||||
let wav_rate = spec.sample_rate;
|
||||
|
||||
// Read all samples as i16.
|
||||
let samples: Vec<i16> = if spec.bits_per_sample == 16 {
|
||||
// Read all samples as f32 in [-1.0, 1.0].
|
||||
let samples: Vec<f32> = if spec.bits_per_sample == 16 {
|
||||
reader
|
||||
.samples::<i16>()
|
||||
.filter_map(|s| s.ok())
|
||||
.map(|s| s as f32 / 32768.0)
|
||||
.collect()
|
||||
} else if spec.bits_per_sample == 32 && spec.sample_format == hound::SampleFormat::Float {
|
||||
reader
|
||||
.samples::<f32>()
|
||||
.filter_map(|s| s.ok())
|
||||
.map(|s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
|
||||
.collect()
|
||||
} else {
|
||||
return Err(format!(
|
||||
@@ -214,24 +214,24 @@ pub fn load_prompt_pcm_frames(wav_path: &str) -> Result<Vec<Vec<i16>>, String> {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
// Resample to MIX_RATE (16kHz) if needed.
|
||||
// Resample to MIX_RATE (48kHz) if needed.
|
||||
let resampled = if wav_rate != MIX_RATE {
|
||||
let mut transcoder = TranscodeState::new().map_err(|e| format!("codec init: {e}"))?;
|
||||
transcoder
|
||||
.resample(&samples, wav_rate, MIX_RATE)
|
||||
.resample_f32(&samples, wav_rate, MIX_RATE)
|
||||
.map_err(|e| format!("resample: {e}"))?
|
||||
} else {
|
||||
samples
|
||||
};
|
||||
|
||||
// Split into MIX_FRAME_SIZE (320) sample frames.
|
||||
// Split into MIX_FRAME_SIZE (960) sample frames.
|
||||
let mut frames = Vec::new();
|
||||
let mut offset = 0;
|
||||
while offset < resampled.len() {
|
||||
let end = (offset + MIX_FRAME_SIZE).min(resampled.len());
|
||||
let mut frame = resampled[offset..end].to_vec();
|
||||
// Pad short final frame with silence.
|
||||
frame.resize(MIX_FRAME_SIZE, 0);
|
||||
frame.resize(MIX_FRAME_SIZE, 0.0);
|
||||
frames.push(frame);
|
||||
offset += MIX_FRAME_SIZE;
|
||||
}
|
||||
|
||||
@@ -20,6 +20,35 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
|
||||
/// Emit a `leg_added` event with full leg information.
|
||||
/// Free function (not a method) to avoid `&self` borrow conflicts when `self.calls` is borrowed.
|
||||
fn emit_leg_added_event(tx: &OutTx, call_id: &str, leg: &LegInfo) {
|
||||
let metadata: serde_json::Value = if leg.metadata.is_empty() {
|
||||
serde_json::json!({})
|
||||
} else {
|
||||
serde_json::Value::Object(
|
||||
leg.metadata
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect(),
|
||||
)
|
||||
};
|
||||
emit_event(
|
||||
tx,
|
||||
"leg_added",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"leg_id": leg.id,
|
||||
"kind": leg.kind.as_str(),
|
||||
"state": leg.state.as_str(),
|
||||
"codec": sip_proto::helpers::codec_name(leg.codec_pt),
|
||||
"rtpPort": leg.rtp_port,
|
||||
"remoteMedia": leg.remote_media.map(|a| format!("{}:{}", a.ip(), a.port())),
|
||||
"metadata": metadata,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
pub struct CallManager {
|
||||
/// All active calls, keyed by internal call ID.
|
||||
pub calls: HashMap<String, Call>,
|
||||
@@ -120,7 +149,19 @@ impl CallManager {
|
||||
}
|
||||
|
||||
// Passthrough-style routing for inbound/outbound device↔provider calls.
|
||||
self.route_passthrough_message(&call_id, &leg_id, msg, from_addr, socket, config)
|
||||
// The sip_index only stores one leg for shared Call-IDs, so we need to
|
||||
// determine which leg the message actually belongs to by comparing from_addr.
|
||||
let actual_leg_id = self
|
||||
.calls
|
||||
.get(&call_id)
|
||||
.and_then(|call| {
|
||||
call.legs
|
||||
.values()
|
||||
.find(|l| l.signaling_addr == Some(from_addr))
|
||||
.map(|l| l.id.clone())
|
||||
})
|
||||
.unwrap_or(leg_id);
|
||||
self.route_passthrough_message(&call_id, &actual_leg_id, msg, from_addr, socket, config)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -253,6 +294,11 @@ impl CallManager {
|
||||
dev_leg.state = LegState::Connected;
|
||||
}
|
||||
}
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": dev_leg_id, "state": "connected" }),
|
||||
);
|
||||
|
||||
// Wire device leg to mixer.
|
||||
if let Some(dev_remote_addr) = dev_remote {
|
||||
@@ -312,6 +358,8 @@ impl CallManager {
|
||||
leg.state = LegState::Terminated;
|
||||
}
|
||||
}
|
||||
emit_event(&self.out_tx, "leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": leg_id, "state": "terminated" }));
|
||||
emit_event(&self.out_tx, "call_ended",
|
||||
serde_json::json!({ "call_id": call_id, "reason": reason, "duration": duration }));
|
||||
self.terminate_call(call_id).await;
|
||||
@@ -517,21 +565,30 @@ impl CallManager {
|
||||
if let Some(leg) = call.legs.get_mut(this_leg_id) {
|
||||
leg.state = LegState::Ringing;
|
||||
}
|
||||
emit_event(&self.out_tx, "leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "ringing" }));
|
||||
} else if code >= 200 && code < 300 {
|
||||
let mut needs_wiring = false;
|
||||
if let Some(leg) = call.legs.get_mut(this_leg_id) {
|
||||
leg.state = LegState::Connected;
|
||||
// Learn remote media from SDP.
|
||||
// Learn remote media and negotiated codec from SDP answer.
|
||||
if msg.has_sdp_body() {
|
||||
if let Some(ep) = parse_sdp_endpoint(&msg.body) {
|
||||
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
|
||||
leg.remote_media = Some(addr);
|
||||
}
|
||||
// Use the codec from the SDP answer (what the remote actually selected).
|
||||
if let Some(pt) = ep.codec_pt {
|
||||
leg.codec_pt = pt;
|
||||
}
|
||||
}
|
||||
}
|
||||
needs_wiring = true;
|
||||
}
|
||||
|
||||
emit_event(&self.out_tx, "leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": this_leg_id, "state": "connected" }));
|
||||
|
||||
if call.state != CallState::Connected {
|
||||
call.state = CallState::Connected;
|
||||
emit_event(&self.out_tx, "call_answered", serde_json::json!({ "call_id": call_id }));
|
||||
@@ -677,15 +734,19 @@ impl CallManager {
|
||||
call.callee_number = Some(called_number);
|
||||
call.state = CallState::Ringing;
|
||||
|
||||
let codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
|
||||
let mut codec_pt = provider_config.codecs.first().copied().unwrap_or(9);
|
||||
|
||||
// Provider leg — extract media from SDP.
|
||||
// Provider leg — extract media and negotiated codec from SDP.
|
||||
let mut provider_media: Option<SocketAddr> = None;
|
||||
if invite.has_sdp_body() {
|
||||
if let Some(ep) = parse_sdp_endpoint(&invite.body) {
|
||||
if let Ok(addr) = format!("{}:{}", ep.address, ep.port).parse() {
|
||||
provider_media = Some(addr);
|
||||
}
|
||||
// Use the codec from the provider's SDP offer (what they actually want to use).
|
||||
if let Some(pt) = ep.codec_pt {
|
||||
codec_pt = pt;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -755,6 +816,16 @@ impl CallManager {
|
||||
// Store the call.
|
||||
self.calls.insert(call_id.clone(), call);
|
||||
|
||||
// Emit leg_added for both initial legs.
|
||||
if let Some(call) = self.calls.get(&call_id) {
|
||||
if let Some(leg) = call.legs.get(&provider_leg_id) {
|
||||
emit_leg_added_event(&self.out_tx, &call_id, leg);
|
||||
}
|
||||
if let Some(leg) = call.legs.get(&device_leg_id) {
|
||||
emit_leg_added_event(&self.out_tx, &call_id, leg);
|
||||
}
|
||||
}
|
||||
|
||||
Some(call_id)
|
||||
}
|
||||
|
||||
@@ -842,6 +913,14 @@ impl CallManager {
|
||||
.insert(sip_call_id, (call_id.clone(), leg_id));
|
||||
|
||||
self.calls.insert(call_id.clone(), call);
|
||||
|
||||
// Emit leg_added for the provider leg.
|
||||
if let Some(call) = self.calls.get(&call_id) {
|
||||
for leg in call.legs.values() {
|
||||
emit_leg_added_event(&self.out_tx, &call_id, leg);
|
||||
}
|
||||
}
|
||||
|
||||
Some(call_id)
|
||||
}
|
||||
|
||||
@@ -866,11 +945,18 @@ impl CallManager {
|
||||
let lan_port = config.proxy.lan_port;
|
||||
let device_sip_call_id = invite.call_id().to_string();
|
||||
|
||||
// Extract just the user part from the request URI (e.g., "sip:16196000@10.0.0.1" → "16196000").
|
||||
// extract_uri is for header values with angle brackets, not bare request URIs.
|
||||
let dialed_number = invite
|
||||
.request_uri()
|
||||
.and_then(|uri| SipMessage::extract_uri(uri))
|
||||
.unwrap_or(invite.request_uri().unwrap_or(""))
|
||||
.to_string();
|
||||
.map(|uri| {
|
||||
let stripped = uri
|
||||
.strip_prefix("sip:")
|
||||
.or_else(|| uri.strip_prefix("sips:"))
|
||||
.unwrap_or(uri);
|
||||
stripped.split('@').next().unwrap_or(stripped).to_string()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
let provider_dest: SocketAddr = match provider_config.outbound_proxy.to_socket_addr() {
|
||||
Some(a) => a,
|
||||
@@ -983,6 +1069,14 @@ impl CallManager {
|
||||
.insert(provider_sip_call_id, (call_id.clone(), provider_leg_id));
|
||||
|
||||
self.calls.insert(call_id.clone(), call);
|
||||
|
||||
// Emit leg_added for both initial legs (device + provider).
|
||||
if let Some(call) = self.calls.get(&call_id) {
|
||||
for leg in call.legs.values() {
|
||||
emit_leg_added_event(&self.out_tx, &call_id, leg);
|
||||
}
|
||||
}
|
||||
|
||||
Some(call_id)
|
||||
}
|
||||
|
||||
@@ -1050,17 +1144,11 @@ impl CallManager {
|
||||
let call = self.calls.get_mut(call_id).unwrap();
|
||||
call.legs.insert(leg_id.clone(), leg_info);
|
||||
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_added",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"leg_id": leg_id,
|
||||
"kind": "sip-provider",
|
||||
"state": "inviting",
|
||||
"number": number,
|
||||
}),
|
||||
);
|
||||
if let Some(call) = self.calls.get(call_id) {
|
||||
if let Some(leg) = call.legs.get(&leg_id) {
|
||||
emit_leg_added_event(&self.out_tx, call_id, leg);
|
||||
}
|
||||
}
|
||||
|
||||
Some(leg_id)
|
||||
}
|
||||
@@ -1126,17 +1214,11 @@ impl CallManager {
|
||||
let call = self.calls.get_mut(call_id).unwrap();
|
||||
call.legs.insert(leg_id.clone(), leg_info);
|
||||
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_added",
|
||||
serde_json::json!({
|
||||
"call_id": call_id,
|
||||
"leg_id": leg_id,
|
||||
"kind": "sip-device",
|
||||
"state": "inviting",
|
||||
"device_id": device_id,
|
||||
}),
|
||||
);
|
||||
if let Some(call) = self.calls.get(call_id) {
|
||||
if let Some(leg) = call.legs.get(&leg_id) {
|
||||
emit_leg_added_event(&self.out_tx, call_id, leg);
|
||||
}
|
||||
}
|
||||
|
||||
Some(leg_id)
|
||||
}
|
||||
@@ -1223,6 +1305,13 @@ impl CallManager {
|
||||
None => return false,
|
||||
};
|
||||
|
||||
// Emit leg_removed for source call.
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_removed",
|
||||
serde_json::json!({ "call_id": source_call_id, "leg_id": leg_id }),
|
||||
);
|
||||
|
||||
// Update SIP index to point to the target call.
|
||||
if let Some(sip_cid) = &leg_info.sip_call_id {
|
||||
self.sip_index.insert(
|
||||
@@ -1255,15 +1344,12 @@ impl CallManager {
|
||||
let target_call = self.calls.get_mut(target_call_id).unwrap();
|
||||
target_call.legs.insert(leg_id.to_string(), leg_info);
|
||||
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_transferred",
|
||||
serde_json::json!({
|
||||
"leg_id": leg_id,
|
||||
"source_call_id": source_call_id,
|
||||
"target_call_id": target_call_id,
|
||||
}),
|
||||
);
|
||||
// Emit leg_added for target call.
|
||||
if let Some(target) = self.calls.get(target_call_id) {
|
||||
if let Some(leg) = target.legs.get(leg_id) {
|
||||
emit_leg_added_event(&self.out_tx, target_call_id, leg);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if source call has too few legs remaining.
|
||||
let source_call = self.calls.get(source_call_id).unwrap();
|
||||
@@ -1366,6 +1452,11 @@ impl CallManager {
|
||||
}
|
||||
}
|
||||
leg.state = LegState::Terminated;
|
||||
emit_event(
|
||||
&self.out_tx,
|
||||
"leg_state_changed",
|
||||
serde_json::json!({ "call_id": call_id, "leg_id": leg.id, "state": "terminated" }),
|
||||
);
|
||||
}
|
||||
|
||||
emit_event(
|
||||
@@ -1484,6 +1575,13 @@ impl CallManager {
|
||||
);
|
||||
self.calls.insert(call_id.to_string(), call);
|
||||
|
||||
// Emit leg_added for the provider leg.
|
||||
if let Some(call) = self.calls.get(call_id) {
|
||||
for leg in call.legs.values() {
|
||||
emit_leg_added_event(&self.out_tx, call_id, leg);
|
||||
}
|
||||
}
|
||||
|
||||
// Build recording path.
|
||||
let timestamp = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
|
||||
@@ -35,7 +35,8 @@ pub fn create_leg_channels() -> LegChannels {
|
||||
}
|
||||
|
||||
/// Spawn the inbound I/O task for a SIP leg.
|
||||
/// Reads RTP from the socket, strips the 12-byte header, sends payload to the mixer.
|
||||
/// Reads RTP from the socket, parses the variable-length header (RFC 3550),
|
||||
/// and sends the payload to the mixer.
|
||||
/// Returns the JoinHandle (exits when the inbound_tx channel is dropped).
|
||||
pub fn spawn_sip_inbound(
|
||||
rtp_socket: Arc<UdpSocket>,
|
||||
@@ -51,12 +52,29 @@ pub fn spawn_sip_inbound(
|
||||
}
|
||||
let pt = buf[1] & 0x7F;
|
||||
let marker = (buf[1] & 0x80) != 0;
|
||||
let seq = u16::from_be_bytes([buf[2], buf[3]]);
|
||||
let timestamp = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
|
||||
let payload = buf[12..n].to_vec();
|
||||
|
||||
// RFC 3550: header length = 12 + (CC * 4) + optional extension.
|
||||
let cc = (buf[0] & 0x0F) as usize;
|
||||
let has_extension = (buf[0] & 0x10) != 0;
|
||||
let mut offset = 12 + cc * 4;
|
||||
if has_extension {
|
||||
if offset + 4 > n {
|
||||
continue; // Malformed: extension header truncated.
|
||||
}
|
||||
let ext_len = u16::from_be_bytes([buf[offset + 2], buf[offset + 3]]) as usize;
|
||||
offset += 4 + ext_len * 4;
|
||||
}
|
||||
if offset >= n {
|
||||
continue; // No payload after header.
|
||||
}
|
||||
|
||||
let payload = buf[offset..n].to_vec();
|
||||
if payload.is_empty() {
|
||||
continue;
|
||||
}
|
||||
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, timestamp }).await.is_err() {
|
||||
if inbound_tx.send(RtpPacket { payload, payload_type: pt, marker, seq, timestamp }).await.is_err() {
|
||||
break; // Channel closed — leg removed.
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ mod rtp;
|
||||
mod sip_leg;
|
||||
mod sip_transport;
|
||||
mod tool_leg;
|
||||
mod tts;
|
||||
mod voicemail;
|
||||
mod webrtc_engine;
|
||||
|
||||
@@ -93,6 +94,9 @@ async fn main() {
|
||||
// WebRTC engine — separate lock to avoid deadlock with SIP handlers.
|
||||
let webrtc = Arc::new(Mutex::new(WebRtcEngine::new(out_tx.clone())));
|
||||
|
||||
// TTS engine — separate lock, lazy-loads model on first use.
|
||||
let tts_engine = Arc::new(Mutex::new(tts::TtsEngine::new()));
|
||||
|
||||
// Read commands from stdin.
|
||||
let stdin = tokio::io::stdin();
|
||||
let reader = BufReader::new(stdin);
|
||||
@@ -113,11 +117,12 @@ async fn main() {
|
||||
|
||||
let engine = engine.clone();
|
||||
let webrtc = webrtc.clone();
|
||||
let tts_engine = tts_engine.clone();
|
||||
let out_tx = out_tx.clone();
|
||||
|
||||
// Handle commands — some are async, so we spawn.
|
||||
tokio::spawn(async move {
|
||||
handle_command(engine, webrtc, &out_tx, cmd).await;
|
||||
handle_command(engine, webrtc, tts_engine, &out_tx, cmd).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -125,6 +130,7 @@ async fn main() {
|
||||
async fn handle_command(
|
||||
engine: Arc<Mutex<ProxyEngine>>,
|
||||
webrtc: Arc<Mutex<WebRtcEngine>>,
|
||||
tts_engine: Arc<Mutex<tts::TtsEngine>>,
|
||||
out_tx: &OutTx,
|
||||
cmd: Command,
|
||||
) {
|
||||
@@ -150,6 +156,8 @@ async fn handle_command(
|
||||
"add_tool_leg" => handle_add_tool_leg(engine, out_tx, &cmd).await,
|
||||
"remove_tool_leg" => handle_remove_tool_leg(engine, out_tx, &cmd).await,
|
||||
"set_leg_metadata" => handle_set_leg_metadata(engine, out_tx, &cmd).await,
|
||||
// TTS command — lock tts_engine only (no SIP/WebRTC contention).
|
||||
"generate_tts" => handle_generate_tts(tts_engine, out_tx, &cmd).await,
|
||||
_ => respond_err(out_tx, &cmd.id, &format!("unknown command: {}", cmd.method)),
|
||||
}
|
||||
}
|
||||
@@ -669,6 +677,10 @@ async fn handle_webrtc_link(
|
||||
"leg_id": session_id,
|
||||
"kind": "webrtc",
|
||||
"state": "connected",
|
||||
"codec": "Opus",
|
||||
"rtpPort": 0,
|
||||
"remoteMedia": null,
|
||||
"metadata": {},
|
||||
}));
|
||||
|
||||
respond_ok(out_tx, &cmd.id, serde_json::json!({
|
||||
@@ -1117,8 +1129,11 @@ async fn handle_add_tool_leg(
|
||||
"call_id": call_id,
|
||||
"leg_id": tool_leg_id,
|
||||
"kind": "tool",
|
||||
"tool_type": tool_type_str,
|
||||
"state": "connected",
|
||||
"codec": null,
|
||||
"rtpPort": 0,
|
||||
"remoteMedia": null,
|
||||
"metadata": { "tool_type": tool_type_str },
|
||||
}),
|
||||
);
|
||||
|
||||
@@ -1218,3 +1233,16 @@ async fn handle_set_leg_metadata(
|
||||
leg.metadata.insert(key, value);
|
||||
respond_ok(out_tx, &cmd.id, serde_json::json!({}));
|
||||
}
|
||||
|
||||
/// Handle `generate_tts` — synthesize text to a WAV file using Kokoro TTS.
|
||||
async fn handle_generate_tts(
|
||||
tts_engine: Arc<Mutex<tts::TtsEngine>>,
|
||||
out_tx: &OutTx,
|
||||
cmd: &Command,
|
||||
) {
|
||||
let mut tts = tts_engine.lock().await;
|
||||
match tts.generate(&cmd.params).await {
|
||||
Ok(result) => respond_ok(out_tx, &cmd.id, result),
|
||||
Err(e) => respond_err(out_tx, &cmd.id, &e),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,9 +3,12 @@
|
||||
//! Each Call spawns one mixer task. Legs communicate with the mixer via
|
||||
//! tokio mpsc channels — no shared mutable state, no lock contention.
|
||||
//!
|
||||
//! Internal bus format: 48kHz f32 PCM (960 samples per 20ms frame).
|
||||
//! All encoding/decoding happens at leg boundaries. Per-leg inbound denoising at 48kHz.
|
||||
//!
|
||||
//! The mixer runs a 20ms tick loop:
|
||||
//! 1. Drain inbound channels, decode to PCM, resample to 16kHz
|
||||
//! 2. Compute total mix (sum of all **participant** legs' PCM as i32)
|
||||
//! 1. Drain inbound channels, decode to f32, resample to 48kHz, denoise per-leg
|
||||
//! 2. Compute total mix (sum of all **participant** legs' f32 PCM as f64)
|
||||
//! 3. For each participant leg: mix-minus = total - own, resample to leg codec rate, encode, send
|
||||
//! 4. For each isolated leg: play prompt frame or silence, check DTMF
|
||||
//! 5. For each tool leg: send per-source unmerged audio batch
|
||||
@@ -13,16 +16,18 @@
|
||||
|
||||
use crate::ipc::{emit_event, OutTx};
|
||||
use crate::rtp::{build_rtp_header, rtp_clock_increment};
|
||||
use codec_lib::{codec_sample_rate, TranscodeState};
|
||||
use codec_lib::{codec_sample_rate, new_denoiser, TranscodeState};
|
||||
use nnnoiseless::DenoiseState;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::{self, Duration, MissedTickBehavior};
|
||||
|
||||
/// Mixing sample rate — 16kHz. G.722 is native, G.711 needs 2× upsample, Opus needs 3× downsample.
|
||||
const MIX_RATE: u32 = 16000;
|
||||
/// Mixing sample rate — 48kHz. Opus is native, G.722 needs 3× upsample, G.711 needs 6× upsample.
|
||||
/// All processing (denoising, mixing) happens at this rate in f32 for maximum quality.
|
||||
const MIX_RATE: u32 = 48000;
|
||||
/// Samples per 20ms frame at the mixing rate.
|
||||
const MIX_FRAME_SIZE: usize = 320; // 16000 * 0.020
|
||||
const MIX_FRAME_SIZE: usize = 960; // 48000 * 0.020
|
||||
|
||||
/// A raw RTP payload received from a leg (no RTP header).
|
||||
pub struct RtpPacket {
|
||||
@@ -30,6 +35,8 @@ pub struct RtpPacket {
|
||||
pub payload_type: u8,
|
||||
/// RTP marker bit (first packet of a DTMF event, etc.).
|
||||
pub marker: bool,
|
||||
/// RTP sequence number for reordering.
|
||||
pub seq: u16,
|
||||
/// RTP timestamp from the original packet header.
|
||||
pub timestamp: u32,
|
||||
}
|
||||
@@ -47,8 +54,8 @@ enum LegRole {
|
||||
}
|
||||
|
||||
struct IsolationState {
|
||||
/// PCM frames at MIX_RATE (320 samples each) queued for playback.
|
||||
prompt_frames: VecDeque<Vec<i16>>,
|
||||
/// PCM frames at MIX_RATE (960 samples each, 48kHz f32) queued for playback.
|
||||
prompt_frames: VecDeque<Vec<f32>>,
|
||||
/// Digits that complete the interaction (e.g., ['1', '2']).
|
||||
expected_digits: Vec<char>,
|
||||
/// Ticks remaining before timeout (decremented each tick after prompt ends).
|
||||
@@ -88,8 +95,8 @@ pub struct ToolAudioBatch {
|
||||
/// One participant's 20ms audio frame.
|
||||
pub struct ToolAudioSource {
|
||||
pub leg_id: String,
|
||||
/// PCM at 16kHz, MIX_FRAME_SIZE (320) samples.
|
||||
pub pcm_16k: Vec<i16>,
|
||||
/// PCM at 48kHz f32, MIX_FRAME_SIZE (960) samples.
|
||||
pub pcm_48k: Vec<f32>,
|
||||
}
|
||||
|
||||
/// Internal storage for a tool leg inside the mixer.
|
||||
@@ -122,8 +129,8 @@ pub enum MixerCommand {
|
||||
/// DTMF from the leg is checked against expected_digits.
|
||||
StartInteraction {
|
||||
leg_id: String,
|
||||
/// PCM frames at MIX_RATE (16kHz), each 320 samples.
|
||||
prompt_pcm_frames: Vec<Vec<i16>>,
|
||||
/// PCM frames at MIX_RATE (48kHz f32), each 960 samples.
|
||||
prompt_pcm_frames: Vec<Vec<f32>>,
|
||||
expected_digits: Vec<char>,
|
||||
timeout_ms: u32,
|
||||
result_tx: oneshot::Sender<InteractionResult>,
|
||||
@@ -149,10 +156,12 @@ pub enum MixerCommand {
|
||||
struct MixerLegSlot {
|
||||
codec_pt: u8,
|
||||
transcoder: TranscodeState,
|
||||
/// Per-leg inbound denoiser (48kHz, 480-sample frames).
|
||||
denoiser: Box<DenoiseState<'static>>,
|
||||
inbound_rx: mpsc::Receiver<RtpPacket>,
|
||||
outbound_tx: mpsc::Sender<Vec<u8>>,
|
||||
/// Last decoded PCM frame at MIX_RATE (320 samples). Used for mix-minus.
|
||||
last_pcm_frame: Vec<i16>,
|
||||
/// Last decoded+denoised PCM frame at MIX_RATE (960 samples, 48kHz f32).
|
||||
last_pcm_frame: Vec<f32>,
|
||||
/// Number of consecutive ticks with no inbound packet.
|
||||
silent_ticks: u32,
|
||||
// RTP output state.
|
||||
@@ -220,9 +229,10 @@ async fn mixer_loop(
|
||||
MixerLegSlot {
|
||||
codec_pt,
|
||||
transcoder,
|
||||
denoiser: new_denoiser(),
|
||||
inbound_rx,
|
||||
outbound_tx,
|
||||
last_pcm_frame: vec![0i16; MIX_FRAME_SIZE],
|
||||
last_pcm_frame: vec![0.0f32; MIX_FRAME_SIZE],
|
||||
silent_ticks: 0,
|
||||
rtp_seq: 0,
|
||||
rtp_ts: 0,
|
||||
@@ -311,16 +321,18 @@ async fn mixer_loop(
|
||||
continue;
|
||||
}
|
||||
|
||||
// ── 2. Drain inbound packets, decode to 16kHz PCM. ─────────
|
||||
// ── 2. Drain inbound packets, decode to 48kHz f32 PCM. ────
|
||||
// DTMF (PT 101) packets are collected separately.
|
||||
// Audio packets are sorted by sequence number and decoded
|
||||
// in order to maintain codec state (critical for G.722 ADPCM).
|
||||
let leg_ids: Vec<String> = legs.keys().cloned().collect();
|
||||
let mut dtmf_forward: Vec<(String, RtpPacket)> = Vec::new();
|
||||
|
||||
for lid in &leg_ids {
|
||||
let slot = legs.get_mut(lid).unwrap();
|
||||
|
||||
// Drain channel — collect DTMF packets separately, keep latest audio.
|
||||
let mut latest_audio: Option<RtpPacket> = None;
|
||||
// Drain channel — collect DTMF separately, collect ALL audio packets.
|
||||
let mut audio_packets: Vec<RtpPacket> = Vec::new();
|
||||
loop {
|
||||
match slot.inbound_rx.try_recv() {
|
||||
Ok(pkt) => {
|
||||
@@ -328,33 +340,47 @@ async fn mixer_loop(
|
||||
// DTMF telephone-event: collect for processing.
|
||||
dtmf_forward.push((lid.clone(), pkt));
|
||||
} else {
|
||||
latest_audio = Some(pkt);
|
||||
audio_packets.push(pkt);
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(pkt) = latest_audio {
|
||||
if !audio_packets.is_empty() {
|
||||
slot.silent_ticks = 0;
|
||||
match slot.transcoder.decode_to_pcm(&pkt.payload, pkt.payload_type) {
|
||||
Ok((pcm, rate)) => {
|
||||
// Resample to mixing rate if needed.
|
||||
let pcm_mix = if rate == MIX_RATE {
|
||||
pcm
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample(&pcm, rate, MIX_RATE)
|
||||
.unwrap_or_else(|_| vec![0i16; MIX_FRAME_SIZE])
|
||||
};
|
||||
// Pad or truncate to exactly MIX_FRAME_SIZE.
|
||||
let mut frame = pcm_mix;
|
||||
frame.resize(MIX_FRAME_SIZE, 0);
|
||||
slot.last_pcm_frame = frame;
|
||||
}
|
||||
Err(_) => {
|
||||
// Decode failed — use silence.
|
||||
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
|
||||
|
||||
// Sort by sequence number for correct codec state progression.
|
||||
// This prevents G.722 ADPCM state corruption from out-of-order packets.
|
||||
audio_packets.sort_by_key(|p| p.seq);
|
||||
|
||||
// Decode ALL packets in order (maintains codec state),
|
||||
// but only keep the last decoded frame for mixing.
|
||||
for pkt in &audio_packets {
|
||||
match slot.transcoder.decode_to_f32(&pkt.payload, pkt.payload_type) {
|
||||
Ok((pcm, rate)) => {
|
||||
// Resample to 48kHz mixing rate if needed.
|
||||
let pcm_48k = if rate == MIX_RATE {
|
||||
pcm
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample_f32(&pcm, rate, MIX_RATE)
|
||||
.unwrap_or_else(|_| vec![0.0f32; MIX_FRAME_SIZE])
|
||||
};
|
||||
// Per-leg inbound denoising at 48kHz.
|
||||
// Skip for Opus/WebRTC legs — browsers already apply
|
||||
// their own noise suppression via getUserMedia.
|
||||
let processed = if slot.codec_pt != codec_lib::PT_OPUS {
|
||||
TranscodeState::denoise_f32(&mut slot.denoiser, &pcm_48k)
|
||||
} else {
|
||||
pcm_48k
|
||||
};
|
||||
// Pad or truncate to exactly MIX_FRAME_SIZE.
|
||||
let mut frame = processed;
|
||||
frame.resize(MIX_FRAME_SIZE, 0.0);
|
||||
slot.last_pcm_frame = frame;
|
||||
}
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
} else if dtmf_forward.iter().any(|(src, _)| src == lid) {
|
||||
@@ -364,17 +390,18 @@ async fn mixer_loop(
|
||||
slot.silent_ticks += 1;
|
||||
// After 150 ticks (3 seconds) of silence, zero out to avoid stale audio.
|
||||
if slot.silent_ticks > 150 {
|
||||
slot.last_pcm_frame = vec![0i16; MIX_FRAME_SIZE];
|
||||
slot.last_pcm_frame = vec![0.0f32; MIX_FRAME_SIZE];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── 3. Compute total mix from PARTICIPANT legs only. ────────
|
||||
let mut total_mix = vec![0i32; MIX_FRAME_SIZE];
|
||||
// Accumulate as f64 to prevent precision loss when summing f32.
|
||||
let mut total_mix = vec![0.0f64; MIX_FRAME_SIZE];
|
||||
for slot in legs.values() {
|
||||
if matches!(slot.role, LegRole::Participant) {
|
||||
for (i, &s) in slot.last_pcm_frame.iter().enumerate().take(MIX_FRAME_SIZE) {
|
||||
total_mix[i] += s as i32;
|
||||
total_mix[i] += s as f64;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -387,27 +414,27 @@ async fn mixer_loop(
|
||||
for (lid, slot) in legs.iter_mut() {
|
||||
match &mut slot.role {
|
||||
LegRole::Participant => {
|
||||
// Mix-minus: total minus this leg's own contribution.
|
||||
// Mix-minus: total minus this leg's own contribution, clamped to [-1.0, 1.0].
|
||||
let mut mix_minus = Vec::with_capacity(MIX_FRAME_SIZE);
|
||||
for i in 0..MIX_FRAME_SIZE {
|
||||
let sample = (total_mix[i] - slot.last_pcm_frame[i] as i32)
|
||||
.clamp(-32768, 32767) as i16;
|
||||
mix_minus.push(sample);
|
||||
let sample =
|
||||
(total_mix[i] - slot.last_pcm_frame[i] as f64) as f32;
|
||||
mix_minus.push(sample.clamp(-1.0, 1.0));
|
||||
}
|
||||
|
||||
// Resample from 16kHz to the leg's codec native rate.
|
||||
// Resample from 48kHz to the leg's codec native rate.
|
||||
let target_rate = codec_sample_rate(slot.codec_pt);
|
||||
let resampled = if target_rate == MIX_RATE {
|
||||
mix_minus
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample(&mix_minus, MIX_RATE, target_rate)
|
||||
.resample_f32(&mix_minus, MIX_RATE, target_rate)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
// Encode to the leg's codec.
|
||||
// Encode to the leg's codec (f32 → i16 → codec inside encode_from_f32).
|
||||
let encoded =
|
||||
match slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt) {
|
||||
match slot.transcoder.encode_from_f32(&resampled, slot.codec_pt) {
|
||||
Ok(e) if !e.is_empty() => e,
|
||||
_ => continue,
|
||||
};
|
||||
@@ -456,21 +483,21 @@ async fn mixer_loop(
|
||||
frame
|
||||
} else {
|
||||
state.prompt_done = true;
|
||||
vec![0i16; MIX_FRAME_SIZE]
|
||||
vec![0.0f32; MIX_FRAME_SIZE]
|
||||
};
|
||||
|
||||
// Encode prompt frame to the leg's codec (reuses existing encode path).
|
||||
// Encode prompt frame to the leg's codec.
|
||||
let target_rate = codec_sample_rate(slot.codec_pt);
|
||||
let resampled = if target_rate == MIX_RATE {
|
||||
pcm_frame
|
||||
} else {
|
||||
slot.transcoder
|
||||
.resample(&pcm_frame, MIX_RATE, target_rate)
|
||||
.resample_f32(&pcm_frame, MIX_RATE, target_rate)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
if let Ok(encoded) =
|
||||
slot.transcoder.encode_from_pcm(&resampled, slot.codec_pt)
|
||||
slot.transcoder.encode_from_f32(&resampled, slot.codec_pt)
|
||||
{
|
||||
if !encoded.is_empty() {
|
||||
let header = build_rtp_header(
|
||||
@@ -523,7 +550,7 @@ async fn mixer_loop(
|
||||
.filter(|(_, s)| matches!(s.role, LegRole::Participant))
|
||||
.map(|(lid, s)| ToolAudioSource {
|
||||
leg_id: lid.clone(),
|
||||
pcm_16k: s.last_pcm_frame.clone(),
|
||||
pcm_48k: s.last_pcm_frame.clone(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
@@ -533,7 +560,7 @@ async fn mixer_loop(
|
||||
.iter()
|
||||
.map(|s| ToolAudioSource {
|
||||
leg_id: s.leg_id.clone(),
|
||||
pcm_16k: s.pcm_16k.clone(),
|
||||
pcm_48k: s.pcm_48k.clone(),
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
//!
|
||||
//! Tool legs are observer legs that receive individual audio streams from each
|
||||
//! participant in a call. The mixer pipes `ToolAudioBatch` every 20ms containing
|
||||
//! each participant's decoded PCM@16kHz tagged with source leg ID.
|
||||
//! each participant's decoded PCM@48kHz f32 tagged with source leg ID.
|
||||
//!
|
||||
//! Consumers:
|
||||
//! - **Recording**: writes per-source WAV files for speaker-separated recording.
|
||||
@@ -37,20 +37,25 @@ pub fn spawn_recording_tool(
|
||||
|
||||
while let Some(batch) = rx.recv().await {
|
||||
for source in &batch.sources {
|
||||
// Skip silence-only frames (all zeros = no audio activity).
|
||||
let has_audio = source.pcm_16k.iter().any(|&s| s != 0);
|
||||
// Skip silence-only frames (near-zero = no audio activity).
|
||||
let has_audio = source.pcm_48k.iter().any(|&s| s.abs() > 1e-6);
|
||||
if !has_audio && !recorders.contains_key(&source.leg_id) {
|
||||
continue; // Don't create a file for silence-only sources.
|
||||
}
|
||||
|
||||
let recorder = recorders.entry(source.leg_id.clone()).or_insert_with(|| {
|
||||
let path = format!("{}/{}-{}.wav", base_dir, call_id, source.leg_id);
|
||||
Recorder::new_pcm(&path, 16000, None).unwrap_or_else(|e| {
|
||||
Recorder::new_pcm(&path, 48000, None).unwrap_or_else(|e| {
|
||||
panic!("failed to create recorder for {}: {e}", source.leg_id);
|
||||
})
|
||||
});
|
||||
|
||||
if !recorder.write_pcm(&source.pcm_16k) {
|
||||
// Convert f32 [-1.0, 1.0] to i16 for WAV writing.
|
||||
let pcm_i16: Vec<i16> = source.pcm_48k
|
||||
.iter()
|
||||
.map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16)
|
||||
.collect();
|
||||
if !recorder.write_pcm(&pcm_i16) {
|
||||
// Max duration reached — stop recording this source.
|
||||
break;
|
||||
}
|
||||
@@ -88,7 +93,7 @@ pub fn spawn_recording_tool(
|
||||
|
||||
/// Spawn a transcription tool leg.
|
||||
///
|
||||
/// The plumbing is fully real: it receives per-source unmerged PCM@16kHz from
|
||||
/// The plumbing is fully real: it receives per-source unmerged PCM@48kHz f32 from
|
||||
/// the mixer every 20ms. The consumer is a stub that accumulates audio and
|
||||
/// reports metadata on close. Future: will stream to a Whisper HTTP endpoint.
|
||||
pub fn spawn_transcription_tool(
|
||||
@@ -105,7 +110,7 @@ pub fn spawn_transcription_tool(
|
||||
while let Some(batch) = rx.recv().await {
|
||||
for source in &batch.sources {
|
||||
*source_samples.entry(source.leg_id.clone()).or_insert(0) +=
|
||||
source.pcm_16k.len() as u64;
|
||||
source.pcm_48k.len() as u64;
|
||||
|
||||
// TODO: Future — accumulate chunks and stream to Whisper endpoint.
|
||||
// For now, the audio is received and counted but not processed.
|
||||
@@ -118,7 +123,7 @@ pub fn spawn_transcription_tool(
|
||||
.map(|(leg_id, samples)| {
|
||||
serde_json::json!({
|
||||
"source_leg_id": leg_id,
|
||||
"duration_ms": (samples * 1000) / 16000,
|
||||
"duration_ms": (samples * 1000) / 48000,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
138
rust/crates/proxy-engine/src/tts.rs
Normal file
138
rust/crates/proxy-engine/src/tts.rs
Normal file
@@ -0,0 +1,138 @@
|
||||
//! Text-to-speech engine — synthesizes text to WAV files using Kokoro neural TTS.
|
||||
//!
|
||||
//! The model is loaded lazily on first use. If the model/voices files are not
|
||||
//! present, the generate command returns an error and the TS side falls back
|
||||
//! to espeak-ng.
|
||||
|
||||
use kokoro_tts::{KokoroTts, Voice};
|
||||
use std::path::Path;
|
||||
|
||||
/// Wraps the Kokoro TTS engine with lazy model loading.
|
||||
pub struct TtsEngine {
|
||||
tts: Option<KokoroTts>,
|
||||
/// Path that was used to load the current model (for cache invalidation).
|
||||
loaded_model_path: String,
|
||||
loaded_voices_path: String,
|
||||
}
|
||||
|
||||
impl TtsEngine {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
tts: None,
|
||||
loaded_model_path: String::new(),
|
||||
loaded_voices_path: String::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a WAV file from text.
|
||||
///
|
||||
/// Params (from IPC JSON):
|
||||
/// - `model`: path to the ONNX model file
|
||||
/// - `voices`: path to the voices.bin file
|
||||
/// - `voice`: voice name (e.g. "af_bella")
|
||||
/// - `text`: text to synthesize
|
||||
/// - `output`: output WAV file path
|
||||
pub async fn generate(&mut self, params: &serde_json::Value) -> Result<serde_json::Value, String> {
|
||||
let model_path = params.get("model").and_then(|v| v.as_str())
|
||||
.ok_or("missing 'model' param")?;
|
||||
let voices_path = params.get("voices").and_then(|v| v.as_str())
|
||||
.ok_or("missing 'voices' param")?;
|
||||
let voice_name = params.get("voice").and_then(|v| v.as_str())
|
||||
.unwrap_or("af_bella");
|
||||
let text = params.get("text").and_then(|v| v.as_str())
|
||||
.ok_or("missing 'text' param")?;
|
||||
let output_path = params.get("output").and_then(|v| v.as_str())
|
||||
.ok_or("missing 'output' param")?;
|
||||
|
||||
if text.is_empty() {
|
||||
return Err("empty text".into());
|
||||
}
|
||||
|
||||
// Check that model/voices files exist.
|
||||
if !Path::new(model_path).exists() {
|
||||
return Err(format!("model not found: {model_path}"));
|
||||
}
|
||||
if !Path::new(voices_path).exists() {
|
||||
return Err(format!("voices not found: {voices_path}"));
|
||||
}
|
||||
|
||||
// Lazy-load or reload if paths changed.
|
||||
if self.tts.is_none()
|
||||
|| self.loaded_model_path != model_path
|
||||
|| self.loaded_voices_path != voices_path
|
||||
{
|
||||
eprintln!("[tts] loading model: {model_path}");
|
||||
let tts = KokoroTts::new(model_path, voices_path)
|
||||
.await
|
||||
.map_err(|e| format!("model load failed: {e:?}"))?;
|
||||
self.tts = Some(tts);
|
||||
self.loaded_model_path = model_path.to_string();
|
||||
self.loaded_voices_path = voices_path.to_string();
|
||||
}
|
||||
|
||||
let tts = self.tts.as_ref().unwrap();
|
||||
let voice = select_voice(voice_name);
|
||||
|
||||
eprintln!("[tts] synthesizing voice '{voice_name}': \"{text}\"");
|
||||
let (samples, duration) = tts.synth(text, voice)
|
||||
.await
|
||||
.map_err(|e| format!("synthesis failed: {e:?}"))?;
|
||||
eprintln!("[tts] synthesized {} samples in {duration:?}", samples.len());
|
||||
|
||||
// Write 24kHz 16-bit mono WAV.
|
||||
let spec = hound::WavSpec {
|
||||
channels: 1,
|
||||
sample_rate: 24000,
|
||||
bits_per_sample: 16,
|
||||
sample_format: hound::SampleFormat::Int,
|
||||
};
|
||||
|
||||
let mut writer = hound::WavWriter::create(output_path, spec)
|
||||
.map_err(|e| format!("WAV create failed: {e}"))?;
|
||||
for &sample in &samples {
|
||||
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
|
||||
writer.write_sample(s16).map_err(|e| format!("WAV write: {e}"))?;
|
||||
}
|
||||
writer.finalize().map_err(|e| format!("WAV finalize: {e}"))?;
|
||||
|
||||
eprintln!("[tts] wrote {output_path}");
|
||||
Ok(serde_json::json!({ "output": output_path }))
|
||||
}
|
||||
}
|
||||
|
||||
/// Map voice name string to Kokoro Voice enum variant.
|
||||
fn select_voice(name: &str) -> Voice {
|
||||
match name {
|
||||
"af_bella" => Voice::AfBella(1.0),
|
||||
"af_heart" => Voice::AfHeart(1.0),
|
||||
"af_jessica" => Voice::AfJessica(1.0),
|
||||
"af_nicole" => Voice::AfNicole(1.0),
|
||||
"af_nova" => Voice::AfNova(1.0),
|
||||
"af_sarah" => Voice::AfSarah(1.0),
|
||||
"af_sky" => Voice::AfSky(1.0),
|
||||
"af_river" => Voice::AfRiver(1.0),
|
||||
"af_alloy" => Voice::AfAlloy(1.0),
|
||||
"af_aoede" => Voice::AfAoede(1.0),
|
||||
"af_kore" => Voice::AfKore(1.0),
|
||||
"am_adam" => Voice::AmAdam(1.0),
|
||||
"am_echo" => Voice::AmEcho(1.0),
|
||||
"am_eric" => Voice::AmEric(1.0),
|
||||
"am_fenrir" => Voice::AmFenrir(1.0),
|
||||
"am_liam" => Voice::AmLiam(1.0),
|
||||
"am_michael" => Voice::AmMichael(1.0),
|
||||
"am_onyx" => Voice::AmOnyx(1.0),
|
||||
"am_puck" => Voice::AmPuck(1.0),
|
||||
"bf_alice" => Voice::BfAlice(1.0),
|
||||
"bf_emma" => Voice::BfEmma(1.0),
|
||||
"bf_isabella" => Voice::BfIsabella(1.0),
|
||||
"bf_lily" => Voice::BfLily(1.0),
|
||||
"bm_daniel" => Voice::BmDaniel(1.0),
|
||||
"bm_fable" => Voice::BmFable(1.0),
|
||||
"bm_george" => Voice::BmGeorge(1.0),
|
||||
"bm_lewis" => Voice::BmLewis(1.0),
|
||||
_ => {
|
||||
eprintln!("[tts] unknown voice '{name}', falling back to af_bella");
|
||||
Voice::AfBella(1.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -290,8 +290,9 @@ async fn browser_to_mixer_loop(
|
||||
.send(RtpPacket {
|
||||
payload: payload.to_vec(),
|
||||
payload_type: PT_OPUS,
|
||||
marker: false,
|
||||
timestamp: 0,
|
||||
marker: rtp_packet.header.marker,
|
||||
seq: rtp_packet.header.sequence_number,
|
||||
timestamp: rtp_packet.header.timestamp,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -197,10 +197,11 @@ pub fn compute_digest_auth(
|
||||
|
||||
use crate::Endpoint;
|
||||
|
||||
/// Parse the audio media port and connection address from an SDP body.
|
||||
/// Parse the audio media port, connection address, and preferred codec from an SDP body.
|
||||
pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
|
||||
let mut addr: Option<&str> = None;
|
||||
let mut port: Option<u16> = None;
|
||||
let mut codec_pt: Option<u8> = None;
|
||||
|
||||
let normalized = sdp.replace("\r\n", "\n");
|
||||
for raw in normalized.split('\n') {
|
||||
@@ -208,10 +209,16 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
|
||||
if let Some(rest) = line.strip_prefix("c=IN IP4 ") {
|
||||
addr = Some(rest.trim());
|
||||
} else if let Some(rest) = line.strip_prefix("m=audio ") {
|
||||
// m=audio <port> RTP/AVP <pt1> [<pt2> ...]
|
||||
let parts: Vec<&str> = rest.split_whitespace().collect();
|
||||
if !parts.is_empty() {
|
||||
port = parts[0].parse().ok();
|
||||
}
|
||||
// parts[1] is "RTP/AVP" or similar, parts[2..] are payload types.
|
||||
// The first PT is the preferred codec.
|
||||
if parts.len() > 2 {
|
||||
codec_pt = parts[2].parse::<u8>().ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,6 +226,7 @@ pub fn parse_sdp_endpoint(sdp: &str) -> Option<Endpoint> {
|
||||
(Some(a), Some(p)) => Some(Endpoint {
|
||||
address: a.to_string(),
|
||||
port: p,
|
||||
codec_pt,
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
|
||||
@@ -9,9 +9,11 @@ pub mod dialog;
|
||||
pub mod helpers;
|
||||
pub mod rewrite;
|
||||
|
||||
/// Network endpoint (address + port).
|
||||
/// Network endpoint (address + port + optional negotiated codec).
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct Endpoint {
|
||||
pub address: String,
|
||||
pub port: u16,
|
||||
/// First payload type from the SDP `m=audio` line (the preferred codec).
|
||||
pub codec_pt: Option<u8>,
|
||||
}
|
||||
|
||||
@@ -92,7 +92,7 @@ pub fn rewrite_sdp(body: &str, ip: &str, port: u16) -> (String, Option<Endpoint>
|
||||
.collect();
|
||||
|
||||
let original = match (orig_addr, orig_port) {
|
||||
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p }),
|
||||
(Some(a), Some(p)) => Some(Endpoint { address: a, port: p, codec_pt: None }),
|
||||
_ => None,
|
||||
};
|
||||
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
[package]
|
||||
name = "tts-engine"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[[bin]]
|
||||
name = "tts-engine"
|
||||
path = "src/main.rs"
|
||||
|
||||
[dependencies]
|
||||
kokoro-tts = { version = "0.3", default-features = false }
|
||||
# Pin to rc.11 matching kokoro-tts's expectation; enable vendored TLS to avoid system libssl-dev.
|
||||
ort = { version = "=2.0.0-rc.11", default-features = false, features = [
|
||||
"std", "download-binaries", "copy-dylibs", "ndarray",
|
||||
"tls-native-vendored"
|
||||
] }
|
||||
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
|
||||
hound = "3.5"
|
||||
@@ -1,149 +0,0 @@
|
||||
/// TTS engine CLI — synthesizes text to a WAV file using Kokoro neural TTS.
|
||||
///
|
||||
/// Usage:
|
||||
/// echo "Hello world" | tts-engine --model kokoro-v1.0.onnx --voices voices.bin --output out.wav
|
||||
/// tts-engine --model kokoro-v1.0.onnx --voices voices.bin --output out.wav --text "Hello world"
|
||||
///
|
||||
/// Outputs 24kHz 16-bit mono WAV.
|
||||
|
||||
use kokoro_tts::{KokoroTts, Voice};
|
||||
use std::io::{self, Read};
|
||||
|
||||
fn parse_args() -> Result<(String, String, String, String, Option<String>), String> {
|
||||
let args: Vec<String> = std::env::args().collect();
|
||||
let mut model = String::new();
|
||||
let mut voices = String::new();
|
||||
let mut output = String::new();
|
||||
let mut text: Option<String> = None;
|
||||
let mut voice_name: Option<String> = None;
|
||||
|
||||
let mut i = 1;
|
||||
while i < args.len() {
|
||||
match args[i].as_str() {
|
||||
"--model" => { i += 1; model = args.get(i).cloned().unwrap_or_default(); }
|
||||
"--voices" => { i += 1; voices = args.get(i).cloned().unwrap_or_default(); }
|
||||
"--output" | "--output_file" => { i += 1; output = args.get(i).cloned().unwrap_or_default(); }
|
||||
"--text" => { i += 1; text = args.get(i).cloned(); }
|
||||
"--voice" => { i += 1; voice_name = args.get(i).cloned(); }
|
||||
_ => {}
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
|
||||
if model.is_empty() { return Err("--model required".into()); }
|
||||
if voices.is_empty() { return Err("--voices required".into()); }
|
||||
if output.is_empty() { return Err("--output required".into()); }
|
||||
|
||||
let voice_str = voice_name.unwrap_or_else(|| "af_bella".into());
|
||||
|
||||
Ok((model, voices, output, voice_str, text))
|
||||
}
|
||||
|
||||
fn select_voice(name: &str) -> Voice {
|
||||
match name {
|
||||
"af_bella" => Voice::AfBella(1.0),
|
||||
"af_heart" => Voice::AfHeart(1.0),
|
||||
"af_jessica" => Voice::AfJessica(1.0),
|
||||
"af_nicole" => Voice::AfNicole(1.0),
|
||||
"af_nova" => Voice::AfNova(1.0),
|
||||
"af_sarah" => Voice::AfSarah(1.0),
|
||||
"af_sky" => Voice::AfSky(1.0),
|
||||
"af_river" => Voice::AfRiver(1.0),
|
||||
"af_alloy" => Voice::AfAlloy(1.0),
|
||||
"af_aoede" => Voice::AfAoede(1.0),
|
||||
"af_kore" => Voice::AfKore(1.0),
|
||||
"am_adam" => Voice::AmAdam(1.0),
|
||||
"am_echo" => Voice::AmEcho(1.0),
|
||||
"am_eric" => Voice::AmEric(1.0),
|
||||
"am_fenrir" => Voice::AmFenrir(1.0),
|
||||
"am_liam" => Voice::AmLiam(1.0),
|
||||
"am_michael" => Voice::AmMichael(1.0),
|
||||
"am_onyx" => Voice::AmOnyx(1.0),
|
||||
"am_puck" => Voice::AmPuck(1.0),
|
||||
"bf_alice" => Voice::BfAlice(1.0),
|
||||
"bf_emma" => Voice::BfEmma(1.0),
|
||||
"bf_isabella" => Voice::BfIsabella(1.0),
|
||||
"bf_lily" => Voice::BfLily(1.0),
|
||||
"bm_daniel" => Voice::BmDaniel(1.0),
|
||||
"bm_fable" => Voice::BmFable(1.0),
|
||||
"bm_george" => Voice::BmGeorge(1.0),
|
||||
"bm_lewis" => Voice::BmLewis(1.0),
|
||||
_ => {
|
||||
eprintln!("[tts-engine] unknown voice '{}', falling back to af_bella", name);
|
||||
Voice::AfBella(1.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let (model_path, voices_path, output_path, voice_name, text_arg) = match parse_args() {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
eprintln!("Error: {}", e);
|
||||
eprintln!("Usage: tts-engine --model <model.onnx> --voices <voices.bin> --output <output.wav> [--text <text>] [--voice <voice_name>]");
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
// Get text from --text arg or stdin.
|
||||
let text = match text_arg {
|
||||
Some(t) => t,
|
||||
None => {
|
||||
let mut buf = String::new();
|
||||
io::stdin().read_to_string(&mut buf).expect("failed to read stdin");
|
||||
buf.trim().to_string()
|
||||
}
|
||||
};
|
||||
|
||||
if text.is_empty() {
|
||||
eprintln!("[tts-engine] no text provided");
|
||||
std::process::exit(1);
|
||||
}
|
||||
|
||||
eprintln!("[tts-engine] loading model: {}", model_path);
|
||||
let tts = match KokoroTts::new(&model_path, &voices_path).await {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
eprintln!("[tts-engine] failed to load model: {:?}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
let voice = select_voice(&voice_name);
|
||||
eprintln!("[tts-engine] synthesizing with voice '{}': \"{}\"", voice_name, text);
|
||||
|
||||
let (samples, duration) = match tts.synth(&text, voice).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
eprintln!("[tts-engine] synthesis failed: {:?}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
eprintln!("[tts-engine] synthesized {} samples in {:?}", samples.len(), duration);
|
||||
|
||||
// Write WAV: 24kHz, 16-bit, mono (same format announcement.ts expects).
|
||||
let spec = hound::WavSpec {
|
||||
channels: 1,
|
||||
sample_rate: 24000,
|
||||
bits_per_sample: 16,
|
||||
sample_format: hound::SampleFormat::Int,
|
||||
};
|
||||
|
||||
let mut writer = match hound::WavWriter::create(&output_path, spec) {
|
||||
Ok(w) => w,
|
||||
Err(e) => {
|
||||
eprintln!("[tts-engine] failed to create WAV: {}", e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
for &sample in &samples {
|
||||
let s16 = (sample * 32767.0).round().clamp(-32768.0, 32767.0) as i16;
|
||||
writer.write_sample(s16).unwrap();
|
||||
}
|
||||
writer.finalize().unwrap();
|
||||
|
||||
eprintln!("[tts-engine] wrote {}", output_path);
|
||||
}
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: 'siprouter',
|
||||
version: '1.15.0',
|
||||
version: '1.17.1',
|
||||
description: 'undefined'
|
||||
}
|
||||
|
||||
@@ -1,59 +1,22 @@
|
||||
/**
|
||||
* TTS announcement module — pre-generates audio announcements using espeak-ng
|
||||
* and caches them as encoded RTP packets for playback during call setup.
|
||||
* TTS announcement module — generates announcement WAV files at startup.
|
||||
*
|
||||
* On startup, generates the announcement WAV via espeak-ng (formant-based TTS
|
||||
* with highly accurate pronunciation), encodes each 20ms frame to G.722 (for
|
||||
* SIP) and Opus (for WebRTC) via the Rust transcoder, and caches the packets.
|
||||
* Engine priority: espeak-ng (formant TTS, fast) → Kokoro neural TTS via
|
||||
* proxy-engine → disabled.
|
||||
*
|
||||
* Falls back to the Rust tts-engine (Kokoro neural TTS) if espeak-ng is not
|
||||
* installed, and disables announcements if neither is available.
|
||||
* The generated WAV is left on disk for Rust's audio_player / start_interaction
|
||||
* to play during calls. No encoding or RTP playback happens in TypeScript.
|
||||
*/
|
||||
|
||||
import { execSync } from 'node:child_process';
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { Buffer } from 'node:buffer';
|
||||
import { encodePcm, isCodecReady } from './opusbridge.ts';
|
||||
|
||||
/** RTP clock increment per 20ms frame for each codec. */
|
||||
function rtpClockIncrement(pt: number): number {
|
||||
if (pt === 111) return 960;
|
||||
if (pt === 9) return 160;
|
||||
return 160;
|
||||
}
|
||||
|
||||
/** Build a fresh RTP header. */
|
||||
function buildRtpHeader(pt: number, seq: number, ts: number, ssrc: number, marker: boolean): Buffer {
|
||||
const hdr = Buffer.alloc(12);
|
||||
hdr[0] = 0x80;
|
||||
hdr[1] = (marker ? 0x80 : 0) | (pt & 0x7f);
|
||||
hdr.writeUInt16BE(seq & 0xffff, 2);
|
||||
hdr.writeUInt32BE(ts >>> 0, 4);
|
||||
hdr.writeUInt32BE(ssrc >>> 0, 8);
|
||||
return hdr;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** A pre-encoded announcement ready for RTP playback. */
|
||||
export interface IAnnouncementCache {
|
||||
/** G.722 encoded frames (each is a 20ms frame payload, no RTP header). */
|
||||
g722Frames: Buffer[];
|
||||
/** Opus encoded frames for WebRTC playback. */
|
||||
opusFrames: Buffer[];
|
||||
/** Total duration in milliseconds. */
|
||||
durationMs: number;
|
||||
}
|
||||
import { sendProxyCommand, isProxyReady } from './proxybridge.ts';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// State
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
let cachedAnnouncement: IAnnouncementCache | null = null;
|
||||
|
||||
const TTS_DIR = path.join(process.cwd(), '.nogit', 'tts');
|
||||
const ANNOUNCEMENT_TEXT = "Hello. I'm connecting your call now.";
|
||||
const CACHE_WAV = path.join(TTS_DIR, 'announcement.wav');
|
||||
@@ -64,12 +27,10 @@ const KOKORO_VOICES = 'voices.bin';
|
||||
const KOKORO_VOICE = 'af_bella';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Initialization
|
||||
// TTS generators
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Check if espeak-ng is available on the system.
|
||||
*/
|
||||
/** Check if espeak-ng is available on the system. */
|
||||
function isEspeakAvailable(): boolean {
|
||||
try {
|
||||
execSync('which espeak-ng', { stdio: 'pipe' });
|
||||
@@ -79,10 +40,7 @@ function isEspeakAvailable(): boolean {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate announcement WAV via espeak-ng (primary engine).
|
||||
* Returns true on success.
|
||||
*/
|
||||
/** Generate announcement WAV via espeak-ng (primary engine). */
|
||||
function generateViaEspeak(wavPath: string, text: string, log: (msg: string) => void): boolean {
|
||||
log('[tts] generating announcement audio via espeak-ng...');
|
||||
try {
|
||||
@@ -98,11 +56,8 @@ function generateViaEspeak(wavPath: string, text: string, log: (msg: string) =>
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate announcement WAV via Kokoro TTS (fallback engine).
|
||||
* Returns true on success.
|
||||
*/
|
||||
function generateViaKokoro(wavPath: string, text: string, log: (msg: string) => void): boolean {
|
||||
/** Generate announcement WAV via Kokoro TTS (fallback, runs inside proxy-engine). */
|
||||
async function generateViaKokoro(wavPath: string, text: string, log: (msg: string) => void): Promise<boolean> {
|
||||
const modelPath = path.join(TTS_DIR, KOKORO_MODEL);
|
||||
const voicesPath = path.join(TTS_DIR, KOKORO_VOICES);
|
||||
|
||||
@@ -111,25 +66,21 @@ function generateViaKokoro(wavPath: string, text: string, log: (msg: string) =>
|
||||
return false;
|
||||
}
|
||||
|
||||
const root = process.cwd();
|
||||
const ttsBinPaths = [
|
||||
path.join(root, 'dist_rust', 'tts-engine'),
|
||||
path.join(root, 'rust', 'target', 'release', 'tts-engine'),
|
||||
path.join(root, 'rust', 'target', 'debug', 'tts-engine'),
|
||||
];
|
||||
const ttsBin = ttsBinPaths.find((p) => fs.existsSync(p));
|
||||
if (!ttsBin) {
|
||||
log('[tts] tts-engine binary not found — Kokoro fallback unavailable');
|
||||
if (!isProxyReady()) {
|
||||
log('[tts] proxy-engine not ready — Kokoro fallback unavailable');
|
||||
return false;
|
||||
}
|
||||
|
||||
log('[tts] generating announcement audio via Kokoro TTS (fallback)...');
|
||||
try {
|
||||
execSync(
|
||||
`"${ttsBin}" --model "${modelPath}" --voices "${voicesPath}" --voice "${KOKORO_VOICE}" --output "${wavPath}" --text "${text}"`,
|
||||
{ timeout: 120000, stdio: 'pipe' },
|
||||
);
|
||||
log('[tts] Kokoro WAV generated');
|
||||
await sendProxyCommand('generate_tts', {
|
||||
model: modelPath,
|
||||
voices: voicesPath,
|
||||
voice: KOKORO_VOICE,
|
||||
text,
|
||||
output: wavPath,
|
||||
});
|
||||
log('[tts] Kokoro WAV generated (via proxy-engine)');
|
||||
return true;
|
||||
} catch (e: any) {
|
||||
log(`[tts] Kokoro failed: ${e.message}`);
|
||||
@@ -137,40 +88,13 @@ function generateViaKokoro(wavPath: string, text: string, log: (msg: string) =>
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a WAV file and detect its sample rate from the fmt chunk.
|
||||
* Returns { pcm, sampleRate } or null on failure.
|
||||
*/
|
||||
function readWavWithRate(wavPath: string): { pcm: Buffer; sampleRate: number } | null {
|
||||
const wav = fs.readFileSync(wavPath);
|
||||
if (wav.length < 44) return null;
|
||||
if (wav.toString('ascii', 0, 4) !== 'RIFF') return null;
|
||||
if (wav.toString('ascii', 8, 12) !== 'WAVE') return null;
|
||||
|
||||
let sampleRate = 22050; // default
|
||||
let offset = 12;
|
||||
let pcm: Buffer | null = null;
|
||||
|
||||
while (offset < wav.length - 8) {
|
||||
const chunkId = wav.toString('ascii', offset, offset + 4);
|
||||
const chunkSize = wav.readUInt32LE(offset + 4);
|
||||
if (chunkId === 'fmt ') {
|
||||
sampleRate = wav.readUInt32LE(offset + 12);
|
||||
}
|
||||
if (chunkId === 'data') {
|
||||
pcm = wav.subarray(offset + 8, offset + 8 + chunkSize);
|
||||
}
|
||||
offset += 8 + chunkSize;
|
||||
if (offset % 2 !== 0) offset++;
|
||||
}
|
||||
|
||||
if (!pcm) return null;
|
||||
return { pcm, sampleRate };
|
||||
}
|
||||
// ---------------------------------------------------------------------------
|
||||
// Initialization
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Pre-generate the announcement audio and encode to G.722 + Opus frames.
|
||||
* Must be called after the codec bridge is initialized.
|
||||
* Pre-generate the announcement WAV file.
|
||||
* Must be called after the proxy engine is initialized.
|
||||
*
|
||||
* Engine priority: espeak-ng → Kokoro → disabled.
|
||||
*/
|
||||
@@ -178,7 +102,6 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
|
||||
fs.mkdirSync(TTS_DIR, { recursive: true });
|
||||
|
||||
try {
|
||||
// Generate WAV if not cached.
|
||||
if (!fs.existsSync(CACHE_WAV)) {
|
||||
let generated = false;
|
||||
|
||||
@@ -189,9 +112,9 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
|
||||
log('[tts] espeak-ng not installed — trying Kokoro fallback');
|
||||
}
|
||||
|
||||
// Fall back to Kokoro.
|
||||
// Fall back to Kokoro (via proxy-engine).
|
||||
if (!generated) {
|
||||
generated = generateViaKokoro(CACHE_WAV, ANNOUNCEMENT_TEXT, log);
|
||||
generated = await generateViaKokoro(CACHE_WAV, ANNOUNCEMENT_TEXT, log);
|
||||
}
|
||||
|
||||
if (!generated) {
|
||||
@@ -200,49 +123,7 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
|
||||
}
|
||||
}
|
||||
|
||||
// Read WAV and extract raw PCM + sample rate.
|
||||
const result = readWavWithRate(CACHE_WAV);
|
||||
if (!result) {
|
||||
log('[tts] failed to parse WAV file');
|
||||
return false;
|
||||
}
|
||||
|
||||
const { pcm, sampleRate } = result;
|
||||
|
||||
// Wait for codec bridge to be ready.
|
||||
if (!isCodecReady()) {
|
||||
log('[tts] codec bridge not ready — will retry');
|
||||
return false;
|
||||
}
|
||||
|
||||
// Encode in 20ms chunks. The Rust encoder resamples to each codec's native rate.
|
||||
const FRAME_SAMPLES = Math.floor(sampleRate * 0.02);
|
||||
const FRAME_BYTES = FRAME_SAMPLES * 2; // 16-bit = 2 bytes per sample
|
||||
const totalFrames = Math.floor(pcm.length / FRAME_BYTES);
|
||||
|
||||
const g722Frames: Buffer[] = [];
|
||||
const opusFrames: Buffer[] = [];
|
||||
|
||||
log(`[tts] encoding ${totalFrames} frames (${FRAME_SAMPLES} samples/frame @ ${sampleRate}Hz)...`);
|
||||
for (let i = 0; i < totalFrames; i++) {
|
||||
const framePcm = pcm.subarray(i * FRAME_BYTES, (i + 1) * FRAME_BYTES);
|
||||
const pcmBuf = Buffer.from(framePcm);
|
||||
const [g722, opus] = await Promise.all([
|
||||
encodePcm(pcmBuf, sampleRate, 9), // G.722 for SIP devices
|
||||
encodePcm(pcmBuf, sampleRate, 111), // Opus for WebRTC browsers
|
||||
]);
|
||||
if (g722) g722Frames.push(g722);
|
||||
if (opus) opusFrames.push(opus);
|
||||
if (!g722 && !opus && i < 3) log(`[tts] frame ${i} encode failed`);
|
||||
}
|
||||
|
||||
cachedAnnouncement = {
|
||||
g722Frames,
|
||||
opusFrames,
|
||||
durationMs: totalFrames * 20,
|
||||
};
|
||||
|
||||
log(`[tts] announcement cached: ${g722Frames.length} frames (${(totalFrames * 20 / 1000).toFixed(1)}s)`);
|
||||
log('[tts] announcement WAV ready');
|
||||
return true;
|
||||
} catch (e: any) {
|
||||
log(`[tts] init error: ${e.message}`);
|
||||
@@ -250,100 +131,7 @@ export async function initAnnouncement(log: (msg: string) => void): Promise<bool
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Playback
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Play the pre-cached announcement to an RTP endpoint.
|
||||
*
|
||||
* @param sendPacket - function to send a raw RTP packet
|
||||
* @param ssrc - SSRC to use in RTP headers
|
||||
* @param onDone - called when the announcement finishes
|
||||
* @returns a cancel function, or null if no announcement is cached
|
||||
*/
|
||||
export function playAnnouncement(
|
||||
sendPacket: (pkt: Buffer) => void,
|
||||
ssrc: number,
|
||||
onDone?: () => void,
|
||||
): (() => void) | null {
|
||||
if (!cachedAnnouncement || cachedAnnouncement.g722Frames.length === 0) {
|
||||
onDone?.();
|
||||
return null;
|
||||
}
|
||||
|
||||
const frames = cachedAnnouncement.g722Frames;
|
||||
const PT = 9; // G.722
|
||||
let frameIdx = 0;
|
||||
let seq = Math.floor(Math.random() * 0xffff);
|
||||
let rtpTs = Math.floor(Math.random() * 0xffffffff);
|
||||
|
||||
const timer = setInterval(() => {
|
||||
if (frameIdx >= frames.length) {
|
||||
clearInterval(timer);
|
||||
onDone?.();
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = frames[frameIdx];
|
||||
const hdr = buildRtpHeader(PT, seq & 0xffff, rtpTs >>> 0, ssrc >>> 0, frameIdx === 0);
|
||||
const pkt = Buffer.concat([hdr, payload]);
|
||||
sendPacket(pkt);
|
||||
|
||||
seq++;
|
||||
rtpTs += rtpClockIncrement(PT);
|
||||
frameIdx++;
|
||||
}, 20);
|
||||
|
||||
// Return cancel function.
|
||||
return () => clearInterval(timer);
|
||||
/** Get the path to the cached announcement WAV, or null if not generated. */
|
||||
export function getAnnouncementWavPath(): string | null {
|
||||
return fs.existsSync(CACHE_WAV) ? CACHE_WAV : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Play pre-cached Opus announcement to a WebRTC PeerConnection sender.
|
||||
*
|
||||
* @param sendRtpPacket - function to send a raw RTP packet via sender.sendRtp()
|
||||
* @param ssrc - SSRC to use in RTP headers
|
||||
* @param onDone - called when announcement finishes
|
||||
* @returns cancel function, or null if no announcement cached
|
||||
*/
|
||||
export function playAnnouncementToWebRtc(
|
||||
sendRtpPacket: (pkt: Buffer) => void,
|
||||
ssrc: number,
|
||||
counters: { seq: number; ts: number },
|
||||
onDone?: () => void,
|
||||
): (() => void) | null {
|
||||
if (!cachedAnnouncement || cachedAnnouncement.opusFrames.length === 0) {
|
||||
onDone?.();
|
||||
return null;
|
||||
}
|
||||
|
||||
const frames = cachedAnnouncement.opusFrames;
|
||||
const PT = 111; // Opus
|
||||
let frameIdx = 0;
|
||||
|
||||
const timer = setInterval(() => {
|
||||
if (frameIdx >= frames.length) {
|
||||
clearInterval(timer);
|
||||
onDone?.();
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = frames[frameIdx];
|
||||
const hdr = buildRtpHeader(PT, counters.seq & 0xffff, counters.ts >>> 0, ssrc >>> 0, frameIdx === 0);
|
||||
const pkt = Buffer.concat([hdr, payload]);
|
||||
sendRtpPacket(pkt);
|
||||
|
||||
counters.seq++;
|
||||
counters.ts += 960; // Opus at 48kHz: 960 samples per 20ms
|
||||
frameIdx++;
|
||||
}, 20);
|
||||
|
||||
return () => clearInterval(timer);
|
||||
}
|
||||
|
||||
/** Check if an announcement is cached and ready. */
|
||||
export function isAnnouncementReady(): boolean {
|
||||
return cachedAnnouncement !== null && cachedAnnouncement.g722Frames.length > 0;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,55 +1,31 @@
|
||||
/**
|
||||
* PromptCache — manages multiple named audio prompts for IVR and voicemail.
|
||||
* PromptCache — manages named audio prompt WAV files for IVR and voicemail.
|
||||
*
|
||||
* Each prompt is pre-encoded as both G.722 frames (for SIP legs) and Opus
|
||||
* frames (for WebRTC legs), ready for 20ms RTP playback.
|
||||
* Generates WAV files via espeak-ng (primary) or Kokoro TTS through the
|
||||
* proxy-engine (fallback). Also supports loading pre-existing WAV files
|
||||
* and programmatic tone generation.
|
||||
*
|
||||
* Supports three sources:
|
||||
* 1. TTS generation via espeak-ng (primary) or Kokoro (fallback)
|
||||
* 2. Loading from a pre-existing WAV file
|
||||
* 3. Programmatic tone generation (beep, etc.)
|
||||
*
|
||||
* The existing announcement.ts system continues to work independently;
|
||||
* this module provides generalized prompt management for IVR/voicemail.
|
||||
* All audio playback happens in Rust (audio_player / start_interaction).
|
||||
* This module only manages WAV files on disk.
|
||||
*/
|
||||
|
||||
import { execSync } from 'node:child_process';
|
||||
import fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { Buffer } from 'node:buffer';
|
||||
import { encodePcm, isCodecReady } from '../opusbridge.ts';
|
||||
|
||||
/** RTP clock increment per 20ms frame for each codec. */
|
||||
function rtpClockIncrement(pt: number): number {
|
||||
if (pt === 111) return 960;
|
||||
if (pt === 9) return 160;
|
||||
return 160;
|
||||
}
|
||||
|
||||
/** Build a fresh RTP header. */
|
||||
function buildRtpHeader(pt: number, seq: number, ts: number, ssrc: number, marker: boolean): Buffer {
|
||||
const hdr = Buffer.alloc(12);
|
||||
hdr[0] = 0x80;
|
||||
hdr[1] = (marker ? 0x80 : 0) | (pt & 0x7f);
|
||||
hdr.writeUInt16BE(seq & 0xffff, 2);
|
||||
hdr.writeUInt32BE(ts >>> 0, 4);
|
||||
hdr.writeUInt32BE(ssrc >>> 0, 8);
|
||||
return hdr;
|
||||
}
|
||||
import { sendProxyCommand, isProxyReady } from '../proxybridge.ts';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/** A pre-encoded prompt ready for RTP playback. */
|
||||
/** A cached prompt — just a WAV file path and metadata. */
|
||||
export interface ICachedPrompt {
|
||||
/** Unique prompt identifier. */
|
||||
id: string;
|
||||
/** G.722 encoded frames (20ms each, no RTP header). */
|
||||
g722Frames: Buffer[];
|
||||
/** Opus encoded frames (20ms each, no RTP header). */
|
||||
opusFrames: Buffer[];
|
||||
/** Total duration in milliseconds. */
|
||||
/** Path to the WAV file on disk. */
|
||||
wavPath: string;
|
||||
/** Total duration in milliseconds (approximate, from WAV header). */
|
||||
durationMs: number;
|
||||
}
|
||||
|
||||
@@ -82,84 +58,61 @@ function generateViaEspeak(wavPath: string, text: string): boolean {
|
||||
}
|
||||
}
|
||||
|
||||
/** Generate WAV via Kokoro TTS. */
|
||||
function generateViaKokoro(wavPath: string, text: string, voice: string): boolean {
|
||||
/** Generate WAV via Kokoro TTS (runs inside proxy-engine). */
|
||||
async function generateViaKokoro(wavPath: string, text: string, voice: string): Promise<boolean> {
|
||||
const modelPath = path.join(TTS_DIR, 'kokoro-v1.0.onnx');
|
||||
const voicesPath = path.join(TTS_DIR, 'voices.bin');
|
||||
if (!fs.existsSync(modelPath) || !fs.existsSync(voicesPath)) return false;
|
||||
|
||||
const root = process.cwd();
|
||||
const ttsBin = [
|
||||
path.join(root, 'dist_rust', 'tts-engine'),
|
||||
path.join(root, 'rust', 'target', 'release', 'tts-engine'),
|
||||
path.join(root, 'rust', 'target', 'debug', 'tts-engine'),
|
||||
].find((p) => fs.existsSync(p));
|
||||
if (!ttsBin) return false;
|
||||
if (!isProxyReady()) return false;
|
||||
|
||||
try {
|
||||
execSync(
|
||||
`"${ttsBin}" --model "${modelPath}" --voices "${voicesPath}" --voice "${voice}" --output "${wavPath}" --text "${text}"`,
|
||||
{ timeout: 120000, stdio: 'pipe' },
|
||||
);
|
||||
await sendProxyCommand('generate_tts', {
|
||||
model: modelPath,
|
||||
voices: voicesPath,
|
||||
voice,
|
||||
text,
|
||||
output: wavPath,
|
||||
});
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/** Read a WAV file and return raw PCM + sample rate. */
|
||||
function readWavWithRate(wavPath: string): { pcm: Buffer; sampleRate: number } | null {
|
||||
const wav = fs.readFileSync(wavPath);
|
||||
if (wav.length < 44) return null;
|
||||
if (wav.toString('ascii', 0, 4) !== 'RIFF') return null;
|
||||
if (wav.toString('ascii', 8, 12) !== 'WAVE') return null;
|
||||
/** Read a WAV file's duration from its header. */
|
||||
function getWavDurationMs(wavPath: string): number {
|
||||
try {
|
||||
const wav = fs.readFileSync(wavPath);
|
||||
if (wav.length < 44) return 0;
|
||||
if (wav.toString('ascii', 0, 4) !== 'RIFF') return 0;
|
||||
|
||||
let sampleRate = 22050;
|
||||
let pcm: Buffer | null = null;
|
||||
let offset = 12;
|
||||
let sampleRate = 16000;
|
||||
let dataSize = 0;
|
||||
let bitsPerSample = 16;
|
||||
let channels = 1;
|
||||
let offset = 12;
|
||||
|
||||
while (offset < wav.length - 8) {
|
||||
const chunkId = wav.toString('ascii', offset, offset + 4);
|
||||
const chunkSize = wav.readUInt32LE(offset + 4);
|
||||
if (chunkId === 'fmt ') {
|
||||
sampleRate = wav.readUInt32LE(offset + 12);
|
||||
while (offset < wav.length - 8) {
|
||||
const chunkId = wav.toString('ascii', offset, offset + 4);
|
||||
const chunkSize = wav.readUInt32LE(offset + 4);
|
||||
if (chunkId === 'fmt ') {
|
||||
channels = wav.readUInt16LE(offset + 10);
|
||||
sampleRate = wav.readUInt32LE(offset + 12);
|
||||
bitsPerSample = wav.readUInt16LE(offset + 22);
|
||||
}
|
||||
if (chunkId === 'data') {
|
||||
dataSize = chunkSize;
|
||||
}
|
||||
offset += 8 + chunkSize;
|
||||
if (offset % 2 !== 0) offset++;
|
||||
}
|
||||
if (chunkId === 'data') {
|
||||
pcm = wav.subarray(offset + 8, offset + 8 + chunkSize);
|
||||
}
|
||||
offset += 8 + chunkSize;
|
||||
if (offset % 2 !== 0) offset++;
|
||||
|
||||
const bytesPerSample = (bitsPerSample / 8) * channels;
|
||||
const totalSamples = bytesPerSample > 0 ? dataSize / bytesPerSample : 0;
|
||||
return sampleRate > 0 ? Math.round((totalSamples / sampleRate) * 1000) : 0;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return pcm ? { pcm, sampleRate } : null;
|
||||
}
|
||||
|
||||
/** Encode raw PCM frames to G.722 + Opus. */
|
||||
async function encodePcmFrames(
|
||||
pcm: Buffer,
|
||||
sampleRate: number,
|
||||
log: (msg: string) => void,
|
||||
): Promise<{ g722Frames: Buffer[]; opusFrames: Buffer[] } | null> {
|
||||
if (!isCodecReady()) return null;
|
||||
|
||||
const frameSamples = Math.floor(sampleRate * 0.02); // 20ms
|
||||
const frameBytes = frameSamples * 2; // 16-bit
|
||||
const totalFrames = Math.floor(pcm.length / frameBytes);
|
||||
|
||||
const g722Frames: Buffer[] = [];
|
||||
const opusFrames: Buffer[] = [];
|
||||
|
||||
for (let i = 0; i < totalFrames; i++) {
|
||||
const framePcm = Buffer.from(pcm.subarray(i * frameBytes, (i + 1) * frameBytes));
|
||||
const [g722, opus] = await Promise.all([
|
||||
encodePcm(framePcm, sampleRate, 9), // G.722
|
||||
encodePcm(framePcm, sampleRate, 111), // Opus
|
||||
]);
|
||||
if (g722) g722Frames.push(g722);
|
||||
if (opus) opusFrames.push(opus);
|
||||
}
|
||||
|
||||
return { g722Frames, opusFrames };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -195,7 +148,7 @@ export class PromptCache {
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a TTS prompt and cache it.
|
||||
* Generate a TTS prompt WAV and cache its path.
|
||||
* Uses espeak-ng (primary) or Kokoro (fallback).
|
||||
*/
|
||||
async generatePrompt(id: string, text: string, voice = 'af_bella'): Promise<ICachedPrompt | null> {
|
||||
@@ -207,14 +160,14 @@ export class PromptCache {
|
||||
this.espeakAvailable = isEspeakAvailable();
|
||||
}
|
||||
|
||||
// Generate WAV.
|
||||
let generated = false;
|
||||
// Generate WAV if not already on disk.
|
||||
if (!fs.existsSync(wavPath)) {
|
||||
let generated = false;
|
||||
if (this.espeakAvailable) {
|
||||
generated = generateViaEspeak(wavPath, text);
|
||||
}
|
||||
if (!generated) {
|
||||
generated = generateViaKokoro(wavPath, text, voice);
|
||||
generated = await generateViaKokoro(wavPath, text, voice);
|
||||
}
|
||||
if (!generated) {
|
||||
this.log(`[prompt-cache] failed to generate TTS for "${id}"`);
|
||||
@@ -223,49 +176,22 @@ export class PromptCache {
|
||||
this.log(`[prompt-cache] generated WAV for "${id}"`);
|
||||
}
|
||||
|
||||
return this.loadWavPrompt(id, wavPath);
|
||||
return this.registerWav(id, wavPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Load a WAV file as a prompt and cache it.
|
||||
* Load a pre-existing WAV file as a prompt.
|
||||
*/
|
||||
async loadWavPrompt(id: string, wavPath: string): Promise<ICachedPrompt | null> {
|
||||
if (!fs.existsSync(wavPath)) {
|
||||
this.log(`[prompt-cache] WAV not found: ${wavPath}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
const result = readWavWithRate(wavPath);
|
||||
if (!result) {
|
||||
this.log(`[prompt-cache] failed to parse WAV: ${wavPath}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
const encoded = await encodePcmFrames(result.pcm, result.sampleRate, this.log);
|
||||
if (!encoded) {
|
||||
this.log(`[prompt-cache] encoding failed for "${id}" (codec bridge not ready?)`);
|
||||
return null;
|
||||
}
|
||||
|
||||
const durationMs = encoded.g722Frames.length * 20;
|
||||
const prompt: ICachedPrompt = {
|
||||
id,
|
||||
g722Frames: encoded.g722Frames,
|
||||
opusFrames: encoded.opusFrames,
|
||||
durationMs,
|
||||
};
|
||||
|
||||
this.prompts.set(id, prompt);
|
||||
this.log(`[prompt-cache] cached "${id}": ${encoded.g722Frames.length} frames (${(durationMs / 1000).toFixed(1)}s)`);
|
||||
return prompt;
|
||||
return this.registerWav(id, wavPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a beep tone prompt (sine wave).
|
||||
* @param id - prompt ID
|
||||
* @param freqHz - tone frequency (default 1000 Hz)
|
||||
* @param durationMs - tone duration (default 500ms)
|
||||
* @param amplitude - 16-bit amplitude (default 8000)
|
||||
* Generate a beep tone WAV and cache it.
|
||||
*/
|
||||
async generateBeep(
|
||||
id: string,
|
||||
@@ -273,149 +199,77 @@ export class PromptCache {
|
||||
durationMs = 500,
|
||||
amplitude = 8000,
|
||||
): Promise<ICachedPrompt | null> {
|
||||
// Generate at 16kHz for decent quality.
|
||||
const sampleRate = 16000;
|
||||
const totalSamples = Math.floor((sampleRate * durationMs) / 1000);
|
||||
const pcm = Buffer.alloc(totalSamples * 2);
|
||||
fs.mkdirSync(TTS_DIR, { recursive: true });
|
||||
const wavPath = path.join(TTS_DIR, `prompt-${id}.wav`);
|
||||
|
||||
for (let i = 0; i < totalSamples; i++) {
|
||||
const t = i / sampleRate;
|
||||
// Apply a short fade-in/fade-out to avoid click artifacts.
|
||||
const fadeLen = Math.floor(sampleRate * 0.01); // 10ms fade
|
||||
let envelope = 1.0;
|
||||
if (i < fadeLen) envelope = i / fadeLen;
|
||||
else if (i > totalSamples - fadeLen) envelope = (totalSamples - i) / fadeLen;
|
||||
if (!fs.existsSync(wavPath)) {
|
||||
// Generate 16kHz 16-bit mono sine wave WAV.
|
||||
const sampleRate = 16000;
|
||||
const totalSamples = Math.floor((sampleRate * durationMs) / 1000);
|
||||
const pcm = Buffer.alloc(totalSamples * 2);
|
||||
|
||||
const sample = Math.round(Math.sin(2 * Math.PI * freqHz * t) * amplitude * envelope);
|
||||
pcm.writeInt16LE(Math.max(-32768, Math.min(32767, sample)), i * 2);
|
||||
for (let i = 0; i < totalSamples; i++) {
|
||||
const t = i / sampleRate;
|
||||
const fadeLen = Math.floor(sampleRate * 0.01); // 10ms fade
|
||||
let envelope = 1.0;
|
||||
if (i < fadeLen) envelope = i / fadeLen;
|
||||
else if (i > totalSamples - fadeLen) envelope = (totalSamples - i) / fadeLen;
|
||||
|
||||
const sample = Math.round(Math.sin(2 * Math.PI * freqHz * t) * amplitude * envelope);
|
||||
pcm.writeInt16LE(Math.max(-32768, Math.min(32767, sample)), i * 2);
|
||||
}
|
||||
|
||||
// Write WAV file.
|
||||
const headerSize = 44;
|
||||
const dataSize = pcm.length;
|
||||
const wav = Buffer.alloc(headerSize + dataSize);
|
||||
|
||||
// RIFF header
|
||||
wav.write('RIFF', 0);
|
||||
wav.writeUInt32LE(36 + dataSize, 4);
|
||||
wav.write('WAVE', 8);
|
||||
|
||||
// fmt chunk
|
||||
wav.write('fmt ', 12);
|
||||
wav.writeUInt32LE(16, 16); // chunk size
|
||||
wav.writeUInt16LE(1, 20); // PCM format
|
||||
wav.writeUInt16LE(1, 22); // mono
|
||||
wav.writeUInt32LE(sampleRate, 24);
|
||||
wav.writeUInt32LE(sampleRate * 2, 28); // byte rate
|
||||
wav.writeUInt16LE(2, 32); // block align
|
||||
wav.writeUInt16LE(16, 34); // bits per sample
|
||||
|
||||
// data chunk
|
||||
wav.write('data', 36);
|
||||
wav.writeUInt32LE(dataSize, 40);
|
||||
pcm.copy(wav, 44);
|
||||
|
||||
fs.writeFileSync(wavPath, wav);
|
||||
this.log(`[prompt-cache] beep WAV generated for "${id}"`);
|
||||
}
|
||||
|
||||
const encoded = await encodePcmFrames(pcm, sampleRate, this.log);
|
||||
if (!encoded) {
|
||||
this.log(`[prompt-cache] beep encoding failed for "${id}"`);
|
||||
return null;
|
||||
}
|
||||
|
||||
const actualDuration = encoded.g722Frames.length * 20;
|
||||
const prompt: ICachedPrompt = {
|
||||
id,
|
||||
g722Frames: encoded.g722Frames,
|
||||
opusFrames: encoded.opusFrames,
|
||||
durationMs: actualDuration,
|
||||
};
|
||||
|
||||
this.prompts.set(id, prompt);
|
||||
this.log(`[prompt-cache] beep "${id}" cached: ${actualDuration}ms @ ${freqHz}Hz`);
|
||||
return prompt;
|
||||
return this.registerWav(id, wavPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a prompt from the cache.
|
||||
*/
|
||||
/** Remove a prompt from the cache. */
|
||||
remove(id: string): void {
|
||||
this.prompts.delete(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all cached prompts.
|
||||
*/
|
||||
/** Clear all cached prompts. */
|
||||
clear(): void {
|
||||
this.prompts.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Standalone playback helpers (for use by SystemLeg)
|
||||
// ---------------------------------------------------------------------------
|
||||
// -------------------------------------------------------------------------
|
||||
// Internal
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Play a cached prompt's G.722 frames as RTP packets at 20ms intervals.
|
||||
*
|
||||
* @param prompt - the cached prompt to play
|
||||
* @param sendPacket - function to send a raw RTP packet (12-byte header + payload)
|
||||
* @param ssrc - SSRC for RTP headers
|
||||
* @param onDone - called when playback finishes
|
||||
* @returns cancel function, or null if prompt has no G.722 frames
|
||||
*/
|
||||
export function playPromptG722(
|
||||
prompt: ICachedPrompt,
|
||||
sendPacket: (pkt: Buffer) => void,
|
||||
ssrc: number,
|
||||
onDone?: () => void,
|
||||
): (() => void) | null {
|
||||
if (prompt.g722Frames.length === 0) {
|
||||
onDone?.();
|
||||
return null;
|
||||
private registerWav(id: string, wavPath: string): ICachedPrompt {
|
||||
const durationMs = getWavDurationMs(wavPath);
|
||||
const prompt: ICachedPrompt = { id, wavPath, durationMs };
|
||||
this.prompts.set(id, prompt);
|
||||
this.log(`[prompt-cache] cached "${id}": ${wavPath} (${(durationMs / 1000).toFixed(1)}s)`);
|
||||
return prompt;
|
||||
}
|
||||
|
||||
const frames = prompt.g722Frames;
|
||||
const PT = 9;
|
||||
let frameIdx = 0;
|
||||
let seq = Math.floor(Math.random() * 0xffff);
|
||||
let rtpTs = Math.floor(Math.random() * 0xffffffff);
|
||||
|
||||
const timer = setInterval(() => {
|
||||
if (frameIdx >= frames.length) {
|
||||
clearInterval(timer);
|
||||
onDone?.();
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = frames[frameIdx];
|
||||
const hdr = buildRtpHeader(PT, seq & 0xffff, rtpTs >>> 0, ssrc >>> 0, frameIdx === 0);
|
||||
const pkt = Buffer.concat([hdr, payload]);
|
||||
sendPacket(pkt);
|
||||
|
||||
seq++;
|
||||
rtpTs += rtpClockIncrement(PT);
|
||||
frameIdx++;
|
||||
}, 20);
|
||||
|
||||
return () => clearInterval(timer);
|
||||
}
|
||||
|
||||
/**
|
||||
* Play a cached prompt's Opus frames as RTP packets at 20ms intervals.
|
||||
*
|
||||
* @param prompt - the cached prompt to play
|
||||
* @param sendPacket - function to send a raw RTP packet
|
||||
* @param ssrc - SSRC for RTP headers
|
||||
* @param counters - shared seq/ts counters (mutated in place for seamless transitions)
|
||||
* @param onDone - called when playback finishes
|
||||
* @returns cancel function, or null if prompt has no Opus frames
|
||||
*/
|
||||
export function playPromptOpus(
|
||||
prompt: ICachedPrompt,
|
||||
sendPacket: (pkt: Buffer) => void,
|
||||
ssrc: number,
|
||||
counters: { seq: number; ts: number },
|
||||
onDone?: () => void,
|
||||
): (() => void) | null {
|
||||
if (prompt.opusFrames.length === 0) {
|
||||
onDone?.();
|
||||
return null;
|
||||
}
|
||||
|
||||
const frames = prompt.opusFrames;
|
||||
const PT = 111;
|
||||
let frameIdx = 0;
|
||||
|
||||
const timer = setInterval(() => {
|
||||
if (frameIdx >= frames.length) {
|
||||
clearInterval(timer);
|
||||
onDone?.();
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = frames[frameIdx];
|
||||
const hdr = buildRtpHeader(PT, counters.seq & 0xffff, counters.ts >>> 0, ssrc >>> 0, frameIdx === 0);
|
||||
const pkt = Buffer.concat([hdr, payload]);
|
||||
sendPacket(pkt);
|
||||
|
||||
counters.seq++;
|
||||
counters.ts += 960; // Opus 48kHz: 960 samples per 20ms
|
||||
frameIdx++;
|
||||
}, 20);
|
||||
|
||||
return () => clearInterval(timer);
|
||||
}
|
||||
|
||||
199
ts/opusbridge.ts
199
ts/opusbridge.ts
@@ -1,199 +0,0 @@
|
||||
/**
|
||||
* Audio transcoding bridge — uses smartrust to communicate with the Rust
|
||||
* opus-codec binary, which handles Opus ↔ G.722 ↔ PCMU/PCMA transcoding.
|
||||
*
|
||||
* All codec conversion happens in Rust (libopus + SpanDSP G.722 port).
|
||||
* The TypeScript side just passes raw payloads back and forth.
|
||||
*/
|
||||
|
||||
import path from 'node:path';
|
||||
import { RustBridge } from '@push.rocks/smartrust';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Command type map for smartrust
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type TCodecCommands = {
|
||||
init: {
|
||||
params: Record<string, never>;
|
||||
result: Record<string, never>;
|
||||
};
|
||||
create_session: {
|
||||
params: { session_id: string };
|
||||
result: Record<string, never>;
|
||||
};
|
||||
destroy_session: {
|
||||
params: { session_id: string };
|
||||
result: Record<string, never>;
|
||||
};
|
||||
transcode: {
|
||||
params: { data_b64: string; from_pt: number; to_pt: number; session_id?: string; direction?: string };
|
||||
result: { data_b64: string };
|
||||
};
|
||||
encode_pcm: {
|
||||
params: { data_b64: string; sample_rate: number; to_pt: number; session_id?: string };
|
||||
result: { data_b64: string };
|
||||
};
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Bridge singleton
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
let bridge: RustBridge<TCodecCommands> | null = null;
|
||||
let initialized = false;
|
||||
|
||||
function buildLocalPaths(): string[] {
|
||||
const root = process.cwd();
|
||||
return [
|
||||
path.join(root, 'dist_rust', 'opus-codec'),
|
||||
path.join(root, 'rust', 'target', 'release', 'opus-codec'),
|
||||
path.join(root, 'rust', 'target', 'debug', 'opus-codec'),
|
||||
];
|
||||
}
|
||||
|
||||
let logFn: ((msg: string) => void) | undefined;
|
||||
|
||||
/**
|
||||
* Initialize the audio transcoding bridge. Spawns the Rust binary.
|
||||
*/
|
||||
export async function initCodecBridge(log?: (msg: string) => void): Promise<boolean> {
|
||||
if (initialized && bridge) return true;
|
||||
logFn = log;
|
||||
|
||||
try {
|
||||
bridge = new RustBridge<TCodecCommands>({
|
||||
binaryName: 'opus-codec',
|
||||
localPaths: buildLocalPaths(),
|
||||
});
|
||||
|
||||
const spawned = await bridge.spawn();
|
||||
if (!spawned) {
|
||||
log?.('[codec] failed to spawn opus-codec binary');
|
||||
bridge = null;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Auto-restart: reset state when the Rust process exits so the next
|
||||
// transcode attempt triggers re-initialization instead of silent failure.
|
||||
bridge.on('exit', () => {
|
||||
logFn?.('[codec] Rust audio transcoder process exited — will re-init on next use');
|
||||
bridge = null;
|
||||
initialized = false;
|
||||
});
|
||||
|
||||
await bridge.sendCommand('init', {} as any);
|
||||
initialized = true;
|
||||
log?.('[codec] Rust audio transcoder initialized (Opus + G.722 + PCMU/PCMA)');
|
||||
return true;
|
||||
} catch (e: any) {
|
||||
log?.(`[codec] init error: ${e.message}`);
|
||||
bridge = null;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Session management — per-call codec isolation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Create an isolated codec session. Each session gets its own Opus/G.722
|
||||
* encoder/decoder state, preventing concurrent calls from corrupting each
|
||||
* other's stateful codec predictions.
|
||||
*/
|
||||
export async function createSession(sessionId: string): Promise<boolean> {
|
||||
if (!bridge || !initialized) {
|
||||
// Attempt auto-reinit if bridge died.
|
||||
const ok = await initCodecBridge(logFn);
|
||||
if (!ok) return false;
|
||||
}
|
||||
try {
|
||||
await bridge!.sendCommand('create_session', { session_id: sessionId });
|
||||
return true;
|
||||
} catch (e: any) {
|
||||
logFn?.(`[codec] create_session error: ${e?.message || e}`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroy a codec session, freeing its encoder/decoder state.
|
||||
*/
|
||||
export async function destroySession(sessionId: string): Promise<void> {
|
||||
if (!bridge || !initialized) return;
|
||||
try {
|
||||
await bridge.sendCommand('destroy_session', { session_id: sessionId });
|
||||
} catch {
|
||||
// Best-effort cleanup.
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Transcoding
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Transcode an RTP payload between two codecs.
|
||||
* All codec work (Opus, G.722, PCMU, PCMA) + resampling happens in Rust.
|
||||
*
|
||||
* @param data - raw RTP payload (no header)
|
||||
* @param fromPT - source payload type (0=PCMU, 8=PCMA, 9=G.722, 111=Opus)
|
||||
* @param toPT - target payload type
|
||||
* @param sessionId - optional session for isolated codec state
|
||||
* @returns transcoded payload, or null on failure
|
||||
*/
|
||||
export async function transcode(data: Buffer, fromPT: number, toPT: number, sessionId?: string, direction?: string): Promise<Buffer | null> {
|
||||
if (!bridge || !initialized) return null;
|
||||
try {
|
||||
const params: any = {
|
||||
data_b64: data.toString('base64'),
|
||||
from_pt: fromPT,
|
||||
to_pt: toPT,
|
||||
};
|
||||
if (sessionId) params.session_id = sessionId;
|
||||
if (direction) params.direction = direction;
|
||||
const result = await bridge.sendCommand('transcode', params);
|
||||
return Buffer.from(result.data_b64, 'base64');
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode raw 16-bit PCM to a target codec.
|
||||
* @param pcmData - raw 16-bit LE PCM bytes
|
||||
* @param sampleRate - input sample rate (e.g. 22050 for Piper TTS)
|
||||
* @param toPT - target payload type (9=G.722, 111=Opus, 0=PCMU, 8=PCMA)
|
||||
* @param sessionId - optional session for isolated codec state
|
||||
*/
|
||||
export async function encodePcm(pcmData: Buffer, sampleRate: number, toPT: number, sessionId?: string): Promise<Buffer | null> {
|
||||
if (!bridge || !initialized) return null;
|
||||
try {
|
||||
const params: any = {
|
||||
data_b64: pcmData.toString('base64'),
|
||||
sample_rate: sampleRate,
|
||||
to_pt: toPT,
|
||||
};
|
||||
if (sessionId) params.session_id = sessionId;
|
||||
const result = await bridge.sendCommand('encode_pcm', params);
|
||||
return Buffer.from(result.data_b64, 'base64');
|
||||
} catch (e: any) {
|
||||
console.error('[encodePcm] error:', e?.message || e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/** Check if the codec bridge is ready. */
|
||||
export function isCodecReady(): boolean {
|
||||
return initialized && bridge !== null;
|
||||
}
|
||||
|
||||
/** Shut down the codec bridge. */
|
||||
export function shutdownCodecBridge(): void {
|
||||
if (bridge) {
|
||||
try { bridge.kill(); } catch { /* ignore */ }
|
||||
bridge = null;
|
||||
initialized = false;
|
||||
}
|
||||
}
|
||||
@@ -79,6 +79,10 @@ type TProxyCommands = {
|
||||
params: { call_id: string; leg_id: string; key: string; value: unknown };
|
||||
result: Record<string, never>;
|
||||
};
|
||||
generate_tts: {
|
||||
params: { model: string; voices: string; voice: string; text: string; output: string };
|
||||
result: { output: string };
|
||||
};
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -493,6 +497,15 @@ export function isProxyReady(): boolean {
|
||||
return initialized && bridge !== null;
|
||||
}
|
||||
|
||||
/** Send an arbitrary command to the proxy engine bridge. */
|
||||
export async function sendProxyCommand<K extends keyof TProxyCommands>(
|
||||
method: K,
|
||||
params: TProxyCommands[K]['params'],
|
||||
): Promise<TProxyCommands[K]['result']> {
|
||||
if (!bridge || !initialized) throw new Error('proxy engine not initialized');
|
||||
return bridge.sendCommand(method as string, params as any) as any;
|
||||
}
|
||||
|
||||
/** Shut down the proxy engine. */
|
||||
export function shutdownProxyEngine(): void {
|
||||
if (bridge) {
|
||||
|
||||
@@ -24,7 +24,6 @@ import {
|
||||
getAllBrowserDeviceIds,
|
||||
getBrowserDeviceWs,
|
||||
} from './webrtcbridge.ts';
|
||||
import { initCodecBridge } from './opusbridge.ts';
|
||||
import { initAnnouncement } from './announcement.ts';
|
||||
import { PromptCache } from './call/prompt-cache.ts';
|
||||
import { VoiceboxManager } from './voicebox.ts';
|
||||
@@ -426,9 +425,9 @@ async function startProxyEngine(): Promise<void> {
|
||||
id: data.leg_id,
|
||||
type: data.kind,
|
||||
state: data.state,
|
||||
codec: null,
|
||||
rtpPort: null,
|
||||
remoteMedia: null,
|
||||
codec: data.codec ?? null,
|
||||
rtpPort: data.rtpPort ?? null,
|
||||
remoteMedia: data.remoteMedia ?? null,
|
||||
metadata: data.metadata || {},
|
||||
});
|
||||
}
|
||||
@@ -523,9 +522,8 @@ async function startProxyEngine(): Promise<void> {
|
||||
const deviceList = appConfig.devices.map((d) => d.displayName).join(', ');
|
||||
log(`proxy engine started | LAN ${appConfig.proxy.lanIp}:${appConfig.proxy.lanPort} | providers: ${providerList} | devices: ${deviceList}`);
|
||||
|
||||
// Initialize audio codec bridge (still needed for WebRTC transcoding).
|
||||
// Generate TTS audio (WAV files on disk, played by Rust audio_player).
|
||||
try {
|
||||
await initCodecBridge(log);
|
||||
await initAnnouncement(log);
|
||||
|
||||
// Pre-generate prompts.
|
||||
@@ -547,7 +545,7 @@ async function startProxyEngine(): Promise<void> {
|
||||
}
|
||||
log(`[startup] prompts cached: ${promptCache.listIds().join(', ') || 'none'}`);
|
||||
} catch (e) {
|
||||
log(`[codec] init failed: ${e}`);
|
||||
log(`[tts] init failed: ${e}`);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,6 @@
|
||||
*/
|
||||
export const commitinfo = {
|
||||
name: 'siprouter',
|
||||
version: '1.15.0',
|
||||
version: '1.17.1',
|
||||
description: 'undefined'
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user