diff --git a/remoc/src/chmux/any_storage.rs b/remoc/src/chmux/any_storage.rs index 21cec49..6ba682f 100644 --- a/remoc/src/chmux/any_storage.rs +++ b/remoc/src/chmux/any_storage.rs @@ -8,6 +8,8 @@ use std::{ }; use uuid::Uuid; +use crate::executor::MutexExt; + /// Box containing any value that is Send, Sync and static. pub type AnyBox = Box; @@ -26,7 +28,7 @@ pub struct AnyStorage { impl fmt::Debug for AnyStorage { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let entries = self.entries.lock().unwrap(); + let entries = self.entries.xlock().unwrap(); write!(f, "{:?}", *entries) } } @@ -39,7 +41,7 @@ impl AnyStorage { /// Insert a new entry into the storage and return its key. pub fn insert(&self, entry: AnyEntry) -> Uuid { - let mut entries = self.entries.lock().unwrap(); + let mut entries = self.entries.xlock().unwrap(); loop { let key = Uuid::new_v4(); if let Entry::Vacant(e) = entries.entry(key) { @@ -51,13 +53,13 @@ impl AnyStorage { /// Returns the value from the storage for the specified key. pub fn get(&self, key: Uuid) -> Option { - let entries = self.entries.lock().unwrap(); + let entries = self.entries.xlock().unwrap(); entries.get(&key).cloned() } /// Removes the value for the specified key from the storage and returns it. pub fn remove(&self, key: Uuid) -> Option { - let mut entries = self.entries.lock().unwrap(); + let mut entries = self.entries.xlock().unwrap(); entries.remove(&key) } } diff --git a/remoc/src/chmux/client.rs b/remoc/src/chmux/client.rs index 4200ee0..24b6bf0 100644 --- a/remoc/src/chmux/client.rs +++ b/remoc/src/chmux/client.rs @@ -12,14 +12,16 @@ use std::{ }; use tokio::sync::{mpsc, oneshot}; -use crate::{executor, executor::task::JoinHandle}; - use super::{ port_allocator::{PortAllocator, PortNumber}, receiver::Receiver, sender::Sender, PortReq, }; +use crate::{ + executor, + executor::{task::JoinHandle, MutexExt}, +}; /// An error occurred during connecting to a remote service. #[derive(Debug, Clone)] @@ -87,7 +89,7 @@ impl ConntectRequestCrediter { pub async fn request(&self) -> ConnectRequestCredit { loop { let rx = { - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.xlock().unwrap(); if inner.used < inner.limit { inner.used += 1; @@ -107,7 +109,7 @@ impl ConntectRequestCrediter { /// /// Does not wait for the credit to become available. pub fn try_request(&self) -> Option { - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.xlock().unwrap(); if inner.used < inner.limit { inner.used += 1; @@ -124,7 +126,7 @@ pub(crate) struct ConnectRequestCredit(Arc>) impl Drop for ConnectRequestCredit { fn drop(&mut self) { let notify_tx = { - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.xlock().unwrap(); inner.used -= 1; mem::take(&mut inner.notify_tx) }; diff --git a/remoc/src/chmux/credit.rs b/remoc/src/chmux/credit.rs index 71e0713..532c83b 100644 --- a/remoc/src/chmux/credit.rs +++ b/remoc/src/chmux/credit.rs @@ -9,6 +9,7 @@ use tokio::sync::{ }; use super::{mux::PortEvt, ChMuxError, SendError}; +use crate::executor::MutexExt; // =========================================================================== // Credit accounting for sending data @@ -56,7 +57,7 @@ impl Drop for AssignedCredits { fn drop(&mut self) { if self.port > 0 { if let Some(port) = self.port_inner.upgrade() { - let mut port = port.lock().unwrap(); + let mut port = port.xlock().unwrap(); port.credits += self.port; } } @@ -80,7 +81,7 @@ impl CreditProvider { &self, credits: u32, ) -> Result<(), ChMuxError> { let notify = { - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.xlock().unwrap(); match inner.credits.checked_add(credits) { Some(new_credits) => inner.credits = new_credits, @@ -100,7 +101,7 @@ impl CreditProvider { /// Closes the channel. pub fn close(&self, gracefully: bool) { let notify = { - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.xlock().unwrap(); inner.closed = Some(gracefully); @@ -132,7 +133,7 @@ impl CreditUser { Some(channel) => channel, None => return Err(SendError::ChMux), }; - let mut channel = channel.lock().unwrap(); + let mut channel = channel.xlock().unwrap(); if let Some(gracefully) = channel.closed { if !self.override_graceful_close || !gracefully { return Err(SendError::Closed { gracefully }); @@ -164,7 +165,7 @@ impl CreditUser { Some(channel) => channel, None => return Err(SendError::ChMux), }; - let mut channel = channel.lock().unwrap(); + let mut channel = channel.xlock().unwrap(); if let Some(gracefully) = channel.closed { if !self.override_graceful_close || !gracefully { return Err(SendError::Closed { gracefully }); @@ -213,7 +214,7 @@ impl ChannelCreditMonitor { pub fn use_credits( &self, credits: u32, ) -> Result> { - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.xlock().unwrap(); match inner.used.checked_add(credits) { Some(new_used) if new_used <= inner.limit => { inner.used = new_used; @@ -239,7 +240,7 @@ impl ChannelCreditReturner { assert!(self.return_fut.is_none(), "start_return_one called without poll_return_flush"); if let Some(monitor) = self.monitor.upgrade() { - let mut monitor = monitor.lock().unwrap(); + let mut monitor = monitor.xlock().unwrap(); monitor.used -= credit.0; self.to_return += credit.0; diff --git a/remoc/src/chmux/mux.rs b/remoc/src/chmux/mux.rs index bbaf8cd..da4bfa8 100644 --- a/remoc/src/chmux/mux.rs +++ b/remoc/src/chmux/mux.rs @@ -35,7 +35,10 @@ use super::{ sender::Sender, AnyStorage, Cfg, ChMuxError, PortReq, PROTOCOL_VERSION, PROTOCOL_VERSION_PORT_ID, }; -use crate::executor::time::{sleep, timeout}; +use crate::executor::{ + time::{sleep, timeout}, + MutexExt, +}; /// Multiplexer protocol error. fn protocol_err(msg: impl AsRef) -> super::ChMuxError { @@ -1089,7 +1092,7 @@ where // Send hangup notifications. remote_receiver_closed.store(true, Ordering::SeqCst); - let notifies = remote_receiver_closed_notify.lock().unwrap().take().unwrap(); + let notifies = remote_receiver_closed_notify.xlock().unwrap().take().unwrap(); for tx in notifies { let _ = tx.send(()); } @@ -1125,7 +1128,7 @@ where // Send hangup notifications. remote_receiver_closed.store(true, Ordering::SeqCst); - let notifies = remote_receiver_closed_notify.lock().unwrap().take().unwrap(); + let notifies = remote_receiver_closed_notify.xlock().unwrap().take().unwrap(); for tx in notifies { let _ = tx.send(()); } diff --git a/remoc/src/chmux/port_allocator.rs b/remoc/src/chmux/port_allocator.rs index a0fc12b..8099c60 100644 --- a/remoc/src/chmux/port_allocator.rs +++ b/remoc/src/chmux/port_allocator.rs @@ -9,6 +9,8 @@ use std::{ }; use tokio::sync::oneshot; +use crate::executor::MutexExt; + struct PortAllocatorInner { used: HashSet, limit: u32, @@ -45,7 +47,7 @@ pub struct PortAllocator(Arc>); impl fmt::Debug for PortAllocator { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let inner = self.0.lock().unwrap(); + let inner = self.0.xlock().unwrap(); f.debug_struct("PortAllocator").field("used", &inner.used.len()).field("limit", &inner.limit).finish() } } @@ -64,7 +66,7 @@ impl PortAllocator { pub async fn allocate(&self) -> PortNumber { loop { let rx = { - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.xlock().unwrap(); match inner.try_allocate(self.0.clone()) { Some(number) => return number, None => { @@ -83,7 +85,7 @@ impl PortAllocator { /// /// If all port are currently in use, this returns [None]. pub fn try_allocate(&self) -> Option { - let mut inner = self.0.lock().unwrap(); + let mut inner = self.0.xlock().unwrap(); inner.try_allocate(self.0.clone()) } } @@ -151,7 +153,7 @@ impl Borrow for PortNumber { impl Drop for PortNumber { fn drop(&mut self) { let notify_tx = { - let mut inner = self.allocator.lock().unwrap(); + let mut inner = self.allocator.xlock().unwrap(); inner.used.remove(&self.number); mem::take(&mut inner.notify_tx) }; diff --git a/remoc/src/chmux/sender.rs b/remoc/src/chmux/sender.rs index 67739e4..c87885b 100644 --- a/remoc/src/chmux/sender.rs +++ b/remoc/src/chmux/sender.rs @@ -18,14 +18,13 @@ use std::{ use tokio::sync::{mpsc, oneshot, Mutex}; use tokio_util::sync::ReusableBoxFuture; -use crate::executor; - use super::{ client::ConnectResponse, credit::{AssignedCredits, CreditUser}, mux::PortEvt, AnyStorage, Connect, ConnectError, PortAllocator, PortReq, }; +use crate::executor::{self, MutexExt}; /// An error occurred during sending of a message. #[derive(Debug, Clone)] @@ -163,7 +162,7 @@ impl fmt::Debug for Closed { impl Closed { fn new(hangup_notify: &Weak>>>>) -> Self { if let Some(hangup_notify) = hangup_notify.upgrade() { - if let Some(notifiers) = hangup_notify.lock().unwrap().as_mut() { + if let Some(notifiers) = hangup_notify.xlock().unwrap().as_mut() { let (tx, rx) = oneshot::channel(); notifiers.push(tx); Self { diff --git a/remoc/src/executor/mod.rs b/remoc/src/executor/mod.rs index a93ea0c..d377794 100644 --- a/remoc/src/executor/mod.rs +++ b/remoc/src/executor/mod.rs @@ -15,6 +15,8 @@ mod js; #[cfg(feature = "js")] pub use js::*; +pub use task::spawn; + /// Whether threads are available and working on this platform. #[inline] pub fn are_threads_available() -> bool { @@ -61,10 +63,13 @@ impl MutexExt for std::sync::Mutex { match self.try_lock() { Ok(guard) => return Ok(guard), Err(std::sync::TryLockError::Poisoned(p)) => return Err(p), - Err(std::sync::TryLockError::WouldBlock) => (), + Err(std::sync::TryLockError::WouldBlock) => { + std::thread::yield_now(); + + #[cfg(all(target_family = "wasm", not(target_feature = "atomics")))] + panic!("mutex deadlock"); + } } } } } - -pub use task::spawn; diff --git a/remoc/src/rch/base/sender.rs b/remoc/src/rch/base/sender.rs index c2569e9..922bcdb 100644 --- a/remoc/src/rch/base/sender.rs +++ b/remoc/src/rch/base/sender.rs @@ -24,7 +24,7 @@ use crate::{ chmux::{self, AnyStorage, PortReq}, codec::{self, SerializationError, StreamingUnavailable}, executor, - executor::task, + executor::{task, MutexExt}, }; pub use crate::chmux::Closed; @@ -294,7 +294,7 @@ where let result = task::spawn_blocking(move || { let ps_ref = PortSerializer::start(allocator, storage); - let item = item_arc_task.lock().unwrap(); + let item = item_arc_task.xlock().unwrap(); ::serialize(&mut cbw, &*item)?; let cbw = cbw.into_inner().map_err(|_| { diff --git a/remoc/src/rch/bin/receiver.rs b/remoc/src/rch/bin/receiver.rs index 4e93215..9e1bc36 100644 --- a/remoc/src/rch/bin/receiver.rs +++ b/remoc/src/rch/bin/receiver.rs @@ -12,7 +12,7 @@ use super::{ }, Interlock, Location, }; -use crate::chmux; +use crate::{chmux, executor::MutexExt}; /// A binary channel receiver. pub struct Receiver { @@ -76,7 +76,7 @@ impl Serialize for Receiver { { let sender_tx = self.sender_tx.clone(); let interlock_confirm = { - let mut interlock = self.interlock.lock().unwrap(); + let mut interlock = self.interlock.xlock().unwrap(); if interlock.sender.check_local() { Some(interlock.sender.start_send()) } else { @@ -109,7 +109,7 @@ impl Serialize for Receiver { // Forwarding. _ => { let (successor_tx, successor_rx) = tokio::sync::oneshot::channel(); - *self.successor_tx.lock().unwrap() = Some(successor_tx); + *self.successor_tx.xlock().unwrap() = Some(successor_tx); let (tx, rx) = super::channel(); PortSerializer::spawn(Self::forward(successor_rx, tx))?; @@ -154,7 +154,7 @@ impl<'de> Deserialize<'de> for Receiver { impl Drop for Receiver { fn drop(&mut self) { - let successor_tx = self.successor_tx.lock().unwrap().take(); + let successor_tx = self.successor_tx.xlock().unwrap().take(); if let Some(successor_tx) = successor_tx { let dummy = Self { receiver: None, diff --git a/remoc/src/rch/bin/sender.rs b/remoc/src/rch/bin/sender.rs index 5ce4258..e43f5fe 100644 --- a/remoc/src/rch/bin/sender.rs +++ b/remoc/src/rch/bin/sender.rs @@ -12,7 +12,7 @@ use super::{ }, Interlock, Location, }; -use crate::chmux; +use crate::{chmux, executor::MutexExt}; /// A binary channel sender. pub struct Sender { @@ -76,7 +76,7 @@ impl Serialize for Sender { { let receiver_tx = self.receiver_tx.clone(); let interlock_confirm = { - let mut interlock = self.interlock.lock().unwrap(); + let mut interlock = self.interlock.xlock().unwrap(); if interlock.receiver.check_local() { Some(interlock.receiver.start_send()) } else { @@ -109,7 +109,7 @@ impl Serialize for Sender { // Forwarding. _ => { let (successor_tx, successor_rx) = tokio::sync::oneshot::channel(); - *self.successor_tx.lock().unwrap() = Some(successor_tx); + *self.successor_tx.xlock().unwrap() = Some(successor_tx); let (tx, rx) = super::channel(); PortSerializer::spawn(Self::forward(successor_rx, rx))?; @@ -154,7 +154,7 @@ impl<'de> Deserialize<'de> for Sender { impl Drop for Sender { fn drop(&mut self) { - let successor_tx = self.successor_tx.lock().unwrap().take(); + let successor_tx = self.successor_tx.xlock().unwrap().take(); if let Some(successor_tx) = successor_tx { let dummy = Self { sender: None, diff --git a/remoc/src/rch/broadcast/sender.rs b/remoc/src/rch/broadcast/sender.rs index cf3bcfc..fee7552 100644 --- a/remoc/src/rch/broadcast/sender.rs +++ b/remoc/src/rch/broadcast/sender.rs @@ -10,7 +10,7 @@ use super::{ super::{base, mpsc, SendErrorExt}, BroadcastMsg, Receiver, }; -use crate::{chmux, codec, executor, RemoteSend}; +use crate::{chmux, codec, executor, executor::MutexExt, RemoteSend}; /// An error occurred during sending over a broadcast channel. #[derive(Clone, Debug, Serialize, Deserialize)] @@ -149,7 +149,7 @@ where /// No back-pressure is provided. #[inline] pub fn send(&self, value: T) -> Result<(), SendError> { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.xlock().unwrap(); // Fetch subscribers that have become ready again. while let Ok(sub) = inner.ready_rx.try_recv() { @@ -201,7 +201,7 @@ where pub fn subscribe( &self, send_buffer: usize, ) -> Receiver { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.xlock().unwrap(); let (tx, rx) = mpsc::channel(send_buffer); let tx = tx.set_buffer(); @@ -214,7 +214,7 @@ where pub fn subscribe_with_max_item_size( &self, send_buffer: usize, ) -> Receiver { - let mut inner = self.inner.lock().unwrap(); + let mut inner = self.inner.xlock().unwrap(); let (tx, rx) = mpsc::channel(send_buffer); let mut tx = tx.set_buffer(); diff --git a/remoc/src/rch/lr/receiver.rs b/remoc/src/rch/lr/receiver.rs index d288fe7..172b6dd 100644 --- a/remoc/src/rch/lr/receiver.rs +++ b/remoc/src/rch/lr/receiver.rs @@ -17,6 +17,7 @@ use super::{ use crate::{ chmux, codec::{self, DeserializationError}, + executor::MutexExt, }; /// An error that occurred during receiving from a remote endpoint. @@ -183,7 +184,7 @@ where self.sender_tx.clone().ok_or_else(|| ser::Error::custom("cannot forward received receiver"))?; let interlock_confirm = { - let mut interlock = self.interlock.lock().unwrap(); + let mut interlock = self.interlock.xlock().unwrap(); if !interlock.sender.check_local() { return Err(ser::Error::custom("cannot send receiver because sender has been sent")); } diff --git a/remoc/src/rch/lr/sender.rs b/remoc/src/rch/lr/sender.rs index 0a3d616..8d2dc57 100644 --- a/remoc/src/rch/lr/sender.rs +++ b/remoc/src/rch/lr/sender.rs @@ -17,6 +17,7 @@ use super::{ use crate::{ chmux, codec::{self, SerializationError}, + executor::MutexExt, }; pub use super::super::base::Closed; @@ -228,7 +229,7 @@ where self.receiver_tx.clone().ok_or_else(|| ser::Error::custom("cannot forward received sender"))?; let interlock_confirm = { - let mut interlock = self.interlock.lock().unwrap(); + let mut interlock = self.interlock.xlock().unwrap(); if !interlock.receiver.check_local() { return Err(ser::Error::custom("cannot send sender because receiver has been sent")); } diff --git a/remoc/src/rch/mpsc/receiver.rs b/remoc/src/rch/mpsc/receiver.rs index 677c845..32b2c2b 100644 --- a/remoc/src/rch/mpsc/receiver.rs +++ b/remoc/src/rch/mpsc/receiver.rs @@ -17,7 +17,7 @@ use super::{ }, Distributor, }; -use crate::{chmux, codec, executor, RemoteSend}; +use crate::{chmux, codec, executor, executor::MutexExt, RemoteSend}; /// An error occurred during receiving over an mpsc channel. #[derive(Clone, Debug, Serialize, Deserialize)] @@ -384,7 +384,7 @@ impl Drop { fn drop(&mut self) { if let Some(inner) = self.inner.take() { - let mut successor_tx = self.successor_tx.lock().unwrap(); + let mut successor_tx = self.successor_tx.xlock().unwrap(); if let Some(successor_tx) = successor_tx.take() { let _ = successor_tx.send(inner); } else if !inner.closed { @@ -408,7 +408,7 @@ where { // Register successor of this receiver. let (successor_tx, successor_rx) = tokio::sync::oneshot::channel(); - *self.successor_tx.lock().unwrap() = Some(successor_tx); + *self.successor_tx.xlock().unwrap() = Some(successor_tx); let port = PortSerializer::connect(|connect| { async move { diff --git a/remoc/src/rch/watch/sender.rs b/remoc/src/rch/watch/sender.rs index 47378b0..bbce42e 100644 --- a/remoc/src/rch/watch/sender.rs +++ b/remoc/src/rch/watch/sender.rs @@ -10,7 +10,7 @@ use super::{ receiver::RecvError, Receiver, Ref, }; -use crate::{chmux, codec, RemoteSend}; +use crate::{chmux, codec, executor::MutexExt, RemoteSend}; /// An error occurred during sending over an mpsc channel. #[derive(Clone, Debug, Serialize, Deserialize)] @@ -228,12 +228,12 @@ where fn update_error(&self) { let inner = self.inner.as_ref().unwrap(); - let mut current_err = inner.current_err.lock().unwrap(); + let mut current_err = inner.current_err.xlock().unwrap(); if current_err.is_some() { return; } - let mut remote_send_err_rx = inner.remote_send_err_rx.lock().unwrap(); + let mut remote_send_err_rx = inner.remote_send_err_rx.xlock().unwrap(); if let Ok(err) = remote_send_err_rx.try_recv() { *current_err = Some(err); } @@ -248,7 +248,7 @@ where self.update_error(); let inner = self.inner.as_ref().unwrap(); - let current_err = inner.current_err.lock().unwrap(); + let current_err = inner.current_err.xlock().unwrap(); current_err.clone().map(|err| err.into()) } @@ -257,7 +257,7 @@ where self.update_error(); let inner = self.inner.as_ref().unwrap(); - let mut current_err = inner.current_err.lock().unwrap(); + let mut current_err = inner.current_err.xlock().unwrap(); *current_err = None; } @@ -296,7 +296,7 @@ where impl Drop for Sender { fn drop(&mut self) { - if let Some(successor_tx) = self.successor_tx.lock().unwrap().take() { + if let Some(successor_tx) = self.successor_tx.xlock().unwrap().take() { let _ = successor_tx.send(self.inner.take().unwrap()); } } @@ -317,7 +317,7 @@ where // Prepare channel for takeover. let (successor_tx, successor_rx) = tokio::sync::oneshot::channel(); - *self.successor_tx.lock().unwrap() = Some(successor_tx); + *self.successor_tx.xlock().unwrap() = Some(successor_tx); let port = PortSerializer::connect(move |connect| { async move { diff --git a/remoc/src/robj/lazy_blob/fw_bin.rs b/remoc/src/robj/lazy_blob/fw_bin.rs index e181482..4ba433b 100644 --- a/remoc/src/robj/lazy_blob/fw_bin.rs +++ b/remoc/src/robj/lazy_blob/fw_bin.rs @@ -3,7 +3,7 @@ use serde::{ser, Deserialize, Serialize}; use std::sync::Mutex; -use crate::{executor, rch::bin}; +use crate::{executor, executor::MutexExt, rch::bin}; /// A chmux sender that can be remotely sent and forwarded. pub(crate) struct Sender { @@ -13,7 +13,7 @@ pub(crate) struct Sender { impl Sender { pub fn into_inner(self) -> Option { - let mut bin_tx = self.bin_tx.lock().unwrap(); + let mut bin_tx = self.bin_tx.xlock().unwrap(); bin_tx.take() } } @@ -29,8 +29,8 @@ impl Serialize for Sender { where S: serde::Serializer, { - let mut bin_tx = self.bin_tx.lock().unwrap(); - let mut bin_rx_tx = self.bin_rx_tx.lock().unwrap(); + let mut bin_tx = self.bin_tx.xlock().unwrap(); + let mut bin_rx_tx = self.bin_rx_tx.xlock().unwrap(); match (bin_tx.take(), bin_rx_tx.take()) { // Initial send.