Skip to content

Commit

Permalink
Use xlock to ensure non-blocking on browser main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
surban committed Dec 18, 2024
1 parent 93117cc commit 9b1e2f9
Show file tree
Hide file tree
Showing 16 changed files with 75 additions and 59 deletions.
10 changes: 6 additions & 4 deletions remoc/src/chmux/any_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::{
};
use uuid::Uuid;

use crate::executor::MutexExt;

/// Box containing any value that is Send, Sync and static.
pub type AnyBox = Box<dyn Any + Send + Sync + 'static>;

Expand All @@ -26,7 +28,7 @@ pub struct AnyStorage {

impl fmt::Debug for AnyStorage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let entries = self.entries.lock().unwrap();
let entries = self.entries.xlock().unwrap();
write!(f, "{:?}", *entries)
}
}
Expand All @@ -39,7 +41,7 @@ impl AnyStorage {

/// Insert a new entry into the storage and return its key.
pub fn insert(&self, entry: AnyEntry) -> Uuid {
let mut entries = self.entries.lock().unwrap();
let mut entries = self.entries.xlock().unwrap();
loop {
let key = Uuid::new_v4();
if let Entry::Vacant(e) = entries.entry(key) {
Expand All @@ -51,13 +53,13 @@ impl AnyStorage {

/// Returns the value from the storage for the specified key.
pub fn get(&self, key: Uuid) -> Option<AnyEntry> {
let entries = self.entries.lock().unwrap();
let entries = self.entries.xlock().unwrap();
entries.get(&key).cloned()
}

/// Removes the value for the specified key from the storage and returns it.
pub fn remove(&self, key: Uuid) -> Option<AnyEntry> {
let mut entries = self.entries.lock().unwrap();
let mut entries = self.entries.xlock().unwrap();
entries.remove(&key)
}
}
12 changes: 7 additions & 5 deletions remoc/src/chmux/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};

use crate::{executor, executor::task::JoinHandle};

use super::{
port_allocator::{PortAllocator, PortNumber},
receiver::Receiver,
sender::Sender,
PortReq,
};
use crate::{
executor,
executor::{task::JoinHandle, MutexExt},
};

/// An error occurred during connecting to a remote service.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -87,7 +89,7 @@ impl ConntectRequestCrediter {
pub async fn request(&self) -> ConnectRequestCredit {
loop {
let rx = {
let mut inner = self.0.lock().unwrap();
let mut inner = self.0.xlock().unwrap();

if inner.used < inner.limit {
inner.used += 1;
Expand All @@ -107,7 +109,7 @@ impl ConntectRequestCrediter {
///
/// Does not wait for the credit to become available.
pub fn try_request(&self) -> Option<ConnectRequestCredit> {
let mut inner = self.0.lock().unwrap();
let mut inner = self.0.xlock().unwrap();

if inner.used < inner.limit {
inner.used += 1;
Expand All @@ -124,7 +126,7 @@ pub(crate) struct ConnectRequestCredit(Arc<Mutex<ConntectRequestCrediterInner>>)
impl Drop for ConnectRequestCredit {
fn drop(&mut self) {
let notify_tx = {
let mut inner = self.0.lock().unwrap();
let mut inner = self.0.xlock().unwrap();
inner.used -= 1;
mem::take(&mut inner.notify_tx)
};
Expand Down
15 changes: 8 additions & 7 deletions remoc/src/chmux/credit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio::sync::{
};

use super::{mux::PortEvt, ChMuxError, SendError};
use crate::executor::MutexExt;

// ===========================================================================
// Credit accounting for sending data
Expand Down Expand Up @@ -56,7 +57,7 @@ impl Drop for AssignedCredits {
fn drop(&mut self) {
if self.port > 0 {
if let Some(port) = self.port_inner.upgrade() {
let mut port = port.lock().unwrap();
let mut port = port.xlock().unwrap();
port.credits += self.port;
}
}
Expand All @@ -80,7 +81,7 @@ impl CreditProvider {
&self, credits: u32,
) -> Result<(), ChMuxError<SinkError, StreamError>> {
let notify = {
let mut inner = self.0.lock().unwrap();
let mut inner = self.0.xlock().unwrap();

match inner.credits.checked_add(credits) {
Some(new_credits) => inner.credits = new_credits,
Expand All @@ -100,7 +101,7 @@ impl CreditProvider {
/// Closes the channel.
pub fn close(&self, gracefully: bool) {
let notify = {
let mut inner = self.0.lock().unwrap();
let mut inner = self.0.xlock().unwrap();

inner.closed = Some(gracefully);

Expand Down Expand Up @@ -132,7 +133,7 @@ impl CreditUser {
Some(channel) => channel,
None => return Err(SendError::ChMux),
};
let mut channel = channel.lock().unwrap();
let mut channel = channel.xlock().unwrap();
if let Some(gracefully) = channel.closed {
if !self.override_graceful_close || !gracefully {
return Err(SendError::Closed { gracefully });
Expand Down Expand Up @@ -164,7 +165,7 @@ impl CreditUser {
Some(channel) => channel,
None => return Err(SendError::ChMux),
};
let mut channel = channel.lock().unwrap();
let mut channel = channel.xlock().unwrap();
if let Some(gracefully) = channel.closed {
if !self.override_graceful_close || !gracefully {
return Err(SendError::Closed { gracefully });
Expand Down Expand Up @@ -213,7 +214,7 @@ impl ChannelCreditMonitor {
pub fn use_credits<SinkError, StreamError>(
&self, credits: u32,
) -> Result<UsedCredit, ChMuxError<SinkError, StreamError>> {
let mut inner = self.0.lock().unwrap();
let mut inner = self.0.xlock().unwrap();
match inner.used.checked_add(credits) {
Some(new_used) if new_used <= inner.limit => {
inner.used = new_used;
Expand All @@ -239,7 +240,7 @@ impl ChannelCreditReturner {
assert!(self.return_fut.is_none(), "start_return_one called without poll_return_flush");

if let Some(monitor) = self.monitor.upgrade() {
let mut monitor = monitor.lock().unwrap();
let mut monitor = monitor.xlock().unwrap();

monitor.used -= credit.0;
self.to_return += credit.0;
Expand Down
9 changes: 6 additions & 3 deletions remoc/src/chmux/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ use super::{
sender::Sender,
AnyStorage, Cfg, ChMuxError, PortReq, PROTOCOL_VERSION, PROTOCOL_VERSION_PORT_ID,
};
use crate::executor::time::{sleep, timeout};
use crate::executor::{
time::{sleep, timeout},
MutexExt,
};

/// Multiplexer protocol error.
fn protocol_err<SinkError, StreamError>(msg: impl AsRef<str>) -> super::ChMuxError<SinkError, StreamError> {
Expand Down Expand Up @@ -1089,7 +1092,7 @@ where

// Send hangup notifications.
remote_receiver_closed.store(true, Ordering::SeqCst);
let notifies = remote_receiver_closed_notify.lock().unwrap().take().unwrap();
let notifies = remote_receiver_closed_notify.xlock().unwrap().take().unwrap();
for tx in notifies {
let _ = tx.send(());
}
Expand Down Expand Up @@ -1125,7 +1128,7 @@ where

// Send hangup notifications.
remote_receiver_closed.store(true, Ordering::SeqCst);
let notifies = remote_receiver_closed_notify.lock().unwrap().take().unwrap();
let notifies = remote_receiver_closed_notify.xlock().unwrap().take().unwrap();
for tx in notifies {
let _ = tx.send(());
}
Expand Down
10 changes: 6 additions & 4 deletions remoc/src/chmux/port_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::{
};
use tokio::sync::oneshot;

use crate::executor::MutexExt;

struct PortAllocatorInner {
used: HashSet<u32>,
limit: u32,
Expand Down Expand Up @@ -45,7 +47,7 @@ pub struct PortAllocator(Arc<Mutex<PortAllocatorInner>>);

impl fmt::Debug for PortAllocator {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let inner = self.0.lock().unwrap();
let inner = self.0.xlock().unwrap();
f.debug_struct("PortAllocator").field("used", &inner.used.len()).field("limit", &inner.limit).finish()
}
}
Expand All @@ -64,7 +66,7 @@ impl PortAllocator {
pub async fn allocate(&self) -> PortNumber {
loop {
let rx = {
let mut inner = self.0.lock().unwrap();
let mut inner = self.0.xlock().unwrap();
match inner.try_allocate(self.0.clone()) {
Some(number) => return number,
None => {
Expand All @@ -83,7 +85,7 @@ impl PortAllocator {
///
/// If all port are currently in use, this returns [None].
pub fn try_allocate(&self) -> Option<PortNumber> {
let mut inner = self.0.lock().unwrap();
let mut inner = self.0.xlock().unwrap();
inner.try_allocate(self.0.clone())
}
}
Expand Down Expand Up @@ -151,7 +153,7 @@ impl Borrow<u32> for PortNumber {
impl Drop for PortNumber {
fn drop(&mut self) {
let notify_tx = {
let mut inner = self.allocator.lock().unwrap();
let mut inner = self.allocator.xlock().unwrap();
inner.used.remove(&self.number);
mem::take(&mut inner.notify_tx)
};
Expand Down
5 changes: 2 additions & 3 deletions remoc/src/chmux/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ use std::{
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio_util::sync::ReusableBoxFuture;

use crate::executor;

use super::{
client::ConnectResponse,
credit::{AssignedCredits, CreditUser},
mux::PortEvt,
AnyStorage, Connect, ConnectError, PortAllocator, PortReq,
};
use crate::executor::{self, MutexExt};

/// An error occurred during sending of a message.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -163,7 +162,7 @@ impl fmt::Debug for Closed {
impl Closed {
fn new(hangup_notify: &Weak<std::sync::Mutex<Option<Vec<oneshot::Sender<()>>>>>) -> Self {
if let Some(hangup_notify) = hangup_notify.upgrade() {
if let Some(notifiers) = hangup_notify.lock().unwrap().as_mut() {
if let Some(notifiers) = hangup_notify.xlock().unwrap().as_mut() {
let (tx, rx) = oneshot::channel();
notifiers.push(tx);
Self {
Expand Down
11 changes: 8 additions & 3 deletions remoc/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ mod js;
#[cfg(feature = "js")]
pub use js::*;

pub use task::spawn;

/// Whether threads are available and working on this platform.
#[inline]
pub fn are_threads_available() -> bool {
Expand Down Expand Up @@ -61,10 +63,13 @@ impl<T> MutexExt<T> for std::sync::Mutex<T> {
match self.try_lock() {
Ok(guard) => return Ok(guard),
Err(std::sync::TryLockError::Poisoned(p)) => return Err(p),
Err(std::sync::TryLockError::WouldBlock) => (),
Err(std::sync::TryLockError::WouldBlock) => {
std::thread::yield_now();

#[cfg(all(target_family = "wasm", not(target_feature = "atomics")))]
panic!("mutex deadlock");
}
}
}
}
}

pub use task::spawn;
4 changes: 2 additions & 2 deletions remoc/src/rch/base/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
chmux::{self, AnyStorage, PortReq},
codec::{self, SerializationError, StreamingUnavailable},
executor,
executor::task,
executor::{task, MutexExt},
};

pub use crate::chmux::Closed;
Expand Down Expand Up @@ -294,7 +294,7 @@ where
let result = task::spawn_blocking(move || {
let ps_ref = PortSerializer::start(allocator, storage);

let item = item_arc_task.lock().unwrap();
let item = item_arc_task.xlock().unwrap();
<Codec as codec::Codec>::serialize(&mut cbw, &*item)?;

let cbw = cbw.into_inner().map_err(|_| {
Expand Down
8 changes: 4 additions & 4 deletions remoc/src/rch/bin/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
},
Interlock, Location,
};
use crate::chmux;
use crate::{chmux, executor::MutexExt};

/// A binary channel receiver.
pub struct Receiver {
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Serialize for Receiver {
{
let sender_tx = self.sender_tx.clone();
let interlock_confirm = {
let mut interlock = self.interlock.lock().unwrap();
let mut interlock = self.interlock.xlock().unwrap();
if interlock.sender.check_local() {
Some(interlock.sender.start_send())
} else {
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Serialize for Receiver {
// Forwarding.
_ => {
let (successor_tx, successor_rx) = tokio::sync::oneshot::channel();
*self.successor_tx.lock().unwrap() = Some(successor_tx);
*self.successor_tx.xlock().unwrap() = Some(successor_tx);
let (tx, rx) = super::channel();
PortSerializer::spawn(Self::forward(successor_rx, tx))?;

Expand Down Expand Up @@ -154,7 +154,7 @@ impl<'de> Deserialize<'de> for Receiver {

impl Drop for Receiver {
fn drop(&mut self) {
let successor_tx = self.successor_tx.lock().unwrap().take();
let successor_tx = self.successor_tx.xlock().unwrap().take();
if let Some(successor_tx) = successor_tx {
let dummy = Self {
receiver: None,
Expand Down
8 changes: 4 additions & 4 deletions remoc/src/rch/bin/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
},
Interlock, Location,
};
use crate::chmux;
use crate::{chmux, executor::MutexExt};

/// A binary channel sender.
pub struct Sender {
Expand Down Expand Up @@ -76,7 +76,7 @@ impl Serialize for Sender {
{
let receiver_tx = self.receiver_tx.clone();
let interlock_confirm = {
let mut interlock = self.interlock.lock().unwrap();
let mut interlock = self.interlock.xlock().unwrap();
if interlock.receiver.check_local() {
Some(interlock.receiver.start_send())
} else {
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Serialize for Sender {
// Forwarding.
_ => {
let (successor_tx, successor_rx) = tokio::sync::oneshot::channel();
*self.successor_tx.lock().unwrap() = Some(successor_tx);
*self.successor_tx.xlock().unwrap() = Some(successor_tx);
let (tx, rx) = super::channel();
PortSerializer::spawn(Self::forward(successor_rx, rx))?;

Expand Down Expand Up @@ -154,7 +154,7 @@ impl<'de> Deserialize<'de> for Sender {

impl Drop for Sender {
fn drop(&mut self) {
let successor_tx = self.successor_tx.lock().unwrap().take();
let successor_tx = self.successor_tx.xlock().unwrap().take();
if let Some(successor_tx) = successor_tx {
let dummy = Self {
sender: None,
Expand Down
Loading

0 comments on commit 9b1e2f9

Please sign in to comment.