Skip to content

Commit 77ba291

Browse files
committed
refactor: improve error callback handling and disconnect listener robustness
- Use try_lock to avoid panics and recover from poisoned locks when invoking error callbacks - Skip error callback if lock is busy - Enhance disconnect listener thread to report creation errors and ensure proper synchronization - Update stream lock error message for clarity
1 parent 4cc1277 commit 77ba291

File tree

2 files changed

+86
-29
lines changed

2 files changed

+86
-29
lines changed

src/host/coreaudio/macos/device.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,17 @@ impl Device {
707707
// TODO: Need a better way to get delay, for now we assume a double-buffer offset.
708708
let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) {
709709
Err(err) => {
710-
(error_callback.lock().unwrap())(err.into());
710+
// Try to invoke error callback, recovering from poison if needed
711+
match error_callback.try_lock() {
712+
Ok(mut cb) => cb(err.into()),
713+
Err(std::sync::TryLockError::Poisoned(guard)) => {
714+
// Recover from poisoned lock to still report this error
715+
guard.into_inner()(err.into());
716+
}
717+
Err(std::sync::TryLockError::WouldBlock) => {
718+
// Skip if callback is busy
719+
}
720+
}
711721
return Err(());
712722
}
713723
Ok(cb) => cb,
@@ -730,8 +740,15 @@ impl Device {
730740
} else {
731741
let error_callback_clone = error_callback_disconnect.clone();
732742
Box::new(move |err: StreamError| {
733-
if let Ok(mut cb) = error_callback_clone.lock() {
734-
cb(err);
743+
match error_callback_clone.try_lock() {
744+
Ok(mut cb) => cb(err),
745+
Err(std::sync::TryLockError::Poisoned(guard)) => {
746+
// Recover from poisoned lock to still report this error
747+
guard.into_inner()(err);
748+
}
749+
Err(std::sync::TryLockError::WouldBlock) => {
750+
// Skip if callback is busy
751+
}
735752
}
736753
})
737754
};
@@ -751,7 +768,7 @@ impl Device {
751768
.lock()
752769
.map_err(|_| BuildStreamError::BackendSpecific {
753770
err: BackendSpecificError {
754-
description: "Failed to acquire stream lock".to_string(),
771+
description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
755772
},
756773
})?
757774
.audio_unit
@@ -829,7 +846,17 @@ impl Device {
829846

830847
let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) {
831848
Err(err) => {
832-
(error_callback.lock().unwrap())(err.into());
849+
// Try to invoke error callback, recovering from poison if needed
850+
match error_callback.try_lock() {
851+
Ok(mut cb) => cb(err.into()),
852+
Err(std::sync::TryLockError::Poisoned(guard)) => {
853+
// Recover from poisoned lock to still report this error
854+
guard.into_inner()(err.into());
855+
}
856+
Err(std::sync::TryLockError::WouldBlock) => {
857+
// Skip if callback is busy
858+
}
859+
}
833860
return Err(());
834861
}
835862
Ok(cb) => cb,
@@ -853,8 +880,15 @@ impl Device {
853880
} else {
854881
let error_callback_clone = error_callback_disconnect.clone();
855882
Box::new(move |err: StreamError| {
856-
if let Ok(mut cb) = error_callback_clone.lock() {
857-
cb(err);
883+
match error_callback_clone.try_lock() {
884+
Ok(mut cb) => cb(err),
885+
Err(std::sync::TryLockError::Poisoned(guard)) => {
886+
// Recover from poisoned lock to still report this error
887+
guard.into_inner()(err);
888+
}
889+
Err(std::sync::TryLockError::WouldBlock) => {
890+
// Skip if callback is busy
891+
}
858892
}
859893
})
860894
};
@@ -874,7 +908,7 @@ impl Device {
874908
.lock()
875909
.map_err(|_| BuildStreamError::BackendSpecific {
876910
err: BackendSpecificError {
877-
description: "Failed to acquire stream lock".to_string(),
911+
description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
878912
},
879913
})?
880914
.audio_unit

src/host/coreaudio/macos/mod.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ type ErrorCallback = Box<dyn FnMut(crate::StreamError) + Send + 'static>;
6161
/// Manages device disconnection listener on a dedicated thread to ensure the
6262
/// AudioObjectPropertyListener is always created and dropped on the same thread.
6363
/// This avoids potential threading issues with CoreAudio APIs.
64+
///
65+
/// When a device disconnects, this manager:
66+
/// 1. Attempts to pause the stream to stop audio I/O
67+
/// 2. Calls the error callback with `StreamError::DeviceNotAvailable`
68+
///
69+
/// The dedicated thread architecture ensures `Stream` can implement `Send`.
6470
struct DisconnectManager {
6571
_shutdown_tx: mpsc::Sender<()>,
6672
}
@@ -74,6 +80,7 @@ impl DisconnectManager {
7480
) -> Result<Self, crate::BuildStreamError> {
7581
let (shutdown_tx, shutdown_rx) = mpsc::channel();
7682
let (disconnect_tx, disconnect_rx) = mpsc::channel();
83+
let (ready_tx, ready_rx) = mpsc::channel();
7784

7885
// Spawn dedicated thread to own the AudioObjectPropertyListener
7986
let disconnect_tx_clone = disconnect_tx.clone();
@@ -85,16 +92,29 @@ impl DisconnectManager {
8592
};
8693

8794
// Create the listener on this dedicated thread
88-
let _listener =
89-
AudioObjectPropertyListener::new(device_id, property_address, move || {
90-
let _ = disconnect_tx_clone.send(());
91-
})
92-
.unwrap();
93-
94-
// Drop the listener on this thread after receiving a shutdown signal
95-
let _ = shutdown_rx.recv();
95+
match AudioObjectPropertyListener::new(device_id, property_address, move || {
96+
let _ = disconnect_tx_clone.send(());
97+
}) {
98+
Ok(_listener) => {
99+
let _ = ready_tx.send(Ok(()));
100+
// Drop the listener on this thread after receiving a shutdown signal
101+
let _ = shutdown_rx.recv();
102+
}
103+
Err(e) => {
104+
let _ = ready_tx.send(Err(e));
105+
}
106+
}
96107
});
97108

109+
// Wait for listener creation to complete or fail
110+
ready_rx
111+
.recv()
112+
.map_err(|_| crate::BuildStreamError::BackendSpecific {
113+
err: BackendSpecificError {
114+
description: "Disconnect listener thread terminated unexpectedly".to_string(),
115+
},
116+
})??;
117+
98118
// Handle disconnect events on the main thread pool
99119
let stream_weak_clone = stream_weak.clone();
100120
let error_callback_clone = error_callback.clone();
@@ -103,21 +123,24 @@ impl DisconnectManager {
103123
// Check if stream still exists
104124
if let Some(stream_arc) = stream_weak_clone.upgrade() {
105125
// First, try to pause the stream to stop playback
106-
match stream_arc.lock() {
107-
Ok(mut stream_inner) => {
108-
let _ = stream_inner.pause();
109-
}
110-
Err(_) => {
111-
// Could not acquire lock. This can occur if there are
112-
// overlapping locks, if the stream is already in use, or if a panic
113-
// occurred during a previous lock. Still notify about device
114-
// disconnection even if we can't pause.
115-
}
126+
if let Ok(mut stream_inner) = stream_arc.try_lock() {
127+
let _ = stream_inner.pause();
116128
}
117129

118-
// Call the error callback to notify about device disconnection
119-
if let Ok(mut cb) = error_callback_clone.lock() {
120-
cb(crate::StreamError::DeviceNotAvailable);
130+
// Always try to notify about device disconnection
131+
match error_callback_clone.try_lock() {
132+
Ok(mut cb) => {
133+
cb(crate::StreamError::DeviceNotAvailable);
134+
}
135+
Err(std::sync::TryLockError::WouldBlock) => {
136+
// Error callback is being invoked - skip this notification
137+
}
138+
Err(std::sync::TryLockError::Poisoned(guard)) => {
139+
// Error callback panicked - try to recover and still notify
140+
// This is critical: device disconnected AND callback is broken
141+
let mut cb = guard.into_inner();
142+
cb(crate::StreamError::DeviceNotAvailable);
143+
}
121144
}
122145
} else {
123146
// Stream is gone, exit the handler thread

0 commit comments

Comments
 (0)