From a28b1825459d11bd9c6d765abb513c18fed4871c Mon Sep 17 00:00:00 2001 From: Martin Habovstiak Date: Wed, 10 Feb 2021 14:22:23 +0100 Subject: [PATCH 1/2] Track more safety invariants in the type system This change creates several wrappers around `unsafe` operations to better track the safety invariants. While not perfect, which should reduce the number of footguns in the code. It focuses on locking - using primitives like mutex guards known from `std`. The locked state is tracked in those guards. This crate needs some special features however. It needs to lock all locks in a slice and unlock them in the same order. Similarly it needs to lock a queue inside a `Bucket` while keeping reference to the bucket. Additional specialized wrappers maintain safety of these advanced operations similarly to guards. They are implemented in their own modules to avoid access from other parts of code. This change comes with two more side effects: * `Cell` was removed from the bucket as now locking provides required safety. * Bucket pair uses `Option` to represent same buckets. This adds a slight performance penalty but it's in slow path and may be optimized out by the compiler. It's even possible that absence of `Cell` will enable even more optimizations. --- core/src/parking_lot.rs | 443 +++++++++++++++++++++++++++------------- 1 file changed, 299 insertions(+), 144 deletions(-) diff --git a/core/src/parking_lot.rs b/core/src/parking_lot.rs index 519ce9e3..e00b8cd8 100644 --- a/core/src/parking_lot.rs +++ b/core/src/parking_lot.rs @@ -6,7 +6,6 @@ // copied, modified, or distributed except according to those terms. use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT}; use crate::util::UncheckedOptionExt; -use crate::word_lock::WordLock; use core::{ cell::{Cell, UnsafeCell}, ptr, @@ -16,6 +15,134 @@ use instant::Instant; use smallvec::SmallVec; use std::time::Duration; +use safe_lock::SafeLock; +use safe_lock::Guard as LockGuard; +use locked_bucket::LockedBucket; + +mod safe_lock { + use crate::word_lock::WordLock; + use core::cell:: UnsafeCell; + + pub(crate) struct SafeLock { + lock: WordLock, + data: UnsafeCell, + } + + impl SafeLock { + pub fn new(value: T) -> Self { + SafeLock { + lock: WordLock::new(), + data: UnsafeCell::new(value), + } + } + + pub fn get_mut(&mut self) -> &mut T { + // get_mut on UnsafeCell requires nightly + unsafe { + &mut *self.data.get() + } + } + + pub fn lock(&self) -> Guard<'_, T> { + unsafe { + self.lock.lock(); + self.assume_locked() + } + } + + pub unsafe fn assume_locked(&self) -> Guard<'_, T> { + Guard { + lock: self, + _mark_not_send: Default::default(), + } + } + } + + pub(crate) struct Guard<'a, T> { + lock: &'a SafeLock, + _mark_not_send: core::marker::PhantomData<*mut T>, + } + + impl<'a, T: Sync> Guard<'a, T> { + pub fn forget(this: Self) -> &'a mut T { + unsafe { + let data = &mut *this.lock.data.get(); + core::mem::forget(this); + data + } + } + } + + // Sync is safe because the guard doesn't do any modification on internal lock data + // using shared reference - same logic as in std + unsafe impl<'a, T: Sync> Sync for Guard<'a, T> {} + + impl<'a, T: Sync> core::ops::Deref for Guard<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + unsafe { + &*self.lock.data.get() + } + } + } + + impl<'a, T: Sync> core::ops::DerefMut for Guard<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + unsafe { + &mut *self.lock.data.get() + } + } + } + + impl<'a, T> Drop for Guard<'a, T> { + fn drop(&mut self) { + unsafe { + self.lock.lock.unlock() + } + } + } +} + +mod locked_hash_table { + use super::{HashTable, LockGuard}; + + pub(crate) struct LockedHashTable<'a> { + hash_table: &'a HashTable, + _mark_not_send: core::marker::PhantomData<*const HashTable>, + } + + impl<'a> LockedHashTable<'a> { + pub(crate) fn new(hash_table: &'a HashTable) -> Self { + for bucket in &hash_table.entries[..] { + LockGuard::forget(bucket.queue.lock()); + } + + LockedHashTable { + hash_table, + _mark_not_send: Default::default(), + } + } + + pub(crate) fn table(&self) -> &'a HashTable { + self.hash_table + } + + pub(crate) fn queues_mut(&mut self) -> impl Iterator { + self.hash_table.entries.iter().map(|bucket| unsafe { LockGuard::forget(bucket.queue.assume_locked()) }) + } + } + + impl<'a> Drop for LockedHashTable<'a> { + fn drop(&mut self) { + for bucket in &self.hash_table.entries[..] { + // SAFETY: We hold the lock here, as required + unsafe { bucket.queue.assume_locked() }; + } + } + } +} + static NUM_THREADS: AtomicUsize = AtomicUsize::new(0); /// Holds the pointer to the currently active `HashTable`. @@ -30,7 +157,7 @@ static HASHTABLE: AtomicPtr = AtomicPtr::new(ptr::null_mut()); // still only a few hundred bytes per thread. const LOAD_FACTOR: usize = 3; -struct HashTable { +pub(crate) struct HashTable { // Hash buckets for the table entries: Box<[Bucket]>, @@ -60,16 +187,79 @@ impl HashTable { _prev: prev, }) } + + fn lock_all_queues(&self) -> locked_hash_table::LockedHashTable<'_> { + locked_hash_table::LockedHashTable::new(self) + } } -#[repr(align(64))] -struct Bucket { - // Lock protecting the queue - mutex: WordLock, +pub(crate) struct Queue { + head: *const ThreadData, + tail: *const ThreadData, +} + +// This is not actually safe yet, the Queue operations must be protected by module boundary to be +// safe. Should be fine for a while. +unsafe impl Sync for Queue {} + +impl Queue { + fn new() -> Self { + Queue { + head: ptr::null(), + tail: ptr::null(), + } + } +} + +mod locked_bucket { + use super::{Bucket, Queue, LockGuard}; + + pub(crate) struct LockedBucket<'a> { + bucket: &'a Bucket, + _mark_not_send: core::marker::PhantomData<*const Bucket>, + } + + impl<'a> LockedBucket<'a> { + pub fn new(bucket: &'a Bucket) -> Self { + LockGuard::forget(bucket.queue.lock()); + LockedBucket { + bucket, + _mark_not_send: Default::default(), + } + } + + pub fn bucket(&self) -> &'a Bucket { + self.bucket + } + + pub fn queue(&mut self) -> &mut Queue { + unsafe { + LockGuard::forget(self.bucket.queue.assume_locked()) + } + } + + pub fn into_queue_guard(self) -> LockGuard<'a, Queue> { + unsafe { + let guard = self.bucket.queue.assume_locked(); + core::mem::forget(self); + guard + } + } + } + + impl<'a> Drop for LockedBucket<'a> { + fn drop(&mut self) { + unsafe { + self.bucket.queue.assume_locked(); + } + } + } +} +#[repr(align(64))] +pub(crate) struct Bucket { // Linked list of threads waiting on this bucket - queue_head: Cell<*const ThreadData>, - queue_tail: Cell<*const ThreadData>, + queue: SafeLock, // Next time at which point be_fair should be set fair_timeout: UnsafeCell, @@ -79,12 +269,14 @@ impl Bucket { #[inline] pub fn new(timeout: Instant, seed: u32) -> Self { Self { - mutex: WordLock::new(), - queue_head: Cell::new(ptr::null()), - queue_tail: Cell::new(ptr::null()), + queue: SafeLock::new(Queue::new()), fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)), } } + + fn lock(&self) -> LockedBucket<'_> { + LockedBucket::new(self) + } } struct FairTimeout { @@ -239,7 +431,7 @@ fn create_hashtable() -> &'static HashTable { // created, which only happens once per thread. fn grow_hashtable(num_threads: usize) { // Lock all buckets in the existing table and get a reference to it - let old_table = loop { + let mut old_table = loop { let table = get_hashtable(); // Check if we need to resize the existing table @@ -247,46 +439,31 @@ fn grow_hashtable(num_threads: usize) { return; } - // Lock all buckets in the old table - for bucket in &table.entries[..] { - bucket.mutex.lock(); - } + let locked = table.lock_all_queues(); // Now check if our table is still the latest one. Another thread could // have grown the hash table between us reading HASHTABLE and locking // the buckets. if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ { - break table; - } - - // Unlock buckets and try again - for bucket in &table.entries[..] { - // SAFETY: We hold the lock here, as required - unsafe { bucket.mutex.unlock() }; + break locked; } }; // Create the new table - let mut new_table = HashTable::new(num_threads, old_table); + let mut new_table = HashTable::new(num_threads, old_table.table()); // Move the entries from the old table to the new one - for bucket in &old_table.entries[..] { + for queue in old_table.queues_mut() { // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked // lists. All `ThreadData` instances in these lists will remain valid as long as they are // present in the lists, meaning as long as their threads are parked. - unsafe { rehash_bucket_into(bucket, &mut new_table) }; + unsafe { rehash_bucket_into(queue, &mut new_table) }; } // Publish the new table. No races are possible at this point because // any other thread trying to grow the hash table is blocked on the bucket // locks in the old table. HASHTABLE.store(Box::into_raw(new_table), Ordering::Release); - - // Unlock all buckets in the old table - for bucket in &old_table.entries[..] { - // SAFETY: We hold the lock here, as required - unsafe { bucket.mutex.unlock() }; - } } /// Iterate through all `ThreadData` objects in the bucket and insert them into the given table @@ -298,19 +475,20 @@ fn grow_hashtable(num_threads: usize) { /// `ThreadData` instances that must stay valid at least as long as the given `table` is in use. /// /// The given `table` must only contain buckets with correctly constructed linked lists. -unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) { - let mut current: *const ThreadData = bucket.queue_head.get(); +unsafe fn rehash_bucket_into(queue: &mut Queue, table: &mut HashTable) { + let mut current: *const ThreadData = queue.head; while !current.is_null() { + // This is the unsafety unsafe impl Sync for Queue talks about let next = (*current).next_in_queue.get(); let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits); - if table.entries[hash].queue_tail.get().is_null() { - table.entries[hash].queue_head.set(current); + if table.entries[hash].queue.get_mut().tail.is_null() { + table.entries[hash].queue.get_mut().head = current; } else { - (*table.entries[hash].queue_tail.get()) + (*table.entries[hash].queue.get_mut().tail) .next_in_queue .set(current); } - table.entries[hash].queue_tail.set(current); + table.entries[hash].queue.get_mut().tail = current; (*current).next_in_queue.set(ptr::null()); current = next; } @@ -331,7 +509,7 @@ fn hash(key: usize, bits: u32) -> usize { /// Locks the bucket for the given key and returns a reference to it. /// The returned bucket must be unlocked again in order to not cause deadlocks. #[inline] -fn lock_bucket(key: usize) -> &'static Bucket { +fn lock_bucket(key: usize) -> LockedBucket<'static> { loop { let hashtable = get_hashtable(); @@ -339,25 +517,26 @@ fn lock_bucket(key: usize) -> &'static Bucket { let bucket = &hashtable.entries[hash]; // Lock the bucket - bucket.mutex.lock(); + let guard= bucket.lock(); // If no other thread has rehashed the table before we grabbed the lock // then we are good to go! The lock we grabbed prevents any rehashes. if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { - return bucket; + return guard; } - - // Unlock the bucket and try again - // SAFETY: We hold the lock here, as required - unsafe { bucket.mutex.unlock() }; } } +#[inline] +fn lock_queue(key: usize) -> LockGuard<'static, Queue> { + lock_bucket(key).into_queue_guard() +} + /// Locks the bucket for the given key and returns a reference to it. But checks that the key /// hasn't been changed in the meantime due to a requeue. /// The returned bucket must be unlocked again in order to not cause deadlocks. #[inline] -fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) { +fn lock_bucket_checked(key: &AtomicUsize) -> (usize, LockedBucket<'static>) { loop { let hashtable = get_hashtable(); let current_key = key.load(Ordering::Relaxed); @@ -366,7 +545,7 @@ fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) { let bucket = &hashtable.entries[hash]; // Lock the bucket - bucket.mutex.lock(); + let guard = bucket.lock(); // Check that both the hash table and key are correct while the bucket // is locked. Note that the key can't change once we locked the proper @@ -374,22 +553,25 @@ fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) { if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ && key.load(Ordering::Relaxed) == current_key { - return (current_key, bucket); + return (current_key, guard); } - - // Unlock the bucket and try again - // SAFETY: We hold the lock here, as required - unsafe { bucket.mutex.unlock() }; } } +#[inline] +fn lock_queue_checked(key: &AtomicUsize) -> (usize, LockGuard<'static, Queue>) { + let (key, guard) = lock_bucket_checked(key); + (key, guard.into_queue_guard()) +} + /// Locks the two buckets for the given pair of keys and returns references to them. /// The returned buckets must be unlocked again in order to not cause deadlocks. /// -/// If both keys hash to the same value, both returned references will be to the same bucket. Be -/// careful to only unlock it once in this case, always use `unlock_bucket_pair`. +/// If both keys hash to the same value, the second bucket will be `None`. +/// +/// To avoid deadlocks the buckets have to be unlocked (dropped) from left to right. #[inline] -fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) { +fn lock_bucket_pair(key1: usize, key2: usize) -> (LockedBucket<'static>, Option>) { loop { let hashtable = get_hashtable(); @@ -404,41 +586,24 @@ fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Buck }; // Lock the first bucket - bucket1.mutex.lock(); + let guard1 = bucket1.lock(); // If no other thread has rehashed the table before we grabbed the lock // then we are good to go! The lock we grabbed prevents any rehashes. if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ { // Now lock the second bucket and return the two buckets if hash1 == hash2 { - return (bucket1, bucket1); + return (guard1, None); } else if hash1 < hash2 { let bucket2 = &hashtable.entries[hash2]; - bucket2.mutex.lock(); - return (bucket1, bucket2); + let guard2 = bucket2.lock(); + return (guard1, Some(guard2)); } else { let bucket2 = &hashtable.entries[hash1]; - bucket2.mutex.lock(); - return (bucket2, bucket1); + let guard2 = bucket2.lock(); + return (guard2, Some(guard1)); } } - - // Unlock the bucket and try again - // SAFETY: We hold the lock here, as required - unsafe { bucket1.mutex.unlock() }; - } -} - -/// Unlock a pair of buckets -/// -/// # Safety -/// -/// Both buckets must be locked -#[inline] -unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) { - bucket1.mutex.unlock(); - if !ptr::eq(bucket1, bucket2) { - bucket2.mutex.unlock(); } } @@ -575,12 +740,10 @@ pub unsafe fn park( // Grab our thread data, this also ensures that the hash table exists with_thread_data(|thread_data| { // Lock the bucket for the given key - let bucket = lock_bucket(key); + let mut queue = lock_queue(key); // If the validation function fails, just return if !validate() { - // SAFETY: We hold the lock here, as required - bucket.mutex.unlock(); return ParkResult::Invalid; } @@ -590,14 +753,13 @@ pub unsafe fn park( thread_data.key.store(key, Ordering::Relaxed); thread_data.park_token.set(park_token); thread_data.parker.prepare_park(); - if !bucket.queue_head.get().is_null() { - (*bucket.queue_tail.get()).next_in_queue.set(thread_data); + if !queue.head.is_null() { + (*queue.tail).next_in_queue.set(thread_data); } else { - bucket.queue_head.set(thread_data); + queue.head = thread_data; } - bucket.queue_tail.set(thread_data); - // SAFETY: We hold the lock here, as required - bucket.mutex.unlock(); + queue.tail = thread_data; + drop(queue); // Invoke the pre-sleep callback before_sleep(); @@ -622,27 +784,25 @@ pub unsafe fn park( // Lock our bucket again. Note that the hashtable may have been rehashed in // the meantime. Our key may also have changed if we were requeued. - let (key, bucket) = lock_bucket_checked(&thread_data.key); + let (key, mut queue) = lock_queue_checked(&thread_data.key); // Now we need to check again if we were unparked or timed out. Unlike the // last check this is precise because we hold the bucket lock. if !thread_data.parker.timed_out() { - // SAFETY: We hold the lock here, as required - bucket.mutex.unlock(); return ParkResult::Unparked(thread_data.unpark_token.get()); } // We timed out, so we now need to remove our thread from the queue - let mut link = &bucket.queue_head; - let mut current = bucket.queue_head.get(); + let mut current = queue.head; + let mut link = Cell::from_mut(&mut queue.head); let mut previous = ptr::null(); let mut was_last_thread = true; while !current.is_null() { if current == thread_data { let next = (*current).next_in_queue.get(); link.set(next); - if bucket.queue_tail.get() == current { - bucket.queue_tail.set(previous); + if queue.tail == current { + queue.tail = previous; } else { // Scan the rest of the queue to see if there are any other // entries with the given key. @@ -674,9 +834,6 @@ pub unsafe fn park( // if we timed out. debug_assert!(!current.is_null()); - // Unlock the bucket, we are done - // SAFETY: We hold the lock here, as required - bucket.mutex.unlock(); ParkResult::TimedOut }) } @@ -706,11 +863,11 @@ pub unsafe fn unpark_one( callback: impl FnOnce(UnparkResult) -> UnparkToken, ) -> UnparkResult { // Lock the bucket for the given key - let bucket = lock_bucket(key); + let mut bucket = lock_bucket(key); // Find a thread with a matching key and remove it from the queue - let mut link = &bucket.queue_head; - let mut current = bucket.queue_head.get(); + let mut current = bucket.queue().head; + let mut link = Cell::from_mut(&mut bucket.queue().head); let mut previous = ptr::null(); let mut result = UnparkResult::default(); while !current.is_null() { @@ -718,8 +875,8 @@ pub unsafe fn unpark_one( // Remove the thread from the queue let next = (*current).next_in_queue.get(); link.set(next); - if bucket.queue_tail.get() == current { - bucket.queue_tail.set(previous); + if bucket.queue().tail == current { + bucket.queue().tail = previous; } else { // Scan the rest of the queue to see if there are any other // entries with the given key. @@ -735,7 +892,7 @@ pub unsafe fn unpark_one( // Invoke the callback before waking up the thread result.unparked_threads = 1; - result.be_fair = (*bucket.fair_timeout.get()).should_timeout(); + result.be_fair = (*bucket.bucket().fair_timeout.get()).should_timeout(); let token = callback(result); // Set the token for the target thread @@ -747,8 +904,7 @@ pub unsafe fn unpark_one( // the queue locked while we perform a system call. Finally we wake // up the parked thread. let handle = (*current).parker.unpark_lock(); - // SAFETY: We hold the lock here, as required - bucket.mutex.unlock(); + drop(bucket); handle.unpark(); return result; @@ -761,8 +917,6 @@ pub unsafe fn unpark_one( // No threads with a matching key were found in the bucket callback(result); - // SAFETY: We hold the lock here, as required - bucket.mutex.unlock(); result } @@ -780,11 +934,13 @@ pub unsafe fn unpark_one( #[inline] pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize { // Lock the bucket for the given key - let bucket = lock_bucket(key); + let mut queue = lock_queue(key); + // borrow checker needs a little help here + let queue_mut = &mut *queue; // Remove all threads with the given key in the bucket - let mut link = &bucket.queue_head; - let mut current = bucket.queue_head.get(); + let mut current = queue_mut.head; + let mut link = Cell::from_mut(&mut queue_mut.head); let mut previous = ptr::null(); let mut threads = SmallVec::<[_; 8]>::new(); while !current.is_null() { @@ -792,8 +948,8 @@ pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize { // Remove the thread from the queue let next = (*current).next_in_queue.get(); link.set(next); - if bucket.queue_tail.get() == current { - bucket.queue_tail.set(previous); + if queue_mut.tail == current { + queue_mut.tail = previous; } // Set the token for the target thread @@ -811,9 +967,7 @@ pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize { } } - // Unlock the bucket - // SAFETY: We hold the lock here, as required - bucket.mutex.unlock(); + drop(queue); // Now that we are outside the lock, wake up all the threads that we removed // from the queue. @@ -860,20 +1014,23 @@ pub unsafe fn unpark_requeue( callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken, ) -> UnparkResult { // Lock the two buckets for the given key - let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to); + let (mut bucket_from, mut bucket_to) = lock_bucket_pair(key_from, key_to); // If the validation function fails, just return let mut result = UnparkResult::default(); let op = validate(); if op == RequeueOp::Abort { - // SAFETY: Both buckets are locked, as required. - unlock_bucket_pair(bucket_from, bucket_to); + drop(bucket_from); + drop(bucket_to); return result; } + // borrow checker needs a little help here + let bucket_from_queue = &mut bucket_from.queue(); + // Remove all threads with the given key in the source bucket - let mut link = &bucket_from.queue_head; - let mut current = bucket_from.queue_head.get(); + let mut current = bucket_from_queue.head; + let mut link = Cell::from_mut(&mut bucket_from_queue.head); let mut previous = ptr::null(); let mut requeue_threads: *const ThreadData = ptr::null(); let mut requeue_threads_tail: *const ThreadData = ptr::null(); @@ -883,8 +1040,8 @@ pub unsafe fn unpark_requeue( // Remove the thread from the queue let next = (*current).next_in_queue.get(); link.set(next); - if bucket_from.queue_tail.get() == current { - bucket_from.queue_tail.set(previous); + if bucket_from_queue.tail == current { + bucket_from_queue.tail = previous; } // Prepare the first thread for wakeup and requeue the rest. @@ -927,19 +1084,19 @@ pub unsafe fn unpark_requeue( // Add the requeued threads to the destination bucket if !requeue_threads.is_null() { (*requeue_threads_tail).next_in_queue.set(ptr::null()); - if !bucket_to.queue_head.get().is_null() { - (*bucket_to.queue_tail.get()) + if !bucket_to.as_mut().unwrap_or(&mut bucket_from).queue().head.is_null() { + (*bucket_to.as_mut().unwrap_or(&mut bucket_from).queue().tail) .next_in_queue .set(requeue_threads); } else { - bucket_to.queue_head.set(requeue_threads); + bucket_to.as_mut().unwrap_or(&mut bucket_from).queue().head = requeue_threads; } - bucket_to.queue_tail.set(requeue_threads_tail); + bucket_to.as_mut().unwrap_or(&mut bucket_from).queue().tail = requeue_threads_tail; } // Invoke the callback before waking up the thread if result.unparked_threads != 0 { - result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout(); + result.be_fair = (*bucket_from.bucket().fair_timeout.get()).should_timeout(); } let token = callback(op, result); @@ -947,12 +1104,12 @@ pub unsafe fn unpark_requeue( if let Some(wakeup_thread) = wakeup_thread { (*wakeup_thread).unpark_token.set(token); let handle = (*wakeup_thread).parker.unpark_lock(); - // SAFETY: Both buckets are locked, as required. - unlock_bucket_pair(bucket_from, bucket_to); + drop(bucket_from); + drop(bucket_to); handle.unpark(); } else { - // SAFETY: Both buckets are locked, as required. - unlock_bucket_pair(bucket_from, bucket_to); + drop(bucket_from); + drop(bucket_to); } result @@ -991,11 +1148,13 @@ pub unsafe fn unpark_filter( callback: impl FnOnce(UnparkResult) -> UnparkToken, ) -> UnparkResult { // Lock the bucket for the given key - let bucket = lock_bucket(key); + let mut bucket = lock_bucket(key); + // borrow checker needs a little help here + let queue = bucket.queue(); // Go through the queue looking for threads with a matching key - let mut link = &bucket.queue_head; - let mut current = bucket.queue_head.get(); + let mut current = queue.head; + let mut link = Cell::from_mut(&mut queue.head); let mut previous = ptr::null(); let mut threads = SmallVec::<[_; 8]>::new(); let mut result = UnparkResult::default(); @@ -1007,8 +1166,8 @@ pub unsafe fn unpark_filter( FilterOp::Unpark => { // Remove the thread from the queue link.set(next); - if bucket.queue_tail.get() == current { - bucket.queue_tail.set(previous); + if queue.tail == current { + queue.tail = previous; } // Add the thread to our list of threads to unpark @@ -1037,7 +1196,7 @@ pub unsafe fn unpark_filter( // Invoke the callback before waking up the threads result.unparked_threads = threads.len(); if result.unparked_threads != 0 { - result.be_fair = (*bucket.fair_timeout.get()).should_timeout(); + result.be_fair = (*bucket.bucket().fair_timeout.get()).should_timeout(); } let token = callback(result); @@ -1048,8 +1207,7 @@ pub unsafe fn unpark_filter( t.1 = Some((*t.0).parker.unpark_lock()); } - // SAFETY: We hold the lock here, as required - bucket.mutex.unlock(); + drop(bucket); // Now that we are outside the lock, wake up all the threads that we removed // from the queue. @@ -1409,9 +1567,9 @@ mod tests { /// Calls a closure for every `ThreadData` currently parked on a given key fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) { - let bucket = super::lock_bucket(key); + let mut bucket = super::lock_bucket(key); - let mut current: *const ThreadData = bucket.queue_head.get(); + let mut current: *const ThreadData = bucket.queue().head; while !current.is_null() { let current_ref = unsafe { &*current }; if current_ref.key.load(Ordering::Relaxed) == key { @@ -1419,9 +1577,6 @@ mod tests { } current = current_ref.next_in_queue.get(); } - - // SAFETY: We hold the lock here, as required - unsafe { bucket.mutex.unlock() }; } macro_rules! test { From 43eff38517e3e6d2defce78abb2fdb030d3df814 Mon Sep 17 00:00:00 2001 From: Martin Habovstiak Date: Wed, 10 Feb 2021 15:27:52 +0100 Subject: [PATCH 2/2] Support Rust 1.36 `Cell::from_mut()` was stabilized in 1.37 but the MSRV of this crate is 1.36. --- core/src/parking_lot.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/parking_lot.rs b/core/src/parking_lot.rs index e00b8cd8..f889950f 100644 --- a/core/src/parking_lot.rs +++ b/core/src/parking_lot.rs @@ -607,6 +607,14 @@ fn lock_bucket_pair(key1: usize, key2: usize) -> (LockedBucket<'static>, Option< } } +// reimplementation of Cell::from_mut because it was stabilized in 1.37 and this crate supports +// 1.36 +fn cell_from_mut(t: &mut T) -> &Cell { + // SAFETY: copy-pasted from std 1.37 + // `&mut` ensures unique access. + unsafe { &*(t as *mut T as *const Cell) } +} + /// Result of a park operation. #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum ParkResult { @@ -794,7 +802,7 @@ pub unsafe fn park( // We timed out, so we now need to remove our thread from the queue let mut current = queue.head; - let mut link = Cell::from_mut(&mut queue.head); + let mut link = cell_from_mut(&mut queue.head); let mut previous = ptr::null(); let mut was_last_thread = true; while !current.is_null() { @@ -867,7 +875,7 @@ pub unsafe fn unpark_one( // Find a thread with a matching key and remove it from the queue let mut current = bucket.queue().head; - let mut link = Cell::from_mut(&mut bucket.queue().head); + let mut link = cell_from_mut(&mut bucket.queue().head); let mut previous = ptr::null(); let mut result = UnparkResult::default(); while !current.is_null() { @@ -940,7 +948,7 @@ pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize { // Remove all threads with the given key in the bucket let mut current = queue_mut.head; - let mut link = Cell::from_mut(&mut queue_mut.head); + let mut link = cell_from_mut(&mut queue_mut.head); let mut previous = ptr::null(); let mut threads = SmallVec::<[_; 8]>::new(); while !current.is_null() { @@ -1030,7 +1038,7 @@ pub unsafe fn unpark_requeue( // Remove all threads with the given key in the source bucket let mut current = bucket_from_queue.head; - let mut link = Cell::from_mut(&mut bucket_from_queue.head); + let mut link = cell_from_mut(&mut bucket_from_queue.head); let mut previous = ptr::null(); let mut requeue_threads: *const ThreadData = ptr::null(); let mut requeue_threads_tail: *const ThreadData = ptr::null(); @@ -1154,7 +1162,7 @@ pub unsafe fn unpark_filter( // Go through the queue looking for threads with a matching key let mut current = queue.head; - let mut link = Cell::from_mut(&mut queue.head); + let mut link = cell_from_mut(&mut queue.head); let mut previous = ptr::null(); let mut threads = SmallVec::<[_; 8]>::new(); let mut result = UnparkResult::default();