17 changes: 9 additions & 8 deletions src/condvar.rs
@@ -6,8 +6,8 @@
// copied, modified, or distributed except according to those terms.

use crate::mutex::MutexGuard;
-use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
-use crate::{deadlock, util};
+use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL, TOKEN_RESTORE_PARKED_BIT};
+use crate::util;
use core::{
fmt, ptr,
sync::atomic::{AtomicPtr, Ordering},
@@ -229,9 +229,10 @@ impl Condvar {
// If we requeued threads to the mutex, mark it as having
// parked threads. The RequeueAll case is already handled above.
if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
-            unsafe { (*mutex).mark_parked() };
+            TOKEN_RESTORE_PARKED_BIT
+        } else {
+            TOKEN_NORMAL
        }
-        TOKEN_NORMAL
};
let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };

@@ -350,10 +351,10 @@ impl Condvar {
}

// ... and re-lock it once we are done sleeping
-        if result == ParkResult::Unparked(TOKEN_HANDOFF) {
-            unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
-        } else {
-            mutex.lock();
+        match result {
+            ParkResult::Unparked(TOKEN_HANDOFF) => unreachable!("can't be handed off"),
Owner:

TOKEN_HANDOFF is actually reachable: if we are requeued onto a mutex and another thread then unlocks that mutex with unlock_fair, the waiter wakes with TOKEN_HANDOFF (see the sketch after this hunk).

+            ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => mutex.lock_contention(),
+            _ => mutex.lock(),
}

WaitTimeoutResult(!(result.is_unparked() || requeued))
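To make the reachability scenario above concrete, here is a minimal, hypothetical sketch using parking_lot's public API (not code from the PR): notify_one requeues the parked waiter onto the still-locked mutex, and unlock_fair then hands the mutex off to it directly, so the waiter's park call returns ParkResult::Unparked(TOKEN_HANDOFF).

```rust
use parking_lot::{Condvar, Mutex, MutexGuard};
use std::sync::Arc;
use std::thread;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    let waiter = thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut ready = lock.lock();
        while !*ready {
            // Parked on the condvar; notify_one below requeues this
            // thread onto the mutex's parking queue instead of waking it.
            cvar.wait(&mut ready);
        }
    });

    // Give the waiter time to park (racy, but fine for a sketch).
    thread::sleep(std::time::Duration::from_millis(50));
    let (lock, cvar) = &*pair;
    let mut ready = lock.lock();
    *ready = true;
    // The mutex is held, so the waiter is requeued onto it rather than woken.
    cvar.notify_one();
    // A fair unlock hands the lock directly to the queued waiter, which
    // therefore observes ParkResult::Unparked(TOKEN_HANDOFF).
    MutexGuard::unlock_fair(ready);
    waiter.join().unwrap();
}
```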
49 changes: 31 additions & 18 deletions src/raw_mutex.rs
@@ -22,6 +22,9 @@ pub(crate) const TOKEN_NORMAL: UnparkToken = UnparkToken(0);
// thread directly without unlocking it.
pub(crate) const TOKEN_HANDOFF: UnparkToken = UnparkToken(1);

+// UnparkToken used to indicate that the waiter should restore PARKED_BIT.
Owner:

Suggested change:
-// UnparkToken used to indicate that the waiter should restore PARKED_BIT.
+// UnparkToken used to indicate that the waiter should restore PARKED_BIT and then continue attempting to acquire the mutex.

+pub(crate) const TOKEN_RESTORE_PARKED_BIT: UnparkToken = UnparkToken(2);

/// This bit is set in the `state` of a `RawMutex` when that mutex is locked by some thread.
const LOCKED_BIT: u8 = 0b01;
/// This bit is set in the `state` of a `RawMutex` just before parking a thread. A thread is being
@@ -69,7 +72,7 @@ unsafe impl lock_api::RawMutex for RawMutex {
.compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
-            self.lock_slow(None);
+            self.lock_slow(None, false);
}
unsafe { deadlock::acquire_resource(self as *const _ as usize) };
}
@@ -99,11 +102,8 @@ unsafe impl lock_api::RawMutex for RawMutex {
#[inline]
unsafe fn unlock(&self) {
deadlock::release_resource(self as *const _ as usize);
-        if self
-            .state
-            .compare_exchange(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed)
-            .is_ok()
-        {
+        let prev = self.state.swap(0, Ordering::Release);
+        if prev == LOCKED_BIT {
return;
}
self.unlock_slow(false);
@@ -151,7 +151,7 @@ unsafe impl lock_api::RawMutexTimed for RawMutex {
{
true
} else {
-            self.lock_slow(Some(timeout))
+            self.lock_slow(Some(timeout), false)
};
if result {
unsafe { deadlock::acquire_resource(self as *const _ as usize) };
@@ -168,7 +168,7 @@ unsafe impl lock_api::RawMutexTimed for RawMutex {
{
true
} else {
-            self.lock_slow(util::to_deadline(timeout))
+            self.lock_slow(util::to_deadline(timeout), false)
};
if result {
unsafe { deadlock::acquire_resource(self as *const _ as usize) };
@@ -199,23 +199,27 @@ impl RawMutex {
}
}

-    // Used by Condvar when requeuing threads to us, must be called while
-    // holding the queue lock.
    #[inline]
-    pub(crate) fn mark_parked(&self) {
-        self.state.fetch_or(PARKED_BIT, Ordering::Relaxed);
+    pub(crate) fn lock_contention(&self) {
+        self.lock_slow(None, true);
}

#[cold]
-    fn lock_slow(&self, timeout: Option<Instant>) -> bool {
+    fn lock_slow(&self, timeout: Option<Instant>, in_contention: bool) -> bool {
Owner:

Suggested change:
-    fn lock_slow(&self, timeout: Option<Instant>, in_contention: bool) -> bool {
+    fn lock_slow(&self, timeout: Option<Instant>, set_parked_bit: bool) -> bool {

I think set_parked_bit is a clearer name for this.

let mut spinwait = SpinWait::new();
let mut state = self.state.load(Ordering::Relaxed);
+        let mut extra_flags;
+        if in_contention {
+            extra_flags = PARKED_BIT;
+        } else {
+            extra_flags = 0;
+        }
loop {
// Grab the lock if it isn't locked, even if there is a queue on it
if state & LOCKED_BIT == 0 {
match self.state.compare_exchange_weak(
state,
-                state | LOCKED_BIT,
+                state | LOCKED_BIT | extra_flags,
Ordering::Acquire,
Ordering::Relaxed,
) {
Owner:

The spin loop on line 233 should be disabled when set_parked_bit is true, since there are actually parked threads even though the bit isn't set.
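One way to apply this suggestion (a hypothetical sketch, not code from the PR) is to gate the spin on the new flag as well as on PARKED_BIT; the spin check near the top of lock_slow's loop would become something like:

```rust
// Hypothetical: skip spinning when extra_flags says threads are already
// parked (either the initial in_contention / set_parked_bit case, or after
// being unparked with TOKEN_RESTORE_PARKED_BIT), even though PARKED_BIT
// is clear in `state`.
if state & PARKED_BIT == 0 && extra_flags == 0 && spinwait.spin() {
    state = self.state.load(Ordering::Relaxed);
    continue;
}
```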

@@ -254,6 +258,7 @@
self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
}
};
+        extra_flags = 0;
// SAFETY:
// * `addr` is an address we control.
// * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
@@ -271,12 +276,16 @@
// The thread that unparked us passed the lock on to us
// directly without unlocking it.
ParkResult::Unparked(TOKEN_HANDOFF) => return true,
+                ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => extra_flags = PARKED_BIT,

// We were unparked normally, try acquiring the lock again
ParkResult::Unparked(_) => (),

-                // The validation function failed, try locking again
-                ParkResult::Invalid => (),
+                // This thread doesn't sleep, so it can't be sure whether it's the last
+                // thread in the queue. Setting PARKED_BIT can lead to false wake-ups,
+                // but false wake-ups are good for throughput during high contention.
+                ParkResult::Invalid => extra_flags = PARKED_BIT,
Owner:

I don't think this is correct: we should only set PARKED_BIT if we are required to by an UnparkToken or if the current thread is about to park. Otherwise this just causes unnecessary work.

What numbers do you get on the benchmark without this?

Author @BusyJay (May 26, 2025):

The point here is to ask for more wake-ups. During high contention, random wake-ups may keep more threads on the CPU. Because high contention means the lock is acquired and released very frequently, more on-CPU time means a higher chance of acquiring the lock. Leaving the CPU and then being scheduled back one by one is very slow; we should do that only when there is probably no way to make progress anytime soon.

This is also why I named the new arg in_contention instead of set_parked_bit: to highlight that the parked bit makes more sense as a signal of contention than of parking.

When the thread count is more than 9, the benchmark numbers can be 30%-40% lower without setting the bit.

Owner:

The main effect of setting the parked bit is that it prevents threads from spinning (since we only spin when the parked bit is clear). This causes threads to go directly to parking, which as you said is quite slow. However, since other threads are no longer actively trying to acquire the lock, one thread can quickly acquire and release the lock because there is no cache interference from other threads.

Although this may look good on benchmarks, it actually isn't good since other threads are wasting time doing work that isn't useful instead of attempting to acquire the lock. This is effectively equivalent to just pausing for a longer period between attempts to acquire the lock.

Author @BusyJay (May 27, 2025):

> This has the effect of causing threads to go directly to parking, which as you said is quite slow

Perf stats show that setting PARKED_BIT here leads to more context switches and a much higher cache-miss rate, which is evidence that more threads are staying on the CPU instead of going to sleep.

The reason PARKED_BIT wakes more threads is that during contention some thread can acquire the lock without competing at all. For example, suppose thread A holds the lock and threads B and C are waiting on it. When A releases the lock and wakes thread B, another thread D that is currently on the CPU may acquire the lock before B does. Thread D can behave in one of two ways: it acquires the lock directly, or it fails to try-lock and then also fails to park because validation fails. Setting the parked bit here exploits the second case, so that when D acquires the lock it will still wake thread C later.

> it actually isn't good since other threads are wasting time doing work that isn't useful instead of attempting to acquire the lock.

I noticed this performance pitfall when implementing a linked list protected by a mutex; the list is generally short but can occasionally grow very long. After this PR, there is no obvious performance difference between pthread and parking_lot.

Owner:

In that situation, it's not thread D's job to set the parked bit: thread B will set it before parking itself.

Author @BusyJay:

Thread B may not: it may still be spinning in its try-lock loop rather than parking.
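The linked-list benchmark mentioned in this thread is not included in the PR; a rough, hypothetical reconstruction of that kind of contended workload (all names and parameters are illustrative) might look like this:

```rust
use parking_lot::Mutex;
use std::collections::LinkedList;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

fn main() {
    // Many threads hammer one mutex-protected list. The list stays short
    // most of the time, but a thread occasionally does a long pass over it
    // while holding the lock, which is when waiters pile up and park.
    let list = Arc::new(Mutex::new(LinkedList::new()));
    let stop = Instant::now() + Duration::from_secs(2);
    let handles: Vec<_> = (0..16)
        .map(|_| {
            let list = Arc::clone(&list);
            thread::spawn(move || {
                let mut ops: u64 = 0;
                while Instant::now() < stop {
                    let mut guard = list.lock();
                    guard.push_back(ops);
                    if ops % 1024 == 0 {
                        // Occasional long critical section: drain the list.
                        while guard.pop_front().is_some() {}
                    } else if guard.len() > 8 {
                        guard.pop_front();
                    }
                    drop(guard);
                    ops += 1;
                }
                ops
            })
        })
        .collect();
    let total: u64 = handles.into_iter().map(|h| h.join().unwrap()).sum();
    println!("total ops across threads: {total}");
}
```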


// Timeout expired
ParkResult::TimedOut => return false,
@@ -296,7 +305,7 @@ impl RawMutex {
let callback = |result: UnparkResult| {
// If we are using a fair unlock then we should keep the
// mutex locked and hand it off to the unparked thread.
-            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
+            if result.unparked_threads != 0 && force_fair {
Owner:

The logic here is very different in the unlock_fair case, so it would be better to have a separate unlock_fair_slow method (see the sketch after this file's diff).

// Clear the parked bit if there are no more parked
// threads.
if !result.have_more_threads {
Expand All @@ -308,8 +317,12 @@ impl RawMutex {
// Clear the locked bit, and the parked bit as well if there
// are no more parked threads.
if result.have_more_threads {
-                self.state.store(PARKED_BIT, Ordering::Release);
-            } else {
+                if force_fair {
+                    self.state.store(PARKED_BIT, Ordering::Release);
+                } else {
+                    return TOKEN_RESTORE_PARKED_BIT;
+                }
+            } else if force_fair {
self.state.store(0, Ordering::Release);
}
TOKEN_NORMAL
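For reference, the split the Owner suggests might look roughly like this inside impl RawMutex, reusing the fair-unlock branch of the existing callback (a hypothetical sketch, not code from the PR; it assumes the surrounding state field, bit constants, and tokens):

```rust
// Hypothetical sketch: a dedicated cold path for `unlock_fair`, leaving
// `unlock_slow` with only the normal-unlock logic.
#[cold]
fn unlock_fair_slow(&self) {
    let addr = self as *const _ as usize;
    let callback = |result: UnparkResult| {
        if result.unparked_threads != 0 {
            // Keep the mutex locked and hand it off to the unparked thread.
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            return TOKEN_HANDOFF;
        }
        // No thread to hand off to: release the lock, keeping PARKED_BIT
        // only if other threads remain parked.
        let new_state = if result.have_more_threads { PARKED_BIT } else { 0 };
        self.state.store(new_state, Ordering::Release);
        TOKEN_NORMAL
    };
    // SAFETY: `addr` is an address we control; the callback neither panics
    // nor calls back into parking_lot.
    unsafe { parking_lot_core::unpark_one(addr, callback) };
}
```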
53 changes: 30 additions & 23 deletions src/raw_rwlock.rs
@@ -6,7 +6,7 @@
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
-use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
+use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL, TOKEN_RESTORE_PARKED_BIT};
use crate::util;
use core::{
cell::Cell,
@@ -93,11 +93,8 @@ unsafe impl lock_api::RawRwLock for RawRwLock {
#[inline]
unsafe fn unlock_exclusive(&self) {
self.deadlock_release();
-        if self
-            .state
-            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
-            .is_ok()
-        {
+        let prev = self.state.swap(0, Ordering::Release);
+        if prev == WRITER_BIT {
return;
}
self.unlock_exclusive_slow(false);
@@ -613,7 +610,7 @@ impl RawRwLock {

#[cold]
fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
-        let try_lock = |state: &mut usize| {
+        let try_lock = |state: &mut usize, extra_flags: usize| {
loop {
if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
return false;
@@ -622,7 +619,7 @@
// Grab WRITER_BIT if it isn't set, even if there are parked threads.
match self.state.compare_exchange_weak(
*state,
-                *state | WRITER_BIT,
+                *state | WRITER_BIT | extra_flags,
Ordering::Acquire,
Ordering::Relaxed,
) {
@@ -653,7 +650,7 @@
let callback = |mut new_state, result: UnparkResult| {
// If we are using a fair unlock then we should keep the
// rwlock locked and hand it off to the unparked threads.
-            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
+            if result.unparked_threads != 0 && force_fair {
if result.have_more_threads {
new_state |= PARKED_BIT;
}
@@ -662,8 +659,12 @@
} else {
// Clear the parked bit if there are no more parked threads.
if result.have_more_threads {
-                self.state.store(PARKED_BIT, Ordering::Release);
-            } else {
+                if force_fair {
+                    self.state.store(PARKED_BIT, Ordering::Release);
+                } else {
+                    return TOKEN_RESTORE_PARKED_BIT;
+                }
+            } else if force_fair {
self.state.store(0, Ordering::Release);
}
TOKEN_NORMAL
@@ -677,13 +678,13 @@

#[cold]
fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
-        let try_lock = |state: &mut usize| {
+        let try_lock = |state: &mut usize, extra_flags: usize| {
let mut spinwait_shared = SpinWait::new();
loop {
// Use hardware lock elision to avoid cache conflicts when multiple
// readers try to acquire the lock. We only do this if the lock is
// completely empty since elision handles conflicts poorly.
-            if have_elision() && *state == 0 {
+            if have_elision() && *state == 0 && extra_flags == 0 {
match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
Ok(_) => return true,
Err(x) => *state = x,
@@ -702,9 +703,10 @@
.state
.compare_exchange_weak(
*state,
-                    state
-                        .checked_add(ONE_READER)
-                        .expect("RwLock reader count overflow"),
+                    extra_flags
+                        | state
+                            .checked_add(ONE_READER)
+                            .expect("RwLock reader count overflow"),
Ordering::Acquire,
Ordering::Relaxed,
)
@@ -745,7 +747,7 @@

#[cold]
fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
-        let try_lock = |state: &mut usize| {
+        let try_lock = |state: &mut usize, extra_flags: usize| {
let mut spinwait_shared = SpinWait::new();
loop {
if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
Expand All @@ -756,9 +758,10 @@ impl RawRwLock {
.state
.compare_exchange_weak(
*state,
-                    state
-                        .checked_add(ONE_READER | UPGRADABLE_BIT)
-                        .expect("RwLock reader count overflow"),
+                    extra_flags
+                        | state
+                            .checked_add(ONE_READER | UPGRADABLE_BIT)
+                            .expect("RwLock reader count overflow"),
Ordering::Acquire,
Ordering::Relaxed,
)
@@ -1067,14 +1070,15 @@ impl RawRwLock {
&self,
timeout: Option<Instant>,
token: ParkToken,
-        mut try_lock: impl FnMut(&mut usize) -> bool,
+        mut try_lock: impl FnMut(&mut usize, usize) -> bool,
validate_flags: usize,
) -> bool {
let mut spinwait = SpinWait::new();
let mut state = self.state.load(Ordering::Relaxed);
+        let mut extra_flags = 0;
loop {
// Attempt to grab the lock
-            if try_lock(&mut state) {
+            if try_lock(&mut state, extra_flags) {
return true;
}

@@ -1118,16 +1122,19 @@
let park_result = unsafe {
parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
};
+        extra_flags = 0;
match park_result {
// The thread that unparked us passed the lock on to us
// directly without unlocking it.
ParkResult::Unparked(TOKEN_HANDOFF) => return true,
+            ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => extra_flags = PARKED_BIT,

// We were unparked normally, try acquiring the lock again
ParkResult::Unparked(_) => (),

-            // The validation function failed, try locking again
-            ParkResult::Invalid => (),
+            // See raw_mutex.rs for why PARKED_BIT is set here.
+            ParkResult::Invalid => extra_flags = PARKED_BIT,

// Timeout expired
ParkResult::TimedOut => return false,