Skip to content

Commit

Permalink
Remove spinning mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
surban committed Dec 20, 2024
1 parent b04c5f2 commit 0a03428
Show file tree
Hide file tree
Showing 19 changed files with 56 additions and 120 deletions.
10 changes: 4 additions & 6 deletions remoc/src/chmux/any_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use std::{
};
use uuid::Uuid;

use crate::exec::MutexExt;

/// Box containing any value that is Send, Sync and static.
pub type AnyBox = Box<dyn Any + Send + Sync + 'static>;

Expand All @@ -28,7 +26,7 @@ pub struct AnyStorage {

impl fmt::Debug for AnyStorage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let entries = self.entries.xlock().unwrap();
let entries = self.entries.lock().unwrap();
write!(f, "{:?}", *entries)
}
}
Expand All @@ -41,7 +39,7 @@ impl AnyStorage {

/// Insert a new entry into the storage and return its key.
pub fn insert(&self, entry: AnyEntry) -> Uuid {
let mut entries = self.entries.xlock().unwrap();
let mut entries = self.entries.lock().unwrap();
loop {
let key = Uuid::new_v4();
if let Entry::Vacant(e) = entries.entry(key) {
Expand All @@ -53,13 +51,13 @@ impl AnyStorage {

/// Returns the value from the storage for the specified key.
pub fn get(&self, key: Uuid) -> Option<AnyEntry> {
let entries = self.entries.xlock().unwrap();
let entries = self.entries.lock().unwrap();
entries.get(&key).cloned()
}

/// Removes the value for the specified key from the storage and returns it.
pub fn remove(&self, key: Uuid) -> Option<AnyEntry> {
let mut entries = self.entries.xlock().unwrap();
let mut entries = self.entries.lock().unwrap();
entries.remove(&key)
}
}
15 changes: 7 additions & 8 deletions remoc/src/chmux/credit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use tokio::sync::{
};

use super::{mux::PortEvt, ChMuxError, SendError};
use crate::exec::MutexExt;

// ===========================================================================
// Credit accounting for sending data
Expand Down Expand Up @@ -57,7 +56,7 @@ impl Drop for AssignedCredits {
fn drop(&mut self) {
if self.port > 0 {
if let Some(port) = self.port_inner.upgrade() {
let mut port = port.xlock().unwrap();
let mut port = port.lock().unwrap();
port.credits += self.port;
}
}
Expand All @@ -81,7 +80,7 @@ impl CreditProvider {
&self, credits: u32,
) -> Result<(), ChMuxError<SinkError, StreamError>> {
let notify = {
let mut inner = self.0.xlock().unwrap();
let mut inner = self.0.lock().unwrap();

match inner.credits.checked_add(credits) {
Some(new_credits) => inner.credits = new_credits,
Expand All @@ -101,7 +100,7 @@ impl CreditProvider {
/// Closes the channel.
pub fn close(&self, gracefully: bool) {
let notify = {
let mut inner = self.0.xlock().unwrap();
let mut inner = self.0.lock().unwrap();

inner.closed = Some(gracefully);

Expand Down Expand Up @@ -133,7 +132,7 @@ impl CreditUser {
Some(channel) => channel,
None => return Err(SendError::ChMux),
};
let mut channel = channel.xlock().unwrap();
let mut channel = channel.lock().unwrap();
if let Some(gracefully) = channel.closed {
if !self.override_graceful_close || !gracefully {
return Err(SendError::Closed { gracefully });
Expand Down Expand Up @@ -165,7 +164,7 @@ impl CreditUser {
Some(channel) => channel,
None => return Err(SendError::ChMux),
};
let mut channel = channel.xlock().unwrap();
let mut channel = channel.lock().unwrap();
if let Some(gracefully) = channel.closed {
if !self.override_graceful_close || !gracefully {
return Err(SendError::Closed { gracefully });
Expand Down Expand Up @@ -214,7 +213,7 @@ impl ChannelCreditMonitor {
pub fn use_credits<SinkError, StreamError>(
&self, credits: u32,
) -> Result<UsedCredit, ChMuxError<SinkError, StreamError>> {
let mut inner = self.0.xlock().unwrap();
let mut inner = self.0.lock().unwrap();
match inner.used.checked_add(credits) {
Some(new_used) if new_used <= inner.limit => {
inner.used = new_used;
Expand All @@ -240,7 +239,7 @@ impl ChannelCreditReturner {
assert!(self.return_fut.is_none(), "start_return_one called without poll_return_flush");

if let Some(monitor) = self.monitor.upgrade() {
let mut monitor = monitor.xlock().unwrap();
let mut monitor = monitor.lock().unwrap();

monitor.used -= credit.0;
self.to_return += credit.0;
Expand Down
9 changes: 3 additions & 6 deletions remoc/src/chmux/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ use super::{
sender::Sender,
AnyStorage, Cfg, ChMuxError, PortReq, PROTOCOL_VERSION, PROTOCOL_VERSION_PORT_ID,
};
use crate::exec::{
time::{sleep, timeout},
MutexExt,
};
use crate::exec::time::{sleep, timeout};

/// Multiplexer protocol error.
fn protocol_err<SinkError, StreamError>(msg: impl AsRef<str>) -> super::ChMuxError<SinkError, StreamError> {
Expand Down Expand Up @@ -1092,7 +1089,7 @@ where

// Send hangup notifications.
remote_receiver_closed.store(true, Ordering::Relaxed);
let notifies = remote_receiver_closed_notify.xlock().unwrap().take().unwrap();
let notifies = remote_receiver_closed_notify.lock().unwrap().take().unwrap();
for tx in notifies {
let _ = tx.send(());
}
Expand Down Expand Up @@ -1128,7 +1125,7 @@ where

// Send hangup notifications.
remote_receiver_closed.store(true, Ordering::Relaxed);
let notifies = remote_receiver_closed_notify.xlock().unwrap().take().unwrap();
let notifies = remote_receiver_closed_notify.lock().unwrap().take().unwrap();
for tx in notifies {
let _ = tx.send(());
}
Expand Down
10 changes: 4 additions & 6 deletions remoc/src/chmux/port_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use std::{
};
use tokio::sync::oneshot;

use crate::exec::MutexExt;

struct PortAllocatorInner {
used: HashSet<u32>,
limit: u32,
Expand Down Expand Up @@ -47,7 +45,7 @@ pub struct PortAllocator(Arc<Mutex<PortAllocatorInner>>);

impl fmt::Debug for PortAllocator {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let inner = self.0.xlock().unwrap();
let inner = self.0.lock().unwrap();
f.debug_struct("PortAllocator").field("used", &inner.used.len()).field("limit", &inner.limit).finish()
}
}
Expand All @@ -66,7 +64,7 @@ impl PortAllocator {
pub async fn allocate(&self) -> PortNumber {
loop {
let rx = {
let mut inner = self.0.xlock().unwrap();
let mut inner = self.0.lock().unwrap();
match inner.try_allocate(self.0.clone()) {
Some(number) => return number,
None => {
Expand All @@ -85,7 +83,7 @@ impl PortAllocator {
///
/// If all port are currently in use, this returns [None].
pub fn try_allocate(&self) -> Option<PortNumber> {
let mut inner = self.0.xlock().unwrap();
let mut inner = self.0.lock().unwrap();
inner.try_allocate(self.0.clone())
}
}
Expand Down Expand Up @@ -153,7 +151,7 @@ impl Borrow<u32> for PortNumber {
impl Drop for PortNumber {
fn drop(&mut self) {
let notify_tx = {
let mut inner = self.allocator.xlock().unwrap();
let mut inner = self.allocator.lock().unwrap();
inner.used.remove(&self.number);
mem::take(&mut inner.notify_tx)
};
Expand Down
4 changes: 2 additions & 2 deletions remoc/src/chmux/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use super::{
mux::PortEvt,
AnyStorage, Connect, ConnectError, PortAllocator, PortReq,
};
use crate::exec::{self, MutexExt};
use crate::exec;

/// An error occurred during sending of a message.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -162,7 +162,7 @@ impl fmt::Debug for Closed {
impl Closed {
fn new(hangup_notify: &Weak<std::sync::Mutex<Option<Vec<oneshot::Sender<()>>>>>) -> Self {
if let Some(hangup_notify) = hangup_notify.upgrade() {
if let Some(notifiers) = hangup_notify.xlock().unwrap().as_mut() {
if let Some(notifiers) = hangup_notify.lock().unwrap().as_mut() {
let (tx, rx) = oneshot::channel();
notifiers.push(tx);
Self {
Expand Down
14 changes: 0 additions & 14 deletions remoc/src/exec/js/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,3 @@ pub mod time;

mod sync_wrapper;
mod thread_pool;

/// Whether blocking is allowed on this thread.
///
/// On JavaScript blocking is only allowed on worker threads.
#[inline]
pub fn is_blocking_allowed() -> bool {
thread_local! {
static ALLOWED: LazyCell<bool> = LazyCell::new(||
js_sys::global().is_instance_of::<WorkerGlobalScope>()
);
}

ALLOWED.with(|allowed| **allowed)
}
6 changes: 3 additions & 3 deletions remoc/src/exec/js/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl ThreadPool {
///
/// Returns an error if worker thread spawning failed.
pub fn exec(&self, f: impl FnOnce() + Send + 'static) -> Result<(), std::io::Error> {
let mut inner = self.inner.xlock().unwrap();
let mut inner = self.inner.lock().unwrap();
inner.tasks.push_back(Box::new(f));

// Check if we need to spawn a new worker.
Expand All @@ -69,7 +69,7 @@ impl ThreadPool {
let mut idle = false;

loop {
let mut inner = inner.xlock().unwrap();
let mut inner = inner.lock().unwrap();
if let Some(task) = inner.tasks.pop_front() {
if idle {
inner.idle -= 1;
Expand All @@ -95,7 +95,7 @@ impl ThreadPool {

impl Drop for ThreadPool {
fn drop(&mut self) {
let mut inner = self.inner.xlock().unwrap();
let mut inner = self.inner.lock().unwrap();

inner.exit = true;

Expand Down
6 changes: 3 additions & 3 deletions remoc/src/exec/js/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl Sleep {
let callback = {
let inner = inner.clone();
Closure::new(move || {
let mut inner = inner.xlock().unwrap();
let mut inner = inner.lock().unwrap();
inner.fired = true;
if let Some(waker) = inner.waker.take() {
waker.wake();
Expand Down Expand Up @@ -92,7 +92,7 @@ impl Future for Sleep {

/// Waits until the sleep duration has elapsed.
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut inner = self.inner.xlock().unwrap();
let mut inner = self.inner.lock().unwrap();

if inner.fired {
return Poll::Ready(());
Expand All @@ -105,7 +105,7 @@ impl Future for Sleep {

impl Drop for Sleep {
fn drop(&mut self) {
let inner = self.inner.xlock().unwrap();
let inner = self.inner.lock().unwrap();
if !inner.fired {
Self::unregister_timeout(self.timeout_id);
}
Expand Down
32 changes: 0 additions & 32 deletions remoc/src/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,3 @@ pub fn are_threads_available() -> bool {

*AVAILABLE
}

/// Mutex extensions for WebAssembly support.
pub trait MutexExt<T> {
/// Acquires a mutex, blocking the current thread until it is able to do so.
///
/// If blocking is not allowed on the current thread, it spins until the
/// lock is acquired.
fn xlock(&self) -> std::sync::LockResult<std::sync::MutexGuard<'_, T>>;
}

impl<T> MutexExt<T> for std::sync::Mutex<T> {
#[inline]
fn xlock(&self) -> std::sync::LockResult<std::sync::MutexGuard<'_, T>> {
if is_blocking_allowed() {
return self.lock();
}

// Spin until lock is acquired.
loop {
match self.try_lock() {
Ok(guard) => return Ok(guard),
Err(std::sync::TryLockError::Poisoned(p)) => return Err(p),
Err(std::sync::TryLockError::WouldBlock) => {
std::thread::yield_now();

#[cfg(all(target_family = "wasm", not(target_feature = "atomics")))]
panic!("mutex deadlock");
}
}
}
}
}
8 changes: 0 additions & 8 deletions remoc/src/exec/native/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,3 @@ pub mod time {
pub use tokio::time::error::Elapsed;
}
}

/// Whether blocking is allowed on this thread.
///
/// On native targets blocking is always allowed.
#[inline]
pub fn is_blocking_allowed() -> bool {
true
}
4 changes: 2 additions & 2 deletions remoc/src/rch/base/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
chmux::{self, AnyStorage, PortReq},
codec::{self, SerializationError, StreamingUnavailable},
exec,
exec::{task, MutexExt},
exec::task,
};

pub use crate::chmux::Closed;
Expand Down Expand Up @@ -294,7 +294,7 @@ where
let result = task::spawn_blocking(move || {
let ps_ref = PortSerializer::start(allocator, storage);

let item = item_arc_task.xlock().unwrap();
let item = item_arc_task.lock().unwrap();
<Codec as codec::Codec>::serialize(&mut cbw, &*item)?;

let cbw = cbw.into_inner().map_err(|_| {
Expand Down
8 changes: 4 additions & 4 deletions remoc/src/rch/bin/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
},
Interlock, Location,
};
use crate::{chmux, exec::MutexExt};
use crate::chmux;

/// A binary channel receiver.
pub struct Receiver {
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Serialize for Receiver {
{
let sender_tx = self.sender_tx.clone();
let interlock_confirm = {
let mut interlock = self.interlock.xlock().unwrap();
let mut interlock = self.interlock.lock().unwrap();
if interlock.sender.check_local() {
Some(interlock.sender.start_send())
} else {
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Serialize for Receiver {
// Forwarding.
_ => {
let (successor_tx, successor_rx) = tokio::sync::oneshot::channel();
*self.successor_tx.xlock().unwrap() = Some(successor_tx);
*self.successor_tx.lock().unwrap() = Some(successor_tx);
let (tx, rx) = super::channel();
PortSerializer::spawn(Self::forward(successor_rx, tx))?;

Expand Down Expand Up @@ -154,7 +154,7 @@ impl<'de> Deserialize<'de> for Receiver {

impl Drop for Receiver {
fn drop(&mut self) {
let successor_tx = self.successor_tx.xlock().unwrap().take();
let successor_tx = self.successor_tx.lock().unwrap().take();
if let Some(successor_tx) = successor_tx {
let dummy = Self {
receiver: None,
Expand Down
Loading

0 comments on commit 0a03428

Please sign in to comment.