From 0a0342899bf51bfdeb3d21d57c21eaf00d0d8e0c Mon Sep 17 00:00:00 2001 From: Sebastian Urban Date: Fri, 20 Dec 2024 15:23:58 +0100 Subject: [PATCH] Remove spinning mutex --- remoc/src/chmux/any_storage.rs | 10 ++++------ remoc/src/chmux/credit.rs | 15 +++++++------- remoc/src/chmux/mux.rs | 9 +++------ remoc/src/chmux/port_allocator.rs | 10 ++++------ remoc/src/chmux/sender.rs | 4 ++-- remoc/src/exec/js/mod.rs | 14 ------------- remoc/src/exec/js/thread_pool.rs | 6 +++--- remoc/src/exec/js/time.rs | 6 +++--- remoc/src/exec/mod.rs | 32 ------------------------------ remoc/src/exec/native/mod.rs | 8 -------- remoc/src/rch/base/sender.rs | 4 ++-- remoc/src/rch/bin/receiver.rs | 8 ++++---- remoc/src/rch/bin/sender.rs | 8 ++++---- remoc/src/rch/broadcast/sender.rs | 8 ++++---- remoc/src/rch/lr/receiver.rs | 3 +-- remoc/src/rch/lr/sender.rs | 3 +-- remoc/src/rch/mpsc/receiver.rs | 6 +++--- remoc/src/rch/watch/sender.rs | 14 ++++++------- remoc/src/robj/lazy_blob/fw_bin.rs | 8 ++++---- 19 files changed, 56 insertions(+), 120 deletions(-) diff --git a/remoc/src/chmux/any_storage.rs b/remoc/src/chmux/any_storage.rs index 2b22980..21cec49 100644 --- a/remoc/src/chmux/any_storage.rs +++ b/remoc/src/chmux/any_storage.rs @@ -8,8 +8,6 @@ use std::{ }; use uuid::Uuid; -use crate::exec::MutexExt; - /// Box containing any value that is Send, Sync and static. pub type AnyBox = Box; @@ -28,7 +26,7 @@ pub struct AnyStorage { impl fmt::Debug for AnyStorage { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let entries = self.entries.xlock().unwrap(); + let entries = self.entries.lock().unwrap(); write!(f, "{:?}", *entries) } } @@ -41,7 +39,7 @@ impl AnyStorage { /// Insert a new entry into the storage and return its key. pub fn insert(&self, entry: AnyEntry) -> Uuid { - let mut entries = self.entries.xlock().unwrap(); + let mut entries = self.entries.lock().unwrap(); loop { let key = Uuid::new_v4(); if let Entry::Vacant(e) = entries.entry(key) { @@ -53,13 +51,13 @@ impl AnyStorage { /// Returns the value from the storage for the specified key. pub fn get(&self, key: Uuid) -> Option { - let entries = self.entries.xlock().unwrap(); + let entries = self.entries.lock().unwrap(); entries.get(&key).cloned() } /// Removes the value for the specified key from the storage and returns it. pub fn remove(&self, key: Uuid) -> Option { - let mut entries = self.entries.xlock().unwrap(); + let mut entries = self.entries.lock().unwrap(); entries.remove(&key) } } diff --git a/remoc/src/chmux/credit.rs b/remoc/src/chmux/credit.rs index 8b86bab..71e0713 100644 --- a/remoc/src/chmux/credit.rs +++ b/remoc/src/chmux/credit.rs @@ -9,7 +9,6 @@ use tokio::sync::{ }; use super::{mux::PortEvt, ChMuxError, SendError}; -use crate::exec::MutexExt; // =========================================================================== // Credit accounting for sending data @@ -57,7 +56,7 @@ impl Drop for AssignedCredits { fn drop(&mut self) { if self.port > 0 { if let Some(port) = self.port_inner.upgrade() { - let mut port = port.xlock().unwrap(); + let mut port = port.lock().unwrap(); port.credits += self.port; } } @@ -81,7 +80,7 @@ impl CreditProvider { &self, credits: u32, ) -> Result<(), ChMuxError> { let notify = { - let mut inner = self.0.xlock().unwrap(); + let mut inner = self.0.lock().unwrap(); match inner.credits.checked_add(credits) { Some(new_credits) => inner.credits = new_credits, @@ -101,7 +100,7 @@ impl CreditProvider { /// Closes the channel. pub fn close(&self, gracefully: bool) { let notify = { - let mut inner = self.0.xlock().unwrap(); + let mut inner = self.0.lock().unwrap(); inner.closed = Some(gracefully); @@ -133,7 +132,7 @@ impl CreditUser { Some(channel) => channel, None => return Err(SendError::ChMux), }; - let mut channel = channel.xlock().unwrap(); + let mut channel = channel.lock().unwrap(); if let Some(gracefully) = channel.closed { if !self.override_graceful_close || !gracefully { return Err(SendError::Closed { gracefully }); @@ -165,7 +164,7 @@ impl CreditUser { Some(channel) => channel, None => return Err(SendError::ChMux), }; - let mut channel = channel.xlock().unwrap(); + let mut channel = channel.lock().unwrap(); if let Some(gracefully) = channel.closed { if !self.override_graceful_close || !gracefully { return Err(SendError::Closed { gracefully }); @@ -214,7 +213,7 @@ impl ChannelCreditMonitor { pub fn use_credits( &self, credits: u32, ) -> Result> { - let mut inner = self.0.xlock().unwrap(); + let mut inner = self.0.lock().unwrap(); match inner.used.checked_add(credits) { Some(new_used) if new_used <= inner.limit => { inner.used = new_used; @@ -240,7 +239,7 @@ impl ChannelCreditReturner { assert!(self.return_fut.is_none(), "start_return_one called without poll_return_flush"); if let Some(monitor) = self.monitor.upgrade() { - let mut monitor = monitor.xlock().unwrap(); + let mut monitor = monitor.lock().unwrap(); monitor.used -= credit.0; self.to_return += credit.0; diff --git a/remoc/src/chmux/mux.rs b/remoc/src/chmux/mux.rs index a33d0fa..da66efa 100644 --- a/remoc/src/chmux/mux.rs +++ b/remoc/src/chmux/mux.rs @@ -35,10 +35,7 @@ use super::{ sender::Sender, AnyStorage, Cfg, ChMuxError, PortReq, PROTOCOL_VERSION, PROTOCOL_VERSION_PORT_ID, }; -use crate::exec::{ - time::{sleep, timeout}, - MutexExt, -}; +use crate::exec::time::{sleep, timeout}; /// Multiplexer protocol error. fn protocol_err(msg: impl AsRef) -> super::ChMuxError { @@ -1092,7 +1089,7 @@ where // Send hangup notifications. remote_receiver_closed.store(true, Ordering::Relaxed); - let notifies = remote_receiver_closed_notify.xlock().unwrap().take().unwrap(); + let notifies = remote_receiver_closed_notify.lock().unwrap().take().unwrap(); for tx in notifies { let _ = tx.send(()); } @@ -1128,7 +1125,7 @@ where // Send hangup notifications. remote_receiver_closed.store(true, Ordering::Relaxed); - let notifies = remote_receiver_closed_notify.xlock().unwrap().take().unwrap(); + let notifies = remote_receiver_closed_notify.lock().unwrap().take().unwrap(); for tx in notifies { let _ = tx.send(()); } diff --git a/remoc/src/chmux/port_allocator.rs b/remoc/src/chmux/port_allocator.rs index d3023fa..a0fc12b 100644 --- a/remoc/src/chmux/port_allocator.rs +++ b/remoc/src/chmux/port_allocator.rs @@ -9,8 +9,6 @@ use std::{ }; use tokio::sync::oneshot; -use crate::exec::MutexExt; - struct PortAllocatorInner { used: HashSet, limit: u32, @@ -47,7 +45,7 @@ pub struct PortAllocator(Arc>); impl fmt::Debug for PortAllocator { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - let inner = self.0.xlock().unwrap(); + let inner = self.0.lock().unwrap(); f.debug_struct("PortAllocator").field("used", &inner.used.len()).field("limit", &inner.limit).finish() } } @@ -66,7 +64,7 @@ impl PortAllocator { pub async fn allocate(&self) -> PortNumber { loop { let rx = { - let mut inner = self.0.xlock().unwrap(); + let mut inner = self.0.lock().unwrap(); match inner.try_allocate(self.0.clone()) { Some(number) => return number, None => { @@ -85,7 +83,7 @@ impl PortAllocator { /// /// If all port are currently in use, this returns [None]. pub fn try_allocate(&self) -> Option { - let mut inner = self.0.xlock().unwrap(); + let mut inner = self.0.lock().unwrap(); inner.try_allocate(self.0.clone()) } } @@ -153,7 +151,7 @@ impl Borrow for PortNumber { impl Drop for PortNumber { fn drop(&mut self) { let notify_tx = { - let mut inner = self.allocator.xlock().unwrap(); + let mut inner = self.allocator.lock().unwrap(); inner.used.remove(&self.number); mem::take(&mut inner.notify_tx) }; diff --git a/remoc/src/chmux/sender.rs b/remoc/src/chmux/sender.rs index 8638e39..39f8d78 100644 --- a/remoc/src/chmux/sender.rs +++ b/remoc/src/chmux/sender.rs @@ -24,7 +24,7 @@ use super::{ mux::PortEvt, AnyStorage, Connect, ConnectError, PortAllocator, PortReq, }; -use crate::exec::{self, MutexExt}; +use crate::exec; /// An error occurred during sending of a message. #[derive(Debug, Clone)] @@ -162,7 +162,7 @@ impl fmt::Debug for Closed { impl Closed { fn new(hangup_notify: &Weak>>>>) -> Self { if let Some(hangup_notify) = hangup_notify.upgrade() { - if let Some(notifiers) = hangup_notify.xlock().unwrap().as_mut() { + if let Some(notifiers) = hangup_notify.lock().unwrap().as_mut() { let (tx, rx) = oneshot::channel(); notifiers.push(tx); Self { diff --git a/remoc/src/exec/js/mod.rs b/remoc/src/exec/js/mod.rs index e9e2838..4ebe581 100644 --- a/remoc/src/exec/js/mod.rs +++ b/remoc/src/exec/js/mod.rs @@ -10,17 +10,3 @@ pub mod time; mod sync_wrapper; mod thread_pool; - -/// Whether blocking is allowed on this thread. -/// -/// On JavaScript blocking is only allowed on worker threads. -#[inline] -pub fn is_blocking_allowed() -> bool { - thread_local! { - static ALLOWED: LazyCell = LazyCell::new(|| - js_sys::global().is_instance_of::() - ); - } - - ALLOWED.with(|allowed| **allowed) -} diff --git a/remoc/src/exec/js/thread_pool.rs b/remoc/src/exec/js/thread_pool.rs index 138469e..41ff599 100644 --- a/remoc/src/exec/js/thread_pool.rs +++ b/remoc/src/exec/js/thread_pool.rs @@ -43,7 +43,7 @@ impl ThreadPool { /// /// Returns an error if worker thread spawning failed. pub fn exec(&self, f: impl FnOnce() + Send + 'static) -> Result<(), std::io::Error> { - let mut inner = self.inner.xlock().unwrap(); + let mut inner = self.inner.lock().unwrap(); inner.tasks.push_back(Box::new(f)); // Check if we need to spawn a new worker. @@ -69,7 +69,7 @@ impl ThreadPool { let mut idle = false; loop { - let mut inner = inner.xlock().unwrap(); + let mut inner = inner.lock().unwrap(); if let Some(task) = inner.tasks.pop_front() { if idle { inner.idle -= 1; @@ -95,7 +95,7 @@ impl ThreadPool { impl Drop for ThreadPool { fn drop(&mut self) { - let mut inner = self.inner.xlock().unwrap(); + let mut inner = self.inner.lock().unwrap(); inner.exit = true; diff --git a/remoc/src/exec/js/time.rs b/remoc/src/exec/js/time.rs index e778522..58fffa8 100644 --- a/remoc/src/exec/js/time.rs +++ b/remoc/src/exec/js/time.rs @@ -48,7 +48,7 @@ impl Sleep { let callback = { let inner = inner.clone(); Closure::new(move || { - let mut inner = inner.xlock().unwrap(); + let mut inner = inner.lock().unwrap(); inner.fired = true; if let Some(waker) = inner.waker.take() { waker.wake(); @@ -92,7 +92,7 @@ impl Future for Sleep { /// Waits until the sleep duration has elapsed. fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut inner = self.inner.xlock().unwrap(); + let mut inner = self.inner.lock().unwrap(); if inner.fired { return Poll::Ready(()); @@ -105,7 +105,7 @@ impl Future for Sleep { impl Drop for Sleep { fn drop(&mut self) { - let inner = self.inner.xlock().unwrap(); + let inner = self.inner.lock().unwrap(); if !inner.fired { Self::unregister_timeout(self.timeout_id); } diff --git a/remoc/src/exec/mod.rs b/remoc/src/exec/mod.rs index e98d3ac..2be44e9 100644 --- a/remoc/src/exec/mod.rs +++ b/remoc/src/exec/mod.rs @@ -41,35 +41,3 @@ pub fn are_threads_available() -> bool { *AVAILABLE } - -/// Mutex extensions for WebAssembly support. -pub trait MutexExt { - /// Acquires a mutex, blocking the current thread until it is able to do so. - /// - /// If blocking is not allowed on the current thread, it spins until the - /// lock is acquired. - fn xlock(&self) -> std::sync::LockResult>; -} - -impl MutexExt for std::sync::Mutex { - #[inline] - fn xlock(&self) -> std::sync::LockResult> { - if is_blocking_allowed() { - return self.lock(); - } - - // Spin until lock is acquired. - loop { - match self.try_lock() { - Ok(guard) => return Ok(guard), - Err(std::sync::TryLockError::Poisoned(p)) => return Err(p), - Err(std::sync::TryLockError::WouldBlock) => { - std::thread::yield_now(); - - #[cfg(all(target_family = "wasm", not(target_feature = "atomics")))] - panic!("mutex deadlock"); - } - } - } - } -} diff --git a/remoc/src/exec/native/mod.rs b/remoc/src/exec/native/mod.rs index 396f0e0..287d8c6 100644 --- a/remoc/src/exec/native/mod.rs +++ b/remoc/src/exec/native/mod.rs @@ -24,11 +24,3 @@ pub mod time { pub use tokio::time::error::Elapsed; } } - -/// Whether blocking is allowed on this thread. -/// -/// On native targets blocking is always allowed. -#[inline] -pub fn is_blocking_allowed() -> bool { - true -} diff --git a/remoc/src/rch/base/sender.rs b/remoc/src/rch/base/sender.rs index 266a7b6..3ef5835 100644 --- a/remoc/src/rch/base/sender.rs +++ b/remoc/src/rch/base/sender.rs @@ -24,7 +24,7 @@ use crate::{ chmux::{self, AnyStorage, PortReq}, codec::{self, SerializationError, StreamingUnavailable}, exec, - exec::{task, MutexExt}, + exec::task, }; pub use crate::chmux::Closed; @@ -294,7 +294,7 @@ where let result = task::spawn_blocking(move || { let ps_ref = PortSerializer::start(allocator, storage); - let item = item_arc_task.xlock().unwrap(); + let item = item_arc_task.lock().unwrap(); ::serialize(&mut cbw, &*item)?; let cbw = cbw.into_inner().map_err(|_| { diff --git a/remoc/src/rch/bin/receiver.rs b/remoc/src/rch/bin/receiver.rs index 7fe18ad..4e93215 100644 --- a/remoc/src/rch/bin/receiver.rs +++ b/remoc/src/rch/bin/receiver.rs @@ -12,7 +12,7 @@ use super::{ }, Interlock, Location, }; -use crate::{chmux, exec::MutexExt}; +use crate::chmux; /// A binary channel receiver. pub struct Receiver { @@ -76,7 +76,7 @@ impl Serialize for Receiver { { let sender_tx = self.sender_tx.clone(); let interlock_confirm = { - let mut interlock = self.interlock.xlock().unwrap(); + let mut interlock = self.interlock.lock().unwrap(); if interlock.sender.check_local() { Some(interlock.sender.start_send()) } else { @@ -109,7 +109,7 @@ impl Serialize for Receiver { // Forwarding. _ => { let (successor_tx, successor_rx) = tokio::sync::oneshot::channel(); - *self.successor_tx.xlock().unwrap() = Some(successor_tx); + *self.successor_tx.lock().unwrap() = Some(successor_tx); let (tx, rx) = super::channel(); PortSerializer::spawn(Self::forward(successor_rx, tx))?; @@ -154,7 +154,7 @@ impl<'de> Deserialize<'de> for Receiver { impl Drop for Receiver { fn drop(&mut self) { - let successor_tx = self.successor_tx.xlock().unwrap().take(); + let successor_tx = self.successor_tx.lock().unwrap().take(); if let Some(successor_tx) = successor_tx { let dummy = Self { receiver: None, diff --git a/remoc/src/rch/bin/sender.rs b/remoc/src/rch/bin/sender.rs index 160dd96..5ce4258 100644 --- a/remoc/src/rch/bin/sender.rs +++ b/remoc/src/rch/bin/sender.rs @@ -12,7 +12,7 @@ use super::{ }, Interlock, Location, }; -use crate::{chmux, exec::MutexExt}; +use crate::chmux; /// A binary channel sender. pub struct Sender { @@ -76,7 +76,7 @@ impl Serialize for Sender { { let receiver_tx = self.receiver_tx.clone(); let interlock_confirm = { - let mut interlock = self.interlock.xlock().unwrap(); + let mut interlock = self.interlock.lock().unwrap(); if interlock.receiver.check_local() { Some(interlock.receiver.start_send()) } else { @@ -109,7 +109,7 @@ impl Serialize for Sender { // Forwarding. _ => { let (successor_tx, successor_rx) = tokio::sync::oneshot::channel(); - *self.successor_tx.xlock().unwrap() = Some(successor_tx); + *self.successor_tx.lock().unwrap() = Some(successor_tx); let (tx, rx) = super::channel(); PortSerializer::spawn(Self::forward(successor_rx, rx))?; @@ -154,7 +154,7 @@ impl<'de> Deserialize<'de> for Sender { impl Drop for Sender { fn drop(&mut self) { - let successor_tx = self.successor_tx.xlock().unwrap().take(); + let successor_tx = self.successor_tx.lock().unwrap().take(); if let Some(successor_tx) = successor_tx { let dummy = Self { sender: None, diff --git a/remoc/src/rch/broadcast/sender.rs b/remoc/src/rch/broadcast/sender.rs index c2f98a9..5dbdbf7 100644 --- a/remoc/src/rch/broadcast/sender.rs +++ b/remoc/src/rch/broadcast/sender.rs @@ -10,7 +10,7 @@ use super::{ super::{base, mpsc, SendErrorExt}, BroadcastMsg, Receiver, }; -use crate::{chmux, codec, exec, exec::MutexExt, RemoteSend}; +use crate::{chmux, codec, exec, RemoteSend}; /// An error occurred during sending over a broadcast channel. #[derive(Clone, Debug, Serialize, Deserialize)] @@ -149,7 +149,7 @@ where /// No back-pressure is provided. #[inline] pub fn send(&self, value: T) -> Result<(), SendError> { - let mut inner = self.inner.xlock().unwrap(); + let mut inner = self.inner.lock().unwrap(); // Fetch subscribers that have become ready again. while let Ok(sub) = inner.ready_rx.try_recv() { @@ -201,7 +201,7 @@ where pub fn subscribe( &self, send_buffer: usize, ) -> Receiver { - let mut inner = self.inner.xlock().unwrap(); + let mut inner = self.inner.lock().unwrap(); let (tx, rx) = mpsc::channel(send_buffer); let tx = tx.set_buffer(); @@ -214,7 +214,7 @@ where pub fn subscribe_with_max_item_size( &self, send_buffer: usize, ) -> Receiver { - let mut inner = self.inner.xlock().unwrap(); + let mut inner = self.inner.lock().unwrap(); let (tx, rx) = mpsc::channel(send_buffer); let mut tx = tx.set_buffer(); diff --git a/remoc/src/rch/lr/receiver.rs b/remoc/src/rch/lr/receiver.rs index eb3259c..d288fe7 100644 --- a/remoc/src/rch/lr/receiver.rs +++ b/remoc/src/rch/lr/receiver.rs @@ -17,7 +17,6 @@ use super::{ use crate::{ chmux, codec::{self, DeserializationError}, - exec::MutexExt, }; /// An error that occurred during receiving from a remote endpoint. @@ -184,7 +183,7 @@ where self.sender_tx.clone().ok_or_else(|| ser::Error::custom("cannot forward received receiver"))?; let interlock_confirm = { - let mut interlock = self.interlock.xlock().unwrap(); + let mut interlock = self.interlock.lock().unwrap(); if !interlock.sender.check_local() { return Err(ser::Error::custom("cannot send receiver because sender has been sent")); } diff --git a/remoc/src/rch/lr/sender.rs b/remoc/src/rch/lr/sender.rs index 82128a1..0a3d616 100644 --- a/remoc/src/rch/lr/sender.rs +++ b/remoc/src/rch/lr/sender.rs @@ -17,7 +17,6 @@ use super::{ use crate::{ chmux, codec::{self, SerializationError}, - exec::MutexExt, }; pub use super::super::base::Closed; @@ -229,7 +228,7 @@ where self.receiver_tx.clone().ok_or_else(|| ser::Error::custom("cannot forward received sender"))?; let interlock_confirm = { - let mut interlock = self.interlock.xlock().unwrap(); + let mut interlock = self.interlock.lock().unwrap(); if !interlock.receiver.check_local() { return Err(ser::Error::custom("cannot send sender because receiver has been sent")); } diff --git a/remoc/src/rch/mpsc/receiver.rs b/remoc/src/rch/mpsc/receiver.rs index d740834..e1c82a1 100644 --- a/remoc/src/rch/mpsc/receiver.rs +++ b/remoc/src/rch/mpsc/receiver.rs @@ -17,7 +17,7 @@ use super::{ }, Distributor, }; -use crate::{chmux, codec, exec, exec::MutexExt, RemoteSend}; +use crate::{chmux, codec, exec, RemoteSend}; /// An error occurred during receiving over an mpsc channel. #[derive(Clone, Debug, Serialize, Deserialize)] @@ -384,7 +384,7 @@ impl Drop { fn drop(&mut self) { if let Some(inner) = self.inner.take() { - let mut successor_tx = self.successor_tx.xlock().unwrap(); + let mut successor_tx = self.successor_tx.lock().unwrap(); if let Some(successor_tx) = successor_tx.take() { let _ = successor_tx.send(inner); } else if !inner.closed { @@ -408,7 +408,7 @@ where { // Register successor of this receiver. let (successor_tx, successor_rx) = tokio::sync::oneshot::channel(); - *self.successor_tx.xlock().unwrap() = Some(successor_tx); + *self.successor_tx.lock().unwrap() = Some(successor_tx); let port = PortSerializer::connect(|connect| { async move { diff --git a/remoc/src/rch/watch/sender.rs b/remoc/src/rch/watch/sender.rs index 0414a16..47378b0 100644 --- a/remoc/src/rch/watch/sender.rs +++ b/remoc/src/rch/watch/sender.rs @@ -10,7 +10,7 @@ use super::{ receiver::RecvError, Receiver, Ref, }; -use crate::{chmux, codec, exec::MutexExt, RemoteSend}; +use crate::{chmux, codec, RemoteSend}; /// An error occurred during sending over an mpsc channel. #[derive(Clone, Debug, Serialize, Deserialize)] @@ -228,12 +228,12 @@ where fn update_error(&self) { let inner = self.inner.as_ref().unwrap(); - let mut current_err = inner.current_err.xlock().unwrap(); + let mut current_err = inner.current_err.lock().unwrap(); if current_err.is_some() { return; } - let mut remote_send_err_rx = inner.remote_send_err_rx.xlock().unwrap(); + let mut remote_send_err_rx = inner.remote_send_err_rx.lock().unwrap(); if let Ok(err) = remote_send_err_rx.try_recv() { *current_err = Some(err); } @@ -248,7 +248,7 @@ where self.update_error(); let inner = self.inner.as_ref().unwrap(); - let current_err = inner.current_err.xlock().unwrap(); + let current_err = inner.current_err.lock().unwrap(); current_err.clone().map(|err| err.into()) } @@ -257,7 +257,7 @@ where self.update_error(); let inner = self.inner.as_ref().unwrap(); - let mut current_err = inner.current_err.xlock().unwrap(); + let mut current_err = inner.current_err.lock().unwrap(); *current_err = None; } @@ -296,7 +296,7 @@ where impl Drop for Sender { fn drop(&mut self) { - if let Some(successor_tx) = self.successor_tx.xlock().unwrap().take() { + if let Some(successor_tx) = self.successor_tx.lock().unwrap().take() { let _ = successor_tx.send(self.inner.take().unwrap()); } } @@ -317,7 +317,7 @@ where // Prepare channel for takeover. let (successor_tx, successor_rx) = tokio::sync::oneshot::channel(); - *self.successor_tx.xlock().unwrap() = Some(successor_tx); + *self.successor_tx.lock().unwrap() = Some(successor_tx); let port = PortSerializer::connect(move |connect| { async move { diff --git a/remoc/src/robj/lazy_blob/fw_bin.rs b/remoc/src/robj/lazy_blob/fw_bin.rs index d0a031a..90edc2c 100644 --- a/remoc/src/robj/lazy_blob/fw_bin.rs +++ b/remoc/src/robj/lazy_blob/fw_bin.rs @@ -3,7 +3,7 @@ use serde::{ser, Deserialize, Serialize}; use std::sync::Mutex; -use crate::{exec, exec::MutexExt, rch::bin}; +use crate::{exec, rch::bin}; /// A chmux sender that can be remotely sent and forwarded. pub(crate) struct Sender { @@ -13,7 +13,7 @@ pub(crate) struct Sender { impl Sender { pub fn into_inner(self) -> Option { - let mut bin_tx = self.bin_tx.xlock().unwrap(); + let mut bin_tx = self.bin_tx.lock().unwrap(); bin_tx.take() } } @@ -29,8 +29,8 @@ impl Serialize for Sender { where S: serde::Serializer, { - let mut bin_tx = self.bin_tx.xlock().unwrap(); - let mut bin_rx_tx = self.bin_rx_tx.xlock().unwrap(); + let mut bin_tx = self.bin_tx.lock().unwrap(); + let mut bin_rx_tx = self.bin_rx_tx.lock().unwrap(); match (bin_tx.take(), bin_rx_tx.take()) { // Initial send.