//! Tool leg consumers — background tasks that process per-source unmerged audio. //! //! Tool legs are observer legs that receive individual audio streams from each //! participant in a call. The mixer pipes `ToolAudioBatch` every 20ms containing //! each participant's decoded PCM@16kHz tagged with source leg ID. //! //! Consumers: //! - **Recording**: writes per-source WAV files for speaker-separated recording. //! - **Transcription**: stub for future Whisper integration (accumulates audio in Rust). use crate::ipc::{emit_event, OutTx}; use crate::mixer::ToolAudioBatch; use crate::recorder::Recorder; use std::collections::HashMap; use tokio::sync::mpsc; use tokio::task::JoinHandle; // --------------------------------------------------------------------------- // Recording consumer // --------------------------------------------------------------------------- /// Spawn a recording tool leg that writes per-source WAV files. /// /// Returns the channel sender (for the mixer to send batches) and the task handle. /// When the channel is closed (tool leg removed), all WAV files are finalized /// and a `tool_recording_done` event is emitted. pub fn spawn_recording_tool( tool_leg_id: String, call_id: String, base_dir: String, out_tx: OutTx, ) -> (mpsc::Sender, JoinHandle<()>) { let (tx, mut rx) = mpsc::channel::(64); let handle = tokio::spawn(async move { let mut recorders: HashMap = HashMap::new(); while let Some(batch) = rx.recv().await { for source in &batch.sources { // Skip silence-only frames (all zeros = no audio activity). let has_audio = source.pcm_16k.iter().any(|&s| s != 0); if !has_audio && !recorders.contains_key(&source.leg_id) { continue; // Don't create a file for silence-only sources. } let recorder = recorders.entry(source.leg_id.clone()).or_insert_with(|| { let path = format!("{}/{}-{}.wav", base_dir, call_id, source.leg_id); Recorder::new_pcm(&path, 16000, None).unwrap_or_else(|e| { panic!("failed to create recorder for {}: {e}", source.leg_id); }) }); if !recorder.write_pcm(&source.pcm_16k) { // Max duration reached — stop recording this source. break; } } } // Channel closed — finalize all recordings. let mut files = Vec::new(); for (leg_id, rec) in recorders { let result = rec.stop(); files.push(serde_json::json!({ "source_leg_id": leg_id, "file_path": result.file_path, "duration_ms": result.duration_ms, })); } emit_event( &out_tx, "tool_recording_done", serde_json::json!({ "call_id": call_id, "tool_leg_id": tool_leg_id, "files": files, }), ); }); (tx, handle) } // --------------------------------------------------------------------------- // Transcription consumer (stub — real plumbing, stub consumer) // --------------------------------------------------------------------------- /// Spawn a transcription tool leg. /// /// The plumbing is fully real: it receives per-source unmerged PCM@16kHz from /// the mixer every 20ms. The consumer is a stub that accumulates audio and /// reports metadata on close. Future: will stream to a Whisper HTTP endpoint. pub fn spawn_transcription_tool( tool_leg_id: String, call_id: String, out_tx: OutTx, ) -> (mpsc::Sender, JoinHandle<()>) { let (tx, mut rx) = mpsc::channel::(64); let handle = tokio::spawn(async move { // Track per-source sample counts for duration reporting. let mut source_samples: HashMap = HashMap::new(); while let Some(batch) = rx.recv().await { for source in &batch.sources { *source_samples.entry(source.leg_id.clone()).or_insert(0) += source.pcm_16k.len() as u64; // TODO: Future — accumulate chunks and stream to Whisper endpoint. // For now, the audio is received and counted but not processed. } } // Channel closed — report metadata. let sources: Vec = source_samples .iter() .map(|(leg_id, samples)| { serde_json::json!({ "source_leg_id": leg_id, "duration_ms": (samples * 1000) / 16000, }) }) .collect(); emit_event( &out_tx, "tool_transcription_done", serde_json::json!({ "call_id": call_id, "tool_leg_id": tool_leg_id, "sources": sources, }), ); }); (tx, handle) }