From 084c41ef783c6088b579808d61e98bb70363646a Mon Sep 17 00:00:00 2001 From: David Kleingeld Date: Thu, 25 Sep 2025 20:41:59 +0200 Subject: [PATCH 1/5] adds less efficient microphone variant that is send `cpal::Stream` is not send on Mac. I need microphone to be send. This 'solves' the non-send stream by parking it on a separate thread. --- src/microphone.rs | 91 +++++++++++++----------- src/microphone/builder.rs | 25 ++++++- src/microphone/sendable.rs | 137 +++++++++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 41 deletions(-) create mode 100644 src/microphone/sendable.rs diff --git a/src/microphone.rs b/src/microphone.rs index eb98985d..db6c5193 100644 --- a/src/microphone.rs +++ b/src/microphone.rs @@ -109,6 +109,7 @@ use crate::{Sample, Source}; mod builder; mod config; +pub mod sendable; pub use builder::MicrophoneBuilder; pub use config::InputConfig; use cpal::I24; @@ -116,7 +117,7 @@ use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, Device, }; -use rtrb::RingBuffer; +use rtrb::{Producer, RingBuffer}; /// Error that can occur when we can not list the input devices #[derive(Debug, thiserror::Error, Clone)] @@ -228,10 +229,9 @@ impl Microphone { config: InputConfig, mut error_callback: impl FnMut(cpal::StreamError) + Send + 'static, ) -> Result { - let timeout = Some(Duration::from_millis(100)); let hundred_ms_of_samples = config.channel_count.get() as u32 * config.sample_rate.get() / 10; - let (mut tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); + let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); let error_occurred = Arc::new(AtomicBool::new(false)); let error_callback = { let error_occurred = error_occurred.clone(); @@ -241,43 +241,7 @@ impl Microphone { } }; - macro_rules! build_input_streams { - ($($sample_format:tt, $generic:ty);+) => { - match config.sample_format { - $( - cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>( - &config.stream_config(), - move |data, _info| { - for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) { - let _skip_if_player_is_behind = tx.push(sample); - } - }, - error_callback, - timeout, - ), - )+ - _ => return Err(OpenError::UnsupportedSampleFormat), - } - }; - } - - let stream = build_input_streams!( - F32, f32; - F64, f64; - I8, i8; - I16, i16; - I24, I24; - I32, i32; - I64, i64; - U8, u8; - U16, u16; - // TODO: uncomment when https://github.com/RustAudio/cpal/pull/1011 is merged - // U24, cpal::U24; - U32, u32; - U64, u64 - ) - .map_err(OpenError::BuildStream)?; - stream.play().map_err(OpenError::Play)?; + let stream = open_input_stream(device, config, tx, error_callback)?; Ok(Microphone { _stream_handle: stream, @@ -309,3 +273,50 @@ impl Microphone { &self.config } } + +fn open_input_stream( + device: Device, + config: InputConfig, + mut tx: Producer, + error_callback: impl FnMut(cpal::StreamError) + Send + 'static, +) -> Result { + let timeout = Some(Duration::from_millis(100)); + + macro_rules! build_input_streams { + ($($sample_format:tt, $generic:ty);+) => { + match config.sample_format { + $( + cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>( + &config.stream_config(), + move |data, _info| { + for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) { + let _skip_if_player_is_behind = tx.push(sample); + } + }, + error_callback, + timeout, + ), + )+ + _ => return Err(OpenError::UnsupportedSampleFormat), + } + }; + } + let stream = build_input_streams!( + F32, f32; + F64, f64; + I8, i8; + I16, i16; + I24, I24; + I32, i32; + I64, i64; + U8, u8; + U16, u16; + // TODO: uncomment when https://github.com/RustAudio/cpal/pull/1011 is merged + // U24, cpal::U24; + U32, u32; + U64, u64 + ) + .map_err(OpenError::BuildStream)?; + stream.play().map_err(OpenError::Play)?; + Ok(stream) +} diff --git a/src/microphone/builder.rs b/src/microphone/builder.rs index deac72da..1cf8c6dd 100644 --- a/src/microphone/builder.rs +++ b/src/microphone/builder.rs @@ -6,7 +6,9 @@ use cpal::{ }; use crate::{ - common::assert_error_traits, microphone::config::InputConfig, ChannelCount, SampleRate, + common::assert_error_traits, + microphone::{config::InputConfig, sendable}, + ChannelCount, SampleRate, }; use super::Microphone; @@ -542,4 +544,25 @@ where self.error_callback.clone(), ) } + /// Opens the microphone input stream. + /// + /// # Example + /// ```no_run + /// # use rodio::microphone::MicrophoneBuilder; + /// # use rodio::Source; + /// # use std::time::Duration; + /// let mic = MicrophoneBuilder::new() + /// .default_device()? + /// .default_config()? + /// .open_stream()?; + /// let recording = mic.take_duration(Duration::from_secs(3)).record(); + /// # Ok::<(), Box>(()) + /// ``` + pub fn open_sendable_stream(&self) -> Result { + sendable::Microphone::open( + self.device.as_ref().expect("DeviceIsSet").0.clone(), + *self.config.as_ref().expect("ConfigIsSet"), + self.error_callback.clone(), + ) + } } diff --git a/src/microphone/sendable.rs b/src/microphone/sendable.rs new file mode 100644 index 00000000..99a0b512 --- /dev/null +++ b/src/microphone/sendable.rs @@ -0,0 +1,137 @@ +//! Slightly less efficient microphone that multiple sources can draw from +//! think of it as an inverse mixer. + +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, Arc, + }, + thread::{self, JoinHandle}, + time::Duration, +}; + +use cpal::Device; +use rtrb::RingBuffer; + +use crate::{microphone::open_input_stream, Source}; +use crate::{ + microphone::{InputConfig, OpenError}, + Sample, +}; + +/// Send on all platforms +pub struct Microphone { + _stream_thread: JoinHandle<()>, + buffer: rtrb::Consumer, + config: InputConfig, + poll_interval: Duration, + error_occurred: Arc, + _drop_tx: mpsc::Sender<()>, +} + +impl Microphone { + pub(crate) fn open( + device: Device, + config: InputConfig, + mut error_callback: impl FnMut(cpal::StreamError) + Send + 'static, + ) -> Result { + let hundred_ms_of_samples = + config.channel_count.get() as u32 * config.sample_rate.get() / 10; + let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); + let error_occurred = Arc::new(AtomicBool::new(false)); + let error_callback = { + let error_occurred = error_occurred.clone(); + move |source| { + error_occurred.store(true, Ordering::Relaxed); + error_callback(source); + } + }; + + let (res_tx, res_rx) = mpsc::channel(); + let (_drop_tx, drop_rx) = mpsc::channel::<()>(); + let _stream_thread = thread::Builder::new() + .name("Rodio cloneable microphone".to_string()) + .spawn(move || { + if let Err(e) = open_input_stream(device, config, tx, error_callback) { + let _ = res_tx.send(Err(e)); + } else { + let _ = res_tx.send(Ok(())); + }; + + let _should_drop = drop_rx.recv(); + }) + .expect("Should be able to spawn threads"); + + res_rx + .recv() + .expect("input stream thread should never panic")?; + + Ok(Microphone { + _stream_thread, + _drop_tx, + buffer: rx, + config, + poll_interval: Duration::from_millis(5), + error_occurred, + }) + } + + /// Get the configuration. + /// + /// # Example + /// Print the sample rate and channel count. + /// ```no_run + /// # use rodio::microphone::MicrophoneBuilder; + /// # fn main() -> Result<(), Box> { + /// let mic = MicrophoneBuilder::new() + /// .default_device()? + /// .default_config()? + /// .open_stream()?; + /// let config = mic.config(); + /// println!("Sample rate: {}", config.sample_rate.get()); + /// println!("Channel count: {}", config.channel_count.get()); + /// # Ok(()) + /// # } + /// ``` + pub fn config(&self) -> &InputConfig { + &self.config + } +} + +impl Source for Microphone { + fn current_span_len(&self) -> Option { + None + } + + fn channels(&self) -> crate::ChannelCount { + self.config.channel_count + } + + fn sample_rate(&self) -> crate::SampleRate { + self.config.sample_rate + } + + fn total_duration(&self) -> Option { + None + } +} + +impl Iterator for Microphone { + type Item = f32; + + fn next(&mut self) -> Option { + loop { + if let Ok(sample) = self.buffer.pop() { + return Some(sample); + } else if self.error_occurred.load(Ordering::Relaxed) { + return None; + } else { + thread::sleep(self.poll_interval) + } + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.buffer.slots(), None) + } +} From 156888b7eb0f06536cb575021d1ab5e1c09db23d Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 27 Sep 2025 22:50:36 +0200 Subject: [PATCH 2/5] refactor: eliminate polling in Microphone by using Condvar notifications Block iterator until new audio data arrives, reducing CPU usage and jitter. Notify waiting threads from the audio callback when new data is available. --- src/microphone.rs | 48 ++++++++++++++++++++++++++++++++------ src/microphone/sendable.rs | 26 ++++++++++++++++----- 2 files changed, 61 insertions(+), 13 deletions(-) diff --git a/src/microphone.rs b/src/microphone.rs index db6c5193..9c360970 100644 --- a/src/microphone.rs +++ b/src/microphone.rs @@ -100,8 +100,8 @@ use core::fmt; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::{thread, time::Duration}; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::Duration; use crate::common::assert_error_traits; use crate::conversions::SampleTypeConverter; @@ -166,8 +166,8 @@ pub struct Microphone { _stream_handle: cpal::Stream, buffer: rtrb::Consumer, config: InputConfig, - poll_interval: Duration, error_occurred: Arc, + data_signal: Arc<(Mutex<()>, Condvar)>, } impl Source for Microphone { @@ -198,7 +198,11 @@ impl Iterator for Microphone { } else if self.error_occurred.load(Ordering::Relaxed) { return None; } else { - thread::sleep(self.poll_interval) + // Block until notified instead of sleeping. This eliminates polling overhead and + // reduces jitter by avoiding unnecessary wakeups when no audio data is available. + let (lock, cvar) = &*self.data_signal; + let guard = lock.lock().unwrap(); + let _guard = cvar.wait(guard).unwrap(); } } } @@ -231,24 +235,32 @@ impl Microphone { ) -> Result { let hundred_ms_of_samples = config.channel_count.get() as u32 * config.sample_rate.get() / 10; + // Using rtrb (real-time ring buffer) instead of std::sync::mpsc or the ringbuf crate for + // audio performance. While ringbuf has Send variants that could eliminate the need for + // separate sendable/non-sendable microphone implementations, rtrb has been benchmarked to + // be significantly faster in throughput and provides lower latency operations. let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); let error_occurred = Arc::new(AtomicBool::new(false)); + let data_signal = Arc::new((Mutex::new(()), Condvar::new())); let error_callback = { let error_occurred = error_occurred.clone(); + let data_signal = data_signal.clone(); move |source| { error_occurred.store(true, Ordering::Relaxed); + let (_lock, cvar) = &*data_signal; + cvar.notify_one(); error_callback(source); } }; - let stream = open_input_stream(device, config, tx, error_callback)?; + let stream = open_input_stream(device, config, tx, error_callback, data_signal.clone())?; Ok(Microphone { _stream_handle: stream, buffer: rx, config, - poll_interval: Duration::from_millis(5), error_occurred, + data_signal, }) } @@ -279,9 +291,18 @@ fn open_input_stream( config: InputConfig, mut tx: Producer, error_callback: impl FnMut(cpal::StreamError) + Send + 'static, + data_signal: Arc<(Mutex<()>, Condvar)>, ) -> Result { let timeout = Some(Duration::from_millis(100)); + // Use the actual CPAL buffer size to determine when to notify. This aligns notifications with + // CPAL's natural period boundaries. + let period_samples = match config.buffer_size { + cpal::BufferSize::Fixed(frames) => frames as usize * config.channel_count.get() as usize, + cpal::BufferSize::Default => 1, // Always notify immediately when buffer size is unknown + }; + let mut samples_since_notification = 0; + macro_rules! build_input_streams { ($($sample_format:tt, $generic:ty);+) => { match config.sample_format { @@ -290,7 +311,20 @@ fn open_input_stream( &config.stream_config(), move |data, _info| { for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) { - let _skip_if_player_is_behind = tx.push(sample); + if tx.push(sample).is_ok() { + samples_since_notification += 1; + + // Notify when we've accumulated enough samples to represent one + // CPAL buffer period. + if samples_since_notification >= period_samples { + let (_lock, cvar) = &*data_signal; + cvar.notify_one(); + + // This keeps any "remainder" samples that didn't fit into a + // complete period. + samples_since_notification %= period_samples; + } + } } }, error_callback, diff --git a/src/microphone/sendable.rs b/src/microphone/sendable.rs index 99a0b512..d0b6be8b 100644 --- a/src/microphone/sendable.rs +++ b/src/microphone/sendable.rs @@ -4,10 +4,9 @@ use std::{ sync::{ atomic::{AtomicBool, Ordering}, - mpsc, Arc, + mpsc, Arc, Condvar, Mutex, }, thread::{self, JoinHandle}, - time::Duration, }; use cpal::Device; @@ -24,8 +23,8 @@ pub struct Microphone { _stream_thread: JoinHandle<()>, buffer: rtrb::Consumer, config: InputConfig, - poll_interval: Duration, error_occurred: Arc, + data_signal: Arc<(Mutex<()>, Condvar)>, _drop_tx: mpsc::Sender<()>, } @@ -37,22 +36,33 @@ impl Microphone { ) -> Result { let hundred_ms_of_samples = config.channel_count.get() as u32 * config.sample_rate.get() / 10; + // Using rtrb (real-time ring buffer) instead of std::sync::mpsc or the ringbuf crate for + // audio performance. While ringbuf has Send variants that could eliminate the need for + // separate sendable/non-sendable microphone implementations, rtrb has been benchmarked to + // be significantly faster in throughput and provides lower latency operations. let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); let error_occurred = Arc::new(AtomicBool::new(false)); + let data_signal = Arc::new((Mutex::new(()), Condvar::new())); let error_callback = { let error_occurred = error_occurred.clone(); + let data_signal = data_signal.clone(); move |source| { error_occurred.store(true, Ordering::Relaxed); + let (_lock, cvar) = &*data_signal; + cvar.notify_one(); error_callback(source); } }; let (res_tx, res_rx) = mpsc::channel(); let (_drop_tx, drop_rx) = mpsc::channel::<()>(); + let data_signal_clone = data_signal.clone(); let _stream_thread = thread::Builder::new() .name("Rodio cloneable microphone".to_string()) .spawn(move || { - if let Err(e) = open_input_stream(device, config, tx, error_callback) { + if let Err(e) = + open_input_stream(device, config, tx, error_callback, data_signal_clone) + { let _ = res_tx.send(Err(e)); } else { let _ = res_tx.send(Ok(())); @@ -71,8 +81,8 @@ impl Microphone { _drop_tx, buffer: rx, config, - poll_interval: Duration::from_millis(5), error_occurred, + data_signal, }) } @@ -126,7 +136,11 @@ impl Iterator for Microphone { } else if self.error_occurred.load(Ordering::Relaxed) { return None; } else { - thread::sleep(self.poll_interval) + // Block until notified instead of sleeping. This eliminates polling overhead and + // reduces jitter by avoiding unnecessary wakeups when no audio data is available. + let (lock, cvar) = &*self.data_signal; + let guard = lock.lock().unwrap(); + let _guard = cvar.wait(guard).unwrap(); } } } From d09cb85ac6a78f719cad0f77fe5069456303fba9 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 27 Sep 2025 23:07:33 +0200 Subject: [PATCH 3/5] fix: keep stream alive --- src/microphone/sendable.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/microphone/sendable.rs b/src/microphone/sendable.rs index d0b6be8b..d012c020 100644 --- a/src/microphone/sendable.rs +++ b/src/microphone/sendable.rs @@ -60,15 +60,16 @@ impl Microphone { let _stream_thread = thread::Builder::new() .name("Rodio cloneable microphone".to_string()) .spawn(move || { - if let Err(e) = - open_input_stream(device, config, tx, error_callback, data_signal_clone) - { - let _ = res_tx.send(Err(e)); - } else { - let _ = res_tx.send(Ok(())); - }; - - let _should_drop = drop_rx.recv(); + match open_input_stream(device, config, tx, error_callback, data_signal_clone) { + Err(e) => { + let _ = res_tx.send(Err(e)); + } + Ok(_) => { + let _ = res_tx.send(Ok(())); + // Keep the stream alive until we're told to drop + let _should_drop = drop_rx.recv(); + } + } }) .expect("Should be able to spawn threads"); From 44d2cb128b5ba4b00ee81203f54e6da018147f76 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sun, 28 Sep 2025 10:22:41 +0200 Subject: [PATCH 4/5] refactor: notify on each input callback if samples were pushed This simplifies notification logic and avoids issues when the ring buffer is smaller than the CPAL period or partially full. --- src/microphone.rs | 31 +++++++++++-------------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/src/microphone.rs b/src/microphone.rs index 9c360970..039e3570 100644 --- a/src/microphone.rs +++ b/src/microphone.rs @@ -295,14 +295,6 @@ fn open_input_stream( ) -> Result { let timeout = Some(Duration::from_millis(100)); - // Use the actual CPAL buffer size to determine when to notify. This aligns notifications with - // CPAL's natural period boundaries. - let period_samples = match config.buffer_size { - cpal::BufferSize::Fixed(frames) => frames as usize * config.channel_count.get() as usize, - cpal::BufferSize::Default => 1, // Always notify immediately when buffer size is unknown - }; - let mut samples_since_notification = 0; - macro_rules! build_input_streams { ($($sample_format:tt, $generic:ty);+) => { match config.sample_format { @@ -310,22 +302,21 @@ fn open_input_stream( cpal::SampleFormat::$sample_format => device.build_input_stream::<$generic, _, _>( &config.stream_config(), move |data, _info| { + let mut pushed_any = false; for sample in SampleTypeConverter::<_, f32>::new(data.into_iter().copied()) { if tx.push(sample).is_ok() { - samples_since_notification += 1; - - // Notify when we've accumulated enough samples to represent one - // CPAL buffer period. - if samples_since_notification >= period_samples { - let (_lock, cvar) = &*data_signal; - cvar.notify_one(); - - // This keeps any "remainder" samples that didn't fit into a - // complete period. - samples_since_notification %= period_samples; - } + pushed_any = true; } } + + // Notify once per CPAL callback if we pushed any samples. + // This avoids complex sample counting that can get stuck when the ring + // buffer is smaller than the CPAL period, or when the buffer is partially + // full. Each callback represents one input period anyway. + if pushed_any { + let (_lock, cvar) = &*data_signal; + cvar.notify_one(); + } }, error_callback, timeout, From 1763b826f71ff050b11bf82847cc572e8f4291a8 Mon Sep 17 00:00:00 2001 From: dvdsk Date: Sun, 28 Sep 2025 14:29:39 +0200 Subject: [PATCH 5/5] clearify some coments/strings --- src/microphone.rs | 5 +---- src/microphone/sendable.rs | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/microphone.rs b/src/microphone.rs index 039e3570..e71ffe59 100644 --- a/src/microphone.rs +++ b/src/microphone.rs @@ -235,10 +235,7 @@ impl Microphone { ) -> Result { let hundred_ms_of_samples = config.channel_count.get() as u32 * config.sample_rate.get() / 10; - // Using rtrb (real-time ring buffer) instead of std::sync::mpsc or the ringbuf crate for - // audio performance. While ringbuf has Send variants that could eliminate the need for - // separate sendable/non-sendable microphone implementations, rtrb has been benchmarked to - // be significantly faster in throughput and provides lower latency operations. + // rtrb is faster then all other (ring)buffers: https://github.com/mgeier/rtrb/issues/39 let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); let error_occurred = Arc::new(AtomicBool::new(false)); let data_signal = Arc::new((Mutex::new(()), Condvar::new())); diff --git a/src/microphone/sendable.rs b/src/microphone/sendable.rs index d012c020..34bb06e6 100644 --- a/src/microphone/sendable.rs +++ b/src/microphone/sendable.rs @@ -1,5 +1,7 @@ -//! Slightly less efficient microphone that multiple sources can draw from -//! think of it as an inverse mixer. +//! Slightly less efficient microphone that is Send on all platforms. + +// The normal microphone is not send on mac OS as the cpal::stream (handle) is not Send. This +// creates and then keeps that handle on a seperate thread. use std::{ sync::{ @@ -36,10 +38,9 @@ impl Microphone { ) -> Result { let hundred_ms_of_samples = config.channel_count.get() as u32 * config.sample_rate.get() / 10; - // Using rtrb (real-time ring buffer) instead of std::sync::mpsc or the ringbuf crate for - // audio performance. While ringbuf has Send variants that could eliminate the need for - // separate sendable/non-sendable microphone implementations, rtrb has been benchmarked to - // be significantly faster in throughput and provides lower latency operations. + // Using rtrb (real-time ring buffer) instead of std::sync::mpsc or the ringbuf crate rtrb + // has been benchmarked to be significantly faster in throughput and provides lower latency + // operations. let (tx, rx) = RingBuffer::new(hundred_ms_of_samples as usize); let error_occurred = Arc::new(AtomicBool::new(false)); let data_signal = Arc::new((Mutex::new(()), Condvar::new())); @@ -58,7 +59,7 @@ impl Microphone { let (_drop_tx, drop_rx) = mpsc::channel::<()>(); let data_signal_clone = data_signal.clone(); let _stream_thread = thread::Builder::new() - .name("Rodio cloneable microphone".to_string()) + .name("Rodio sendable microphone".to_string()) .spawn(move || { match open_input_stream(device, config, tx, error_callback, data_signal_clone) { Err(e) => { @@ -66,7 +67,7 @@ impl Microphone { } Ok(_) => { let _ = res_tx.send(Ok(())); - // Keep the stream alive until we're told to drop + // Keep this thread alive until the Microphone struct is dropped let _should_drop = drop_rx.recv(); } } @@ -138,7 +139,7 @@ impl Iterator for Microphone { return None; } else { // Block until notified instead of sleeping. This eliminates polling overhead and - // reduces jitter by avoiding unnecessary wakeups when no audio data is available. + // reduces jitter by avoiding unnecessary wakeups when no audio data is available. let (lock, cvar) = &*self.data_signal; let guard = lock.lock().unwrap(); let _guard = cvar.wait(guard).unwrap();