/// Audio transcoding bridge for smartrust. /// /// Handles Opus ↔ G.722 ↔ PCMU transcoding for the SIP router. /// Uses audiopus (libopus) for Opus and ezk-g722 (SpanDSP port) for G.722. /// /// Supports per-session codec state so concurrent calls don't corrupt each /// other's stateful codecs (Opus, G.722 ADPCM). /// /// Protocol: /// -> {"id":"1","method":"init","params":{}} /// <- {"id":"1","success":true,"result":{}} /// -> {"id":"2","method":"create_session","params":{"session_id":"call-abc"}} /// <- {"id":"2","success":true,"result":{}} /// -> {"id":"3","method":"transcode","params":{"session_id":"call-abc","data_b64":"...","from_pt":111,"to_pt":9}} /// <- {"id":"3","success":true,"result":{"data_b64":"..."}} /// -> {"id":"4","method":"destroy_session","params":{"session_id":"call-abc"}} /// <- {"id":"4","success":true,"result":{}} use audiopus::coder::{Decoder as OpusDecoder, Encoder as OpusEncoder}; use audiopus::packet::Packet as OpusPacket; use audiopus::{Application, Bitrate as OpusBitrate, Channels, MutSignals, SampleRate}; use base64::engine::general_purpose::STANDARD as B64; use base64::Engine as _; use ezk_g722::libg722::{self, Bitrate}; use nnnoiseless::DenoiseState; use rubato::{FftFixedIn, Resampler}; use serde::Deserialize; use std::collections::HashMap; use std::io::{self, BufRead, Write}; // Payload type constants. const PT_PCMU: u8 = 0; const PT_PCMA: u8 = 8; const PT_G722: u8 = 9; const PT_OPUS: u8 = 111; #[derive(Deserialize)] struct Request { id: String, method: String, #[serde(default)] params: serde_json::Value, } fn respond(out: &mut impl Write, id: &str, success: bool, result: Option, error: Option<&str>) { let mut resp = serde_json::json!({ "id": id, "success": success }); if let Some(r) = result { resp["result"] = r; } if let Some(e) = error { resp["error"] = serde_json::Value::String(e.to_string()); } let _ = writeln!(out, "{}", resp); let _ = out.flush(); } // --------------------------------------------------------------------------- // Codec state // --------------------------------------------------------------------------- struct TranscodeState { opus_enc: OpusEncoder, opus_dec: OpusDecoder, g722_enc: libg722::encoder::Encoder, g722_dec: libg722::decoder::Decoder, // Cached FFT resamplers keyed by (from_rate, to_rate, chunk_size). resamplers: HashMap<(u32, u32, usize), FftFixedIn>, // Per-direction ML noise suppression (RNNoise). Separate state per direction // prevents the RNN hidden state from being corrupted by interleaved audio streams. denoiser_to_sip: Box>, denoiser_to_browser: Box>, } impl TranscodeState { fn new() -> Result { let mut opus_enc = OpusEncoder::new(SampleRate::Hz48000, Channels::Mono, Application::Voip) .map_err(|e| format!("opus encoder: {e}"))?; // Telephony-grade tuning: complexity 5 is sufficient for voice bridged to G.722. opus_enc.set_complexity(5).map_err(|e| format!("opus set_complexity: {e}"))?; opus_enc.set_bitrate(OpusBitrate::BitsPerSecond(24000)).map_err(|e| format!("opus set_bitrate: {e}"))?; let opus_dec = OpusDecoder::new(SampleRate::Hz48000, Channels::Mono) .map_err(|e| format!("opus decoder: {e}"))?; let g722_enc = libg722::encoder::Encoder::new(Bitrate::Mode1_64000, false, false); let g722_dec = libg722::decoder::Decoder::new(Bitrate::Mode1_64000, false, false); Ok(Self { opus_enc, opus_dec, g722_enc, g722_dec, resamplers: HashMap::new(), denoiser_to_sip: DenoiseState::new(), denoiser_to_browser: DenoiseState::new(), }) } /// High-quality sample rate conversion using rubato FFT resampler. /// Resamplers are cached by (from_rate, to_rate, chunk_size) and reused, /// maintaining proper inter-frame state for continuous audio streams. fn resample(&mut self, pcm: &[i16], from_rate: u32, to_rate: u32) -> Result, String> { if from_rate == to_rate || pcm.is_empty() { return Ok(pcm.to_vec()); } let chunk = pcm.len(); let key = (from_rate, to_rate, chunk); // Get or create cached resampler for this rate pair + chunk size. if !self.resamplers.contains_key(&key) { let r = FftFixedIn::::new(from_rate as usize, to_rate as usize, chunk, 1, 1) .map_err(|e| format!("resampler {from_rate}->{to_rate}: {e}"))?; self.resamplers.insert(key, r); } let resampler = self.resamplers.get_mut(&key).unwrap(); // i16 → f64 normalized to [-1.0, 1.0] let float_in: Vec = pcm.iter().map(|&s| s as f64 / 32768.0).collect(); let input = vec![float_in]; let result = resampler.process(&input, None) .map_err(|e| format!("resample {from_rate}->{to_rate}: {e}"))?; // f64 → i16 Ok(result[0].iter() .map(|&s| (s * 32767.0).round().clamp(-32768.0, 32767.0) as i16) .collect()) } /// Apply RNNoise ML noise suppression to 48kHz PCM audio. /// Processes in 480-sample (10ms) frames. State persists across calls. fn denoise(denoiser: &mut DenoiseState, pcm: &[i16]) -> Vec { let frame_size = DenoiseState::FRAME_SIZE; // 480 let total = pcm.len(); // Round down to whole frames — don't process partial frames to avoid // injecting artificial silence into the RNN state. let whole = (total / frame_size) * frame_size; let mut output = Vec::with_capacity(total); let mut out_buf = [0.0f32; 480]; for offset in (0..whole).step_by(frame_size) { let input: Vec = pcm[offset..offset + frame_size] .iter().map(|&s| s as f32).collect(); denoiser.process_frame(&mut out_buf, &input); output.extend(out_buf.iter() .map(|&s| s.round().clamp(-32768.0, 32767.0) as i16)); } // Pass through any trailing partial-frame samples unmodified. if whole < total { output.extend_from_slice(&pcm[whole..]); } output } /// Transcode audio payload from one codec to another. /// `direction`: "to_sip" or "to_browser" — selects the per-direction denoiser. /// If None, denoising is skipped (backward compat). fn transcode(&mut self, data: &[u8], from_pt: u8, to_pt: u8, direction: Option<&str>) -> Result, String> { if from_pt == to_pt { return Ok(data.to_vec()); } // Decode to PCM (at source sample rate). let (pcm, rate) = self.decode_to_pcm(data, from_pt)?; // Apply noise suppression if direction is specified. let processed = if let Some(dir) = direction { // Resample to 48kHz for denoising (no-op when already 48kHz). let pcm_48k = self.resample(&pcm, rate, 48000)?; let denoiser = match dir { "to_sip" => &mut self.denoiser_to_sip, _ => &mut self.denoiser_to_browser, }; let denoised = Self::denoise(denoiser, &pcm_48k); // Resample to target rate (no-op when target is 48kHz). let target_rate = codec_sample_rate(to_pt); self.resample(&denoised, 48000, target_rate)? } else { // No denoising — direct resample. let target_rate = codec_sample_rate(to_pt); if rate == target_rate { pcm } else { self.resample(&pcm, rate, target_rate)? } }; // Encode from PCM. self.encode_from_pcm(&processed, to_pt) } fn decode_to_pcm(&mut self, data: &[u8], pt: u8) -> Result<(Vec, u32), String> { match pt { PT_OPUS => { let mut pcm = vec![0i16; 5760]; // up to 120ms at 48kHz (RFC 6716 max) let packet = OpusPacket::try_from(data) .map_err(|e| format!("opus packet: {e}"))?; let out = MutSignals::try_from(&mut pcm[..]) .map_err(|e| format!("opus signals: {e}"))?; let n: usize = self.opus_dec.decode(Some(packet), out, false) .map_err(|e| format!("opus decode: {e}"))?.into(); pcm.truncate(n); Ok((pcm, 48000)) } PT_G722 => { let pcm = self.g722_dec.decode(data); Ok((pcm, 16000)) } PT_PCMU => { let pcm: Vec = data.iter().map(|&b| mulaw_decode(b)).collect(); Ok((pcm, 8000)) } PT_PCMA => { let pcm: Vec = data.iter().map(|&b| alaw_decode(b)).collect(); Ok((pcm, 8000)) } _ => Err(format!("unsupported source PT {pt}")), } } fn encode_from_pcm(&mut self, pcm: &[i16], pt: u8) -> Result, String> { match pt { PT_OPUS => { let mut buf = vec![0u8; 4000]; let n: usize = self.opus_enc.encode(pcm, &mut buf) .map_err(|e| format!("opus encode: {e}"))?.into(); buf.truncate(n); Ok(buf) } PT_G722 => { Ok(self.g722_enc.encode(pcm)) } PT_PCMU => { Ok(pcm.iter().map(|&s| mulaw_encode(s)).collect()) } PT_PCMA => { Ok(pcm.iter().map(|&s| alaw_encode(s)).collect()) } _ => Err(format!("unsupported target PT {pt}")), } } } fn codec_sample_rate(pt: u8) -> u32 { match pt { PT_OPUS => 48000, PT_G722 => 16000, _ => 8000, // PCMU, PCMA } } // --------------------------------------------------------------------------- // G.711 µ-law (PCMU) // --------------------------------------------------------------------------- fn mulaw_encode(sample: i16) -> u8 { const BIAS: i16 = 0x84; const CLIP: i16 = 32635; let sign = if sample < 0 { 0x80u8 } else { 0 }; // Use i32 to avoid overflow when sample == i16::MIN (-32768). let mut s = (sample as i32).unsigned_abs().min(CLIP as u32) as i16; s += BIAS; let mut exp = 7u8; let mut mask = 0x4000i16; while exp > 0 && (s & mask) == 0 { exp -= 1; mask >>= 1; } let mantissa = ((s >> (exp + 3)) & 0x0f) as u8; !(sign | (exp << 4) | mantissa) } fn mulaw_decode(mulaw: u8) -> i16 { let v = !mulaw; let sign = v & 0x80; let exp = (v >> 4) & 0x07; let mantissa = v & 0x0f; let mut sample = (((mantissa as i16) << 4) + 0x84) << exp; sample -= 0x84; if sign != 0 { -sample } else { sample } } // --------------------------------------------------------------------------- // G.711 A-law (PCMA) // --------------------------------------------------------------------------- fn alaw_encode(sample: i16) -> u8 { let sign = if sample >= 0 { 0x80u8 } else { 0 }; // Use i32 to avoid overflow when sample == i16::MIN (-32768). let s = (sample as i32).unsigned_abs().min(32767) as i16; let mut exp = 7u8; let mut mask = 0x4000i16; while exp > 0 && (s & mask) == 0 { exp -= 1; mask >>= 1; } let mantissa = if exp > 0 { ((s >> (exp + 3)) & 0x0f) as u8 } else { ((s >> 4) & 0x0f) as u8 }; (sign | (exp << 4) | mantissa) ^ 0x55 } fn alaw_decode(alaw: u8) -> i16 { let v = alaw ^ 0x55; let sign = v & 0x80; let exp = (v >> 4) & 0x07; let mantissa = v & 0x0f; let sample = if exp == 0 { ((mantissa as i16) << 4) + 8 } else { (((mantissa as i16) << 4) + 0x108) << (exp - 1) }; if sign != 0 { sample } else { -sample } } // --------------------------------------------------------------------------- // Main loop // --------------------------------------------------------------------------- /// Resolve a session: if session_id is provided, look it up in the sessions map; /// otherwise fall back to the default state (backward compat with `init`). fn get_session<'a>( sessions: &'a mut HashMap, default: &'a mut Option, params: &serde_json::Value, ) -> Option<&'a mut TranscodeState> { if let Some(sid) = params.get("session_id").and_then(|v| v.as_str()) { sessions.get_mut(sid) } else { default.as_mut() } } fn main() { let stdin = io::stdin(); let stdout = io::stdout(); let mut out = io::BufWriter::new(stdout.lock()); let _ = writeln!(out, r#"{{"event":"ready","data":{{}}}}"#); let _ = out.flush(); // Default state for backward-compat `init` (no session_id). let mut default_state: Option = None; // Per-session codec state for concurrent call isolation. let mut sessions: HashMap = HashMap::new(); for line in stdin.lock().lines() { let line = match line { Ok(l) if !l.trim().is_empty() => l, Ok(_) => continue, Err(_) => break, }; let req: Request = match serde_json::from_str(&line) { Ok(r) => r, Err(e) => { respond(&mut out, "", false, None, Some(&format!("parse: {e}"))); continue; } }; match req.method.as_str() { // Backward-compat: init the default (shared) session. "init" => { match TranscodeState::new() { Ok(s) => { default_state = Some(s); respond(&mut out, &req.id, true, Some(serde_json::json!({})), None); } Err(e) => respond(&mut out, &req.id, false, None, Some(&e)), } } // Create an isolated session with its own codec state. "create_session" => { let session_id = match req.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s.to_string(), None => { respond(&mut out, &req.id, false, None, Some("missing session_id")); continue; } }; if sessions.contains_key(&session_id) { respond(&mut out, &req.id, true, Some(serde_json::json!({})), None); continue; } match TranscodeState::new() { Ok(s) => { sessions.insert(session_id, s); respond(&mut out, &req.id, true, Some(serde_json::json!({})), None); } Err(e) => respond(&mut out, &req.id, false, None, Some(&e)), } } // Destroy a session, freeing its codec state. "destroy_session" => { let session_id = match req.params.get("session_id").and_then(|v| v.as_str()) { Some(s) => s, None => { respond(&mut out, &req.id, false, None, Some("missing session_id")); continue; } }; sessions.remove(session_id); respond(&mut out, &req.id, true, Some(serde_json::json!({})), None); } // Transcode: uses session_id if provided, else default state. "transcode" => { let st = match get_session(&mut sessions, &mut default_state, &req.params) { Some(s) => s, None => { respond(&mut out, &req.id, false, None, Some("not initialized (no session or default state)")); continue; } }; let data_b64 = match req.params.get("data_b64").and_then(|v| v.as_str()) { Some(s) => s, None => { respond(&mut out, &req.id, false, None, Some("missing data_b64")); continue; } }; let from_pt = req.params.get("from_pt").and_then(|v| v.as_u64()).unwrap_or(0) as u8; let to_pt = req.params.get("to_pt").and_then(|v| v.as_u64()).unwrap_or(0) as u8; let direction = req.params.get("direction").and_then(|v| v.as_str()); let data = match B64.decode(data_b64) { Ok(b) => b, Err(e) => { respond(&mut out, &req.id, false, None, Some(&format!("b64: {e}"))); continue; } }; match st.transcode(&data, from_pt, to_pt, direction) { Ok(result) => { respond(&mut out, &req.id, true, Some(serde_json::json!({ "data_b64": B64.encode(&result) })), None); } Err(e) => respond(&mut out, &req.id, false, None, Some(&e)), } } // Encode raw 16-bit PCM to a target codec. // Params: data_b64 (raw PCM bytes, 16-bit LE), sample_rate (input Hz), to_pt // Optional: session_id for isolated codec state. "encode_pcm" => { let st = match get_session(&mut sessions, &mut default_state, &req.params) { Some(s) => s, None => { respond(&mut out, &req.id, false, None, Some("not initialized (no session or default state)")); continue; } }; let data_b64 = match req.params.get("data_b64").and_then(|v| v.as_str()) { Some(s) => s, None => { respond(&mut out, &req.id, false, None, Some("missing data_b64")); continue; } }; let sample_rate = req.params.get("sample_rate").and_then(|v| v.as_u64()).unwrap_or(22050) as u32; let to_pt = req.params.get("to_pt").and_then(|v| v.as_u64()).unwrap_or(9) as u8; let data = match B64.decode(data_b64) { Ok(b) => b, Err(e) => { respond(&mut out, &req.id, false, None, Some(&format!("b64: {e}"))); continue; } }; if data.len() % 2 != 0 { respond(&mut out, &req.id, false, None, Some("PCM data has odd byte count (expected 16-bit LE samples)")); continue; } // Convert raw bytes to i16 samples. let pcm: Vec = data.chunks_exact(2) .map(|c| i16::from_le_bytes([c[0], c[1]])) .collect(); // Resample to target codec's sample rate. let target_rate = codec_sample_rate(to_pt); let resampled = match st.resample(&pcm, sample_rate, target_rate) { Ok(r) => r, Err(e) => { respond(&mut out, &req.id, false, None, Some(&e)); continue; } }; // Encode to target codec (reuse encode_from_pcm). match st.encode_from_pcm(&resampled, to_pt) { Ok(encoded) => { respond(&mut out, &req.id, true, Some(serde_json::json!({ "data_b64": B64.encode(&encoded) })), None); } Err(e) => { respond(&mut out, &req.id, false, None, Some(&e)); continue; } } } // Legacy commands (kept for backward compat). "encode" | "decode" => { respond(&mut out, &req.id, false, None, Some("use 'transcode' command instead")); } _ => respond(&mut out, &req.id, false, None, Some(&format!("unknown: {}", req.method))), } } }