From 58fde3b2786a723f80468650c86008caca757c66 Mon Sep 17 00:00:00 2001
From: Jay
Date: Thu, 8 May 2025 12:42:44 +0800
Subject: [PATCH] make unlock fast even under contention

This is an alternative, more aggressive implementation of idea #461.
Compared to #461, this PR

- maintains the parked bit on the waiter side, so that the waker does not
  have to perform a second atomic operation;
- resets the whole lock state back to 0 on unlock, which makes the fast
  lock path more likely to succeed under high contention;
- sets PARKED_BIT even when a waiter is prevented from sleeping, so that
  more threads can be woken up under contention to compete for progress
  (see the sketch below).
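In rough outline, the unlock side now works like the following minimal
sketch. This is a simplified, self-contained illustration, not the exact
parking_lot code: the standalone RawMutex type and the stubbed-out
unlock_slow() are placeholders, while LOCKED_BIT, PARKED_BIT and the
swap-based fast path mirror the real change in src/raw_mutex.rs below.

    use std::sync::atomic::{AtomicU8, Ordering};

    const LOCKED_BIT: u8 = 0b01;
    const PARKED_BIT: u8 = 0b10;

    struct RawMutex {
        state: AtomicU8,
    }

    impl RawMutex {
        fn unlock(&self) {
            // Reset the whole state to 0 unconditionally. The old fast path
            // used compare_exchange(LOCKED_BIT, 0, ..), which fails as soon
            // as PARKED_BIT is set, so every contended unlock fell through
            // to the slow path.
            let prev = self.state.swap(0, Ordering::Release);
            if prev == LOCKED_BIT {
                // No waiters: done with a single atomic operation.
                return;
            }
            // PARKED_BIT was set: wake a waiter and let it restore
            // PARKED_BIT itself (TOKEN_RESTORE_PARKED_BIT), instead of the
            // waker touching `state` a second time.
            self.unlock_slow();
        }

        fn unlock_slow(&self) {
            // Stub. The real code unparks one thread via parking_lot_core
            // and hands it TOKEN_RESTORE_PARKED_BIT when more threads
            // remain parked, as in the diff below.
        }
    }

Because unlock resets the state to 0 instead of leaving PARKED_BIT set, a
newly arriving thread can still take the fast compare-exchange path while
older waiters are being woken; a woken waiter then re-publishes PARKED_BIT
together with LOCKED_BIT when it acquires the lock.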
Signed-off-by: Jay
---
 src/condvar.rs    | 17 ++++++++-------
 src/raw_mutex.rs  | 49 +++++++++++++++++++++++++++----------------
 src/raw_rwlock.rs | 53 +++++++++++++++++++++++++++--------------------
 3 files changed, 70 insertions(+), 49 deletions(-)

diff --git a/src/condvar.rs b/src/condvar.rs
index 7818a14c..442ed640 100644
--- a/src/condvar.rs
+++ b/src/condvar.rs
@@ -6,8 +6,8 @@
 // copied, modified, or distributed except according to those terms.
 use crate::mutex::MutexGuard;
-use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL};
-use crate::{deadlock, util};
+use crate::raw_mutex::{RawMutex, TOKEN_HANDOFF, TOKEN_NORMAL, TOKEN_RESTORE_PARKED_BIT};
+use crate::util;
 use core::{
     fmt, ptr,
     sync::atomic::{AtomicPtr, Ordering},
 };
@@ -229,9 +229,10 @@ impl Condvar {
             // If we requeued threads to the mutex, mark it as having
             // parked threads. The RequeueAll case is already handled above.
             if op == RequeueOp::UnparkOneRequeueRest && result.requeued_threads != 0 {
-                unsafe { (*mutex).mark_parked() };
+                TOKEN_RESTORE_PARKED_BIT
+            } else {
+                TOKEN_NORMAL
             }
-            TOKEN_NORMAL
         };
 
         let res = unsafe { parking_lot_core::unpark_requeue(from, to, validate, callback) };
@@ -350,10 +351,10 @@ impl Condvar {
         }
 
         // ... and re-lock it once we are done sleeping
-        if result == ParkResult::Unparked(TOKEN_HANDOFF) {
-            unsafe { deadlock::acquire_resource(mutex as *const _ as usize) };
-        } else {
-            mutex.lock();
+        match result {
+            ParkResult::Unparked(TOKEN_HANDOFF) => unreachable!("can't be handed off"),
+            ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => mutex.lock_contention(),
+            _ => mutex.lock(),
         }
 
         WaitTimeoutResult(!(result.is_unparked() || requeued))
diff --git a/src/raw_mutex.rs b/src/raw_mutex.rs
index b1ae7ee8..29233d52 100644
--- a/src/raw_mutex.rs
+++ b/src/raw_mutex.rs
@@ -22,6 +22,9 @@ pub(crate) const TOKEN_NORMAL: UnparkToken = UnparkToken(0);
 // thread directly without unlocking it.
 pub(crate) const TOKEN_HANDOFF: UnparkToken = UnparkToken(1);
 
+// UnparkToken used to indicate that the waiter should restore PARKED_BIT.
+pub(crate) const TOKEN_RESTORE_PARKED_BIT: UnparkToken = UnparkToken(2);
+
 /// This bit is set in the `state` of a `RawMutex` when that mutex is locked by some thread.
 const LOCKED_BIT: u8 = 0b01;
 /// This bit is set in the `state` of a `RawMutex` just before parking a thread. A thread is being
@@ -69,7 +72,7 @@ unsafe impl lock_api::RawMutex for RawMutex {
             .compare_exchange_weak(0, LOCKED_BIT, Ordering::Acquire, Ordering::Relaxed)
             .is_err()
         {
-            self.lock_slow(None);
+            self.lock_slow(None, false);
         }
         unsafe { deadlock::acquire_resource(self as *const _ as usize) };
     }
@@ -99,11 +102,8 @@ unsafe impl lock_api::RawMutex for RawMutex {
     #[inline]
     unsafe fn unlock(&self) {
         deadlock::release_resource(self as *const _ as usize);
-        if self
-            .state
-            .compare_exchange(LOCKED_BIT, 0, Ordering::Release, Ordering::Relaxed)
-            .is_ok()
-        {
+        let prev = self.state.swap(0, Ordering::Release);
+        if prev == LOCKED_BIT {
             return;
         }
         self.unlock_slow(false);
@@ -151,7 +151,7 @@ unsafe impl lock_api::RawMutexTimed for RawMutex {
         {
             true
         } else {
-            self.lock_slow(Some(timeout))
+            self.lock_slow(Some(timeout), false)
         };
         if result {
             unsafe { deadlock::acquire_resource(self as *const _ as usize) };
@@ -168,7 +168,7 @@ unsafe impl lock_api::RawMutexTimed for RawMutex {
         {
             true
         } else {
-            self.lock_slow(util::to_deadline(timeout))
+            self.lock_slow(util::to_deadline(timeout), false)
         };
         if result {
             unsafe { deadlock::acquire_resource(self as *const _ as usize) };
@@ -199,23 +199,27 @@ impl RawMutex {
         }
     }
 
-    // Used by Condvar when requeuing threads to us, must be called while
-    // holding the queue lock.
     #[inline]
-    pub(crate) fn mark_parked(&self) {
-        self.state.fetch_or(PARKED_BIT, Ordering::Relaxed);
+    pub(crate) fn lock_contention(&self) {
+        self.lock_slow(None, true);
     }
 
     #[cold]
-    fn lock_slow(&self, timeout: Option<Instant>) -> bool {
+    fn lock_slow(&self, timeout: Option<Instant>, in_contention: bool) -> bool {
         let mut spinwait = SpinWait::new();
         let mut state = self.state.load(Ordering::Relaxed);
+        let mut extra_flags;
+        if in_contention {
+            extra_flags = PARKED_BIT;
+        } else {
+            extra_flags = 0;
+        }
         loop {
             // Grab the lock if it isn't locked, even if there is a queue on it
             if state & LOCKED_BIT == 0 {
                 match self.state.compare_exchange_weak(
                     state,
-                    state | LOCKED_BIT,
+                    state | LOCKED_BIT | extra_flags,
                     Ordering::Acquire,
                     Ordering::Relaxed,
                 ) {
@@ -254,6 +258,7 @@ impl RawMutex {
                 self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
             }
         };
+        extra_flags = 0;
         // SAFETY:
         //   * `addr` is an address we control.
         //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
@@ -271,12 +276,16 @@ impl RawMutex {
                 // The thread that unparked us passed the lock on to us
                 // directly without unlocking it.
                 ParkResult::Unparked(TOKEN_HANDOFF) => return true,
+                ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => extra_flags = PARKED_BIT,
 
                 // We were unparked normally, try acquiring the lock again
                 ParkResult::Unparked(_) => (),
 
                 // The validation function failed, try locking again
-                ParkResult::Invalid => (),
+                // This thread didn't sleep, so it can't know whether it was the
+                // last thread in the queue. Setting PARKED_BIT may cause spurious
+                // wakeups, but spurious wakeups help throughput under high contention.
+                ParkResult::Invalid => extra_flags = PARKED_BIT,
 
                 // Timeout expired
                 ParkResult::TimedOut => return false,
@@ -296,7 +305,7 @@ impl RawMutex {
         let callback = |result: UnparkResult| {
             // If we are using a fair unlock then we should keep the
             // mutex locked and hand it off to the unparked thread.
-            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
+            if result.unparked_threads != 0 && force_fair {
                 // Clear the parked bit if there are no more parked
                 // threads.
                 if !result.have_more_threads {
@@ -308,8 +317,12 @@ impl RawMutex {
             // Clear the locked bit, and the parked bit as well if there
             // are no more parked threads.
             if result.have_more_threads {
-                self.state.store(PARKED_BIT, Ordering::Release);
-            } else {
+                if force_fair {
+                    self.state.store(PARKED_BIT, Ordering::Release);
+                } else {
+                    return TOKEN_RESTORE_PARKED_BIT;
+                }
+            } else if force_fair {
                 self.state.store(0, Ordering::Release);
             }
             TOKEN_NORMAL
diff --git a/src/raw_rwlock.rs b/src/raw_rwlock.rs
index 599889b1..63b0db24 100644
--- a/src/raw_rwlock.rs
+++ b/src/raw_rwlock.rs
@@ -6,7 +6,7 @@
 // copied, modified, or distributed except according to those terms.
 
 use crate::elision::{have_elision, AtomicElisionExt};
-use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
+use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL, TOKEN_RESTORE_PARKED_BIT};
 use crate::util;
 use core::{
     cell::Cell,
@@ -93,11 +93,8 @@ unsafe impl lock_api::RawRwLock for RawRwLock {
     #[inline]
     unsafe fn unlock_exclusive(&self) {
         self.deadlock_release();
-        if self
-            .state
-            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
-            .is_ok()
-        {
+        let prev = self.state.swap(0, Ordering::Release);
+        if prev == WRITER_BIT {
             return;
         }
         self.unlock_exclusive_slow(false);
@@ -613,7 +610,7 @@ impl RawRwLock {
     #[cold]
     fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
-        let try_lock = |state: &mut usize| {
+        let try_lock = |state: &mut usize, extra_flags: usize| {
             loop {
                 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                     return false;
                 }
@@ -622,7 +619,7 @@ impl RawRwLock {
             // Grab WRITER_BIT if it isn't set, even if there are parked threads.
             match self.state.compare_exchange_weak(
                 *state,
-                *state | WRITER_BIT,
+                *state | WRITER_BIT | extra_flags,
                 Ordering::Acquire,
                 Ordering::Relaxed,
             ) {
@@ -653,7 +650,7 @@ impl RawRwLock {
         let callback = |mut new_state, result: UnparkResult| {
             // If we are using a fair unlock then we should keep the
             // rwlock locked and hand it off to the unparked threads.
-            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
+            if result.unparked_threads != 0 && force_fair {
                 if result.have_more_threads {
                     new_state |= PARKED_BIT;
                 }
@@ -662,8 +659,12 @@ impl RawRwLock {
             } else {
                 // Clear the parked bit if there are no more parked threads.
                 if result.have_more_threads {
-                    self.state.store(PARKED_BIT, Ordering::Release);
-                } else {
+                    if force_fair {
+                        self.state.store(PARKED_BIT, Ordering::Release);
+                    } else {
+                        return TOKEN_RESTORE_PARKED_BIT;
+                    }
+                } else if force_fair {
                     self.state.store(0, Ordering::Release);
                 }
                 TOKEN_NORMAL
@@ -677,13 +678,13 @@ impl RawRwLock {
     #[cold]
     fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
-        let try_lock = |state: &mut usize| {
+        let try_lock = |state: &mut usize, extra_flags: usize| {
             let mut spinwait_shared = SpinWait::new();
             loop {
                 // Use hardware lock elision to avoid cache conflicts when multiple
                 // readers try to acquire the lock. We only do this if the lock is
                 // completely empty since elision handles conflicts poorly.
-                if have_elision() && *state == 0 {
+                if have_elision() && *state == 0 && extra_flags == 0 {
                     match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                         Ok(_) => return true,
                         Err(x) => *state = x,
                     }
@@ -702,9 +703,10 @@ impl RawRwLock {
                     .state
                     .compare_exchange_weak(
                         *state,
-                        state
-                            .checked_add(ONE_READER)
-                            .expect("RwLock reader count overflow"),
+                        extra_flags
+                            | state
+                                .checked_add(ONE_READER)
+                                .expect("RwLock reader count overflow"),
                         Ordering::Acquire,
                         Ordering::Relaxed,
                     )
@@ -745,7 +747,7 @@ impl RawRwLock {
 
     #[cold]
     fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
-        let try_lock = |state: &mut usize| {
+        let try_lock = |state: &mut usize, extra_flags: usize| {
             let mut spinwait_shared = SpinWait::new();
             loop {
                 if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
@@ -756,9 +758,10 @@ impl RawRwLock {
                     .state
                     .compare_exchange_weak(
                         *state,
-                        state
-                            .checked_add(ONE_READER | UPGRADABLE_BIT)
-                            .expect("RwLock reader count overflow"),
+                        extra_flags
+                            | state
+                                .checked_add(ONE_READER | UPGRADABLE_BIT)
+                                .expect("RwLock reader count overflow"),
                         Ordering::Acquire,
                         Ordering::Relaxed,
                     )
@@ -1067,14 +1070,15 @@ impl RawRwLock {
         &self,
         timeout: Option<Instant>,
         token: ParkToken,
-        mut try_lock: impl FnMut(&mut usize) -> bool,
+        mut try_lock: impl FnMut(&mut usize, usize) -> bool,
         validate_flags: usize,
     ) -> bool {
         let mut spinwait = SpinWait::new();
         let mut state = self.state.load(Ordering::Relaxed);
+        let mut extra_flags = 0;
         loop {
             // Attempt to grab the lock
-            if try_lock(&mut state) {
+            if try_lock(&mut state, extra_flags) {
                 return true;
             }
 
@@ -1118,16 +1122,19 @@ impl RawRwLock {
             let park_result = unsafe {
                 parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
             };
+            extra_flags = 0;
             match park_result {
                 // The thread that unparked us passed the lock on to us
                 // directly without unlocking it.
                 ParkResult::Unparked(TOKEN_HANDOFF) => return true,
+                ParkResult::Unparked(TOKEN_RESTORE_PARKED_BIT) => extra_flags = PARKED_BIT,
 
                 // We were unparked normally, try acquiring the lock again
                 ParkResult::Unparked(_) => (),
 
                 // The validation function failed, try locking again
-                ParkResult::Invalid => (),
+                // See raw_mutex.rs for why PARKED_BIT is set here.
+                ParkResult::Invalid => extra_flags = PARKED_BIT,
 
                 // Timeout expired
                 ParkResult::TimedOut => return false,