diff --git a/core/src/parking_lot.rs b/core/src/parking_lot.rs
index 519ce9e3..f889950f 100644
--- a/core/src/parking_lot.rs
+++ b/core/src/parking_lot.rs
@@ -6,7 +6,6 @@
 // copied, modified, or distributed except according to those terms.
 use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
 use crate::util::UncheckedOptionExt;
-use crate::word_lock::WordLock;
 use core::{
     cell::{Cell, UnsafeCell},
     ptr,
@@ -16,6 +15,134 @@
 use instant::Instant;
 use smallvec::SmallVec;
 use std::time::Duration;
+use safe_lock::SafeLock;
+use safe_lock::Guard as LockGuard;
+use locked_bucket::LockedBucket;
+
+mod safe_lock {
+    use crate::word_lock::WordLock;
+    use core::cell::UnsafeCell;
+
+    pub(crate) struct SafeLock<T> {
+        lock: WordLock,
+        data: UnsafeCell<T>,
+    }
+
+    impl<T> SafeLock<T> {
+        pub fn new(value: T) -> Self {
+            SafeLock {
+                lock: WordLock::new(),
+                data: UnsafeCell::new(value),
+            }
+        }
+
+        pub fn get_mut(&mut self) -> &mut T {
+            // `UnsafeCell::get_mut` still requires nightly, so go through the raw pointer
+            unsafe { &mut *self.data.get() }
+        }
+
+        pub fn lock(&self) -> Guard<'_, T> {
+            unsafe {
+                self.lock.lock();
+                self.assume_locked()
+            }
+        }
+
+        pub unsafe fn assume_locked(&self) -> Guard<'_, T> {
+            Guard {
+                lock: self,
+                _mark_not_send: Default::default(),
+            }
+        }
+    }
+
+    pub(crate) struct Guard<'a, T> {
+        lock: &'a SafeLock<T>,
+        _mark_not_send: core::marker::PhantomData<*mut T>,
+    }
+
+    impl<'a, T: Sync> Guard<'a, T> {
+        // Leak the guard so the lock stays held, and hand the data out directly
+        pub fn forget(this: Self) -> &'a mut T {
+            unsafe {
+                let data = &mut *this.lock.data.get();
+                core::mem::forget(this);
+                data
+            }
+        }
+    }
+
+    // Sync is safe because the guard never modifies the internal lock data through a
+    // shared reference - the same logic as in std
+    unsafe impl<'a, T: Sync> Sync for Guard<'a, T> {}
+
+    impl<'a, T: Sync> core::ops::Deref for Guard<'a, T> {
+        type Target = T;
+
+        fn deref(&self) -> &Self::Target {
+            unsafe { &*self.lock.data.get() }
+        }
+    }
+
+    impl<'a, T: Sync> core::ops::DerefMut for Guard<'a, T> {
+        fn deref_mut(&mut self) -> &mut Self::Target {
+            unsafe { &mut *self.lock.data.get() }
+        }
+    }
+
+    impl<'a, T> Drop for Guard<'a, T> {
+        fn drop(&mut self) {
+            unsafe { self.lock.lock.unlock() }
+        }
+    }
+}
+
+mod locked_hash_table {
+    use super::{HashTable, LockGuard, Queue};
+
+    pub(crate) struct LockedHashTable<'a> {
+        hash_table: &'a HashTable,
+        _mark_not_send: core::marker::PhantomData<*const HashTable>,
+    }
+
+    impl<'a> LockedHashTable<'a> {
+        pub(crate) fn new(hash_table: &'a HashTable) -> Self {
+            for bucket in &hash_table.entries[..] {
+                // Leak each guard so every queue stays locked
+                LockGuard::forget(bucket.queue.lock());
+            }
+
+            LockedHashTable {
+                hash_table,
+                _mark_not_send: Default::default(),
+            }
+        }
+
+        pub(crate) fn table(&self) -> &'a HashTable {
+            self.hash_table
+        }
+
+        pub(crate) fn queues_mut(&mut self) -> impl Iterator<Item = &'a mut Queue> {
+            self.hash_table
+                .entries
+                .iter()
+                .map(|bucket| unsafe { LockGuard::forget(bucket.queue.assume_locked()) })
+        }
+    }
+
+    impl<'a> Drop for LockedHashTable<'a> {
+        fn drop(&mut self) {
+            for bucket in &self.hash_table.entries[..] {
+                // SAFETY: We hold the lock here, as required; dropping the
+                // reconstructed guard unlocks the queue.
+                unsafe { bucket.queue.assume_locked() };
+            }
+        }
+    }
+}
+
 static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
 
 /// Holds the pointer to the currently active `HashTable`.
@@ -30,7 +157,7 @@ static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
 // still only a few hundred bytes per thread.
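The `Guard::forget` / `assume_locked` pairing above is the load-bearing trick of this patch: `forget` leaks a guard so the lock stays held across a region the borrow checker cannot see, and `assume_locked` later rebuilds a guard purely so its `Drop` releases the lock. Below is a condensed, self-contained sketch of that contract — a stand-in spin lock replaces the crate-private `WordLock`, and the types are simplified (no `Sync` bounds, no `DerefMut`), so treat it as illustrative only, not the crate's actual API:

```rust
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem;
use std::ops::Deref;
use std::sync::atomic::{AtomicBool, Ordering};

// Stand-in for the crate-private WordLock, just enough to illustrate the shape.
struct SpinLock(AtomicBool);

impl SpinLock {
    fn lock(&self) {
        while self.0.swap(true, Ordering::Acquire) {
            std::hint::spin_loop();
        }
    }
    // SAFETY: the caller must hold the lock.
    unsafe fn unlock(&self) {
        self.0.store(false, Ordering::Release);
    }
}

struct SafeLock<T> {
    lock: SpinLock,
    data: UnsafeCell<T>,
}

unsafe impl<T: Send> Sync for SafeLock<T> {}

struct Guard<'a, T> {
    lock: &'a SafeLock<T>,
    // A raw pointer strips the Send/Sync auto traits from the guard.
    _mark_not_send: PhantomData<*mut T>,
}

impl<T> SafeLock<T> {
    fn new(value: T) -> Self {
        SafeLock {
            lock: SpinLock(AtomicBool::new(false)),
            data: UnsafeCell::new(value),
        }
    }
    fn lock(&self) -> Guard<'_, T> {
        self.lock.lock();
        unsafe { self.assume_locked() }
    }
    // SAFETY: the lock must actually be held, e.g. because a guard was forgotten.
    unsafe fn assume_locked(&self) -> Guard<'_, T> {
        Guard { lock: self, _mark_not_send: PhantomData }
    }
}

impl<'a, T> Guard<'a, T> {
    // Leak the guard: the lock stays held and the caller gets the data directly.
    fn forget(this: Self) -> &'a mut T {
        let data = unsafe { &mut *this.lock.data.get() };
        mem::forget(this);
        data
    }
}

impl<'a, T> Deref for Guard<'a, T> {
    type Target = T;
    fn deref(&self) -> &T {
        unsafe { &*self.lock.data.get() }
    }
}

impl<'a, T> Drop for Guard<'a, T> {
    fn drop(&mut self) {
        unsafe { self.lock.lock.unlock() }
    }
}

fn main() {
    let cell = SafeLock::new(0u32);
    // `forget` keeps the lock held while handing out `&mut u32`...
    let data = Guard::forget(cell.lock());
    *data += 1;
    // ...and `assume_locked` later rebuilds a guard whose Drop releases the lock.
    drop(unsafe { cell.assume_locked() });
    assert_eq!(*cell.lock(), 1);
}
```

This is exactly the shape `LockedHashTable` relies on: `new` forgets one guard per bucket, and `Drop` rebuilds and drops one guard per bucket to unlock them all.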
 const LOAD_FACTOR: usize = 3;
 
-struct HashTable {
+pub(crate) struct HashTable {
     // Hash buckets for the table
     entries: Box<[Bucket]>,
@@ -60,16 +187,79 @@ impl HashTable {
             _prev: prev,
         })
     }
+
+    fn lock_all_queues(&self) -> locked_hash_table::LockedHashTable<'_> {
+        locked_hash_table::LockedHashTable::new(self)
+    }
 }
 
-#[repr(align(64))]
-struct Bucket {
-    // Lock protecting the queue
-    mutex: WordLock,
+pub(crate) struct Queue {
+    head: *const ThreadData,
+    tail: *const ThreadData,
+}
+
+// This is not actually safe yet; the `Queue` operations would have to be protected
+// by a module boundary to make it so. Should be fine for a while.
+unsafe impl Sync for Queue {}
+
+impl Queue {
+    fn new() -> Self {
+        Queue {
+            head: ptr::null(),
+            tail: ptr::null(),
+        }
+    }
+}
+
+mod locked_bucket {
+    use super::{Bucket, LockGuard, Queue};
+
+    pub(crate) struct LockedBucket<'a> {
+        bucket: &'a Bucket,
+        _mark_not_send: core::marker::PhantomData<*const Bucket>,
+    }
+
+    impl<'a> LockedBucket<'a> {
+        pub fn new(bucket: &'a Bucket) -> Self {
+            // Leak the guard so the queue stays locked for as long as this value lives
+            LockGuard::forget(bucket.queue.lock());
+            LockedBucket {
+                bucket,
+                _mark_not_send: Default::default(),
+            }
+        }
+
+        pub fn bucket(&self) -> &'a Bucket {
+            self.bucket
+        }
+
+        pub fn queue(&mut self) -> &mut Queue {
+            unsafe { LockGuard::forget(self.bucket.queue.assume_locked()) }
+        }
+
+        pub fn into_queue_guard(self) -> LockGuard<'a, Queue> {
+            unsafe {
+                let guard = self.bucket.queue.assume_locked();
+                core::mem::forget(self);
+                guard
+            }
+        }
+    }
+
+    impl<'a> Drop for LockedBucket<'a> {
+        fn drop(&mut self) {
+            // SAFETY: We hold the lock here, as required; dropping the
+            // reconstructed guard unlocks the queue.
+            unsafe {
+                self.bucket.queue.assume_locked();
+            }
+        }
+    }
+}
+
+#[repr(align(64))]
+pub(crate) struct Bucket {
     // Linked list of threads waiting on this bucket
-    queue_head: Cell<*const ThreadData>,
-    queue_tail: Cell<*const ThreadData>,
+    queue: SafeLock<Queue>,
 
     // Next time at which point be_fair should be set
     fair_timeout: UnsafeCell<FairTimeout>,
@@ -79,12 +269,14 @@ impl Bucket {
     #[inline]
     pub fn new(timeout: Instant, seed: u32) -> Self {
         Self {
-            mutex: WordLock::new(),
-            queue_head: Cell::new(ptr::null()),
-            queue_tail: Cell::new(ptr::null()),
+            queue: SafeLock::new(Queue::new()),
             fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
         }
     }
+
+    fn lock(&self) -> LockedBucket<'_> {
+        LockedBucket::new(self)
+    }
 }
 
 struct FairTimeout {
@@ -239,7 +431,7 @@ fn create_hashtable() -> &'static HashTable {
 // created, which only happens once per thread.
 fn grow_hashtable(num_threads: usize) {
     // Lock all buckets in the existing table and get a reference to it
-    let old_table = loop {
+    let mut old_table = loop {
         let table = get_hashtable();
 
         // Check if we need to resize the existing table
@@ -247,46 +439,31 @@ fn grow_hashtable(num_threads: usize) {
             return;
         }
 
-        // Lock all buckets in the old table
-        for bucket in &table.entries[..] {
-            bucket.mutex.lock();
-        }
+        let locked = table.lock_all_queues();
 
         // Now check if our table is still the latest one. Another thread could
         // have grown the hash table between us reading HASHTABLE and locking
         // the buckets.
         if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
-            break table;
-        }
-
-        // Unlock buckets and try again
-        for bucket in &table.entries[..] {
-            // SAFETY: We hold the lock here, as required
-            unsafe { bucket.mutex.unlock() };
+            break locked;
         }
     };
 
     // Create the new table
-    let mut new_table = HashTable::new(num_threads, old_table.table());
+    let mut new_table = HashTable::new(num_threads, old_table.table());
 
     // Move the entries from the old table to the new one
-    for bucket in &old_table.entries[..] {
+    for queue in old_table.queues_mut() {
         // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
         // lists. All `ThreadData` instances in these lists will remain valid as long as they are
         // present in the lists, meaning as long as their threads are parked.
-        unsafe { rehash_bucket_into(bucket, &mut new_table) };
+        unsafe { rehash_bucket_into(queue, &mut new_table) };
     }
 
     // Publish the new table. No races are possible at this point because
     // any other thread trying to grow the hash table is blocked on the bucket
     // locks in the old table.
     HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
-
-    // Unlock all buckets in the old table
-    for bucket in &old_table.entries[..] {
-        // SAFETY: We hold the lock here, as required
-        unsafe { bucket.mutex.unlock() };
-    }
 }
 
 /// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
@@ -298,19 +475,20 @@ fn grow_hashtable(num_threads: usize)
 /// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
 ///
 /// The given `table` must only contain buckets with correctly constructed linked lists.
-unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
-    let mut current: *const ThreadData = bucket.queue_head.get();
+unsafe fn rehash_bucket_into(queue: &mut Queue, table: &mut HashTable) {
+    let mut current: *const ThreadData = queue.head;
     while !current.is_null() {
+        // This is the unsafety that the comment on `unsafe impl Sync for Queue` refers to
        let next = (*current).next_in_queue.get();
        let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
-        if table.entries[hash].queue_tail.get().is_null() {
-            table.entries[hash].queue_head.set(current);
+        if table.entries[hash].queue.get_mut().tail.is_null() {
+            table.entries[hash].queue.get_mut().head = current;
         } else {
-            (*table.entries[hash].queue_tail.get())
+            (*table.entries[hash].queue.get_mut().tail)
                 .next_in_queue
                 .set(current);
         }
-        table.entries[hash].queue_tail.set(current);
+        table.entries[hash].queue.get_mut().tail = current;
         (*current).next_in_queue.set(ptr::null());
         current = next;
     }
@@ -331,7 +509,7 @@ fn hash(key: usize, bits: u32) -> usize {
 /// Locks the bucket for the given key and returns a reference to it.
 /// The returned bucket must be unlocked again in order to not cause deadlocks.
 #[inline]
-fn lock_bucket(key: usize) -> &'static Bucket {
+fn lock_bucket(key: usize) -> LockedBucket<'static> {
     loop {
         let hashtable = get_hashtable();
@@ -339,25 +517,26 @@
         let bucket = &hashtable.entries[hash];
 
         // Lock the bucket
-        bucket.mutex.lock();
+        let guard = bucket.lock();
 
         // If no other thread has rehashed the table before we grabbed the lock
         // then we are good to go! The lock we grabbed prevents any rehashes.
         if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
-            return bucket;
+            return guard;
         }
-
-        // Unlock the bucket and try again
-        // SAFETY: We hold the lock here, as required
-        unsafe { bucket.mutex.unlock() };
     }
 }
 
+#[inline]
+fn lock_queue(key: usize) -> LockGuard<'static, Queue> {
+    lock_bucket(key).into_queue_guard()
+}
+
 /// Locks the bucket for the given key and returns a reference to it. But checks that the key
 /// hasn't been changed in the meantime due to a requeue.
 /// The returned bucket must be unlocked again in order to not cause deadlocks.
 #[inline]
-fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
+fn lock_bucket_checked(key: &AtomicUsize) -> (usize, LockedBucket<'static>) {
     loop {
         let hashtable = get_hashtable();
         let current_key = key.load(Ordering::Relaxed);
@@ -366,7 +545,7 @@ fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
         let bucket = &hashtable.entries[hash];
 
         // Lock the bucket
-        bucket.mutex.lock();
+        let guard = bucket.lock();
 
         // Check that both the hash table and key are correct while the bucket
         // is locked. Note that the key can't change once we locked the proper
@@ -374,22 +553,25 @@ fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
         if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
             && key.load(Ordering::Relaxed) == current_key
         {
-            return (current_key, bucket);
+            return (current_key, guard);
         }
-
-        // Unlock the bucket and try again
-        // SAFETY: We hold the lock here, as required
-        unsafe { bucket.mutex.unlock() };
     }
 }
 
+#[inline]
+fn lock_queue_checked(key: &AtomicUsize) -> (usize, LockGuard<'static, Queue>) {
+    let (key, guard) = lock_bucket_checked(key);
+    (key, guard.into_queue_guard())
+}
+
 /// Locks the two buckets for the given pair of keys and returns references to them.
 /// The returned buckets must be unlocked again in order to not cause deadlocks.
 ///
-/// If both keys hash to the same value, both returned references will be to the same bucket. Be
-/// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
+/// If both keys hash to the same value, the second bucket will be `None`.
+///
+/// To avoid deadlocks, the buckets have to be unlocked (dropped) from left to right.
 #[inline]
-fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
+fn lock_bucket_pair(key1: usize, key2: usize) -> (LockedBucket<'static>, Option<LockedBucket<'static>>) {
     loop {
         let hashtable = get_hashtable();
 
@@ -404,42 +586,33 @@
         };
 
         // Lock the first bucket
-        bucket1.mutex.lock();
+        let guard1 = bucket1.lock();
 
         // If no other thread has rehashed the table before we grabbed the lock
         // then we are good to go! The lock we grabbed prevents any rehashes.
         if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
             // Now lock the second bucket and return the two buckets
             if hash1 == hash2 {
-                return (bucket1, bucket1);
+                return (guard1, None);
             } else if hash1 < hash2 {
                 let bucket2 = &hashtable.entries[hash2];
-                bucket2.mutex.lock();
-                return (bucket1, bucket2);
+                let guard2 = bucket2.lock();
+                return (guard1, Some(guard2));
             } else {
                 let bucket2 = &hashtable.entries[hash1];
-                bucket2.mutex.lock();
-                return (bucket2, bucket1);
+                let guard2 = bucket2.lock();
+                return (guard2, Some(guard1));
             }
         }
-
-        // Unlock the bucket and try again
-        // SAFETY: We hold the lock here, as required
-        unsafe { bucket1.mutex.unlock() };
     }
 }
 
-/// Unlock a pair of buckets
-///
-/// # Safety
-///
-/// Both buckets must be locked
-#[inline]
-unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
-    bucket1.mutex.unlock();
-    if !ptr::eq(bucket1, bucket2) {
-        bucket2.mutex.unlock();
-    }
+// Reimplementation of `Cell::from_mut`, which was only stabilized in 1.37 while this
+// crate still supports 1.36
+fn cell_from_mut<T>(t: &mut T) -> &Cell<T> {
    // SAFETY: copy-pasted from std 1.37
    // `&mut` ensures unique access.
    unsafe { &*(t as *mut T as *const Cell<T>) }
 }
 
 /// Result of a park operation.
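The `cell_from_mut` shim above is what lets the queue walks further down keep a single `link` variable of type `&Cell<*const ThreadData>`, whether it currently points at `queue.head` (reached through `&mut`) or at some node's `next_in_queue` cell. A minimal standalone demo — the function body is the same transparent-wrapper cast that std's `Cell::from_mut` performs:

```rust
use std::cell::Cell;

// Same cast as std's Cell::from_mut: Cell<T> is #[repr(transparent)] over T.
fn cell_from_mut<T>(t: &mut T) -> &Cell<T> {
    // SAFETY: `&mut` ensures unique access, and the layouts match.
    unsafe { &*(t as *mut T as *const Cell<T>) }
}

fn main() {
    let mut head: u32 = 1;
    let link = cell_from_mut(&mut head);
    // The Cell view permits mutation through a shared reference, which is what
    // lets one `link` variable alias either the queue head or a node's next field.
    link.set(7);
    assert_eq!(head, 7);
}
```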
@@ -575,12 +748,10 @@
     // Grab our thread data, this also ensures that the hash table exists
     with_thread_data(|thread_data| {
         // Lock the bucket for the given key
-        let bucket = lock_bucket(key);
+        let mut queue = lock_queue(key);
 
         // If the validation function fails, just return
         if !validate() {
-            // SAFETY: We hold the lock here, as required
-            bucket.mutex.unlock();
             return ParkResult::Invalid;
         }
 
@@ -590,14 +761,13 @@
         thread_data.key.store(key, Ordering::Relaxed);
         thread_data.park_token.set(park_token);
         thread_data.parker.prepare_park();
-        if !bucket.queue_head.get().is_null() {
-            (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
+        if !queue.head.is_null() {
+            (*queue.tail).next_in_queue.set(thread_data);
         } else {
-            bucket.queue_head.set(thread_data);
+            queue.head = thread_data;
         }
-        bucket.queue_tail.set(thread_data);
-        // SAFETY: We hold the lock here, as required
-        bucket.mutex.unlock();
+        queue.tail = thread_data;
+        drop(queue);
 
         // Invoke the pre-sleep callback
         before_sleep();
@@ -622,27 +792,25 @@
 
         // Lock our bucket again. Note that the hashtable may have been rehashed in
         // the meantime. Our key may also have changed if we were requeued.
-        let (key, bucket) = lock_bucket_checked(&thread_data.key);
+        let (key, mut queue) = lock_queue_checked(&thread_data.key);
 
         // Now we need to check again if we were unparked or timed out. Unlike the
         // last check this is precise because we hold the bucket lock.
         if !thread_data.parker.timed_out() {
-            // SAFETY: We hold the lock here, as required
-            bucket.mutex.unlock();
             return ParkResult::Unparked(thread_data.unpark_token.get());
         }
 
         // We timed out, so we now need to remove our thread from the queue
-        let mut link = &bucket.queue_head;
-        let mut current = bucket.queue_head.get();
+        let mut current = queue.head;
+        let mut link = cell_from_mut(&mut queue.head);
         let mut previous = ptr::null();
         let mut was_last_thread = true;
         while !current.is_null() {
             if current == thread_data {
                 let next = (*current).next_in_queue.get();
                 link.set(next);
-                if bucket.queue_tail.get() == current {
-                    bucket.queue_tail.set(previous);
+                if queue.tail == current {
+                    queue.tail = previous;
                 } else {
                     // Scan the rest of the queue to see if there are any other
                     // entries with the given key.
@@ -674,9 +842,6 @@
         // if we timed out.
         debug_assert!(!current.is_null());
 
-        // Unlock the bucket, we are done
-        // SAFETY: We hold the lock here, as required
-        bucket.mutex.unlock();
         ParkResult::TimedOut
     })
 }
@@ -706,11 +871,11 @@ pub unsafe fn unpark_one(
     callback: impl FnOnce(UnparkResult) -> UnparkToken,
 ) -> UnparkResult {
     // Lock the bucket for the given key
-    let bucket = lock_bucket(key);
+    let mut bucket = lock_bucket(key);
 
     // Find a thread with a matching key and remove it from the queue
-    let mut link = &bucket.queue_head;
-    let mut current = bucket.queue_head.get();
+    let mut current = bucket.queue().head;
+    let mut link = cell_from_mut(&mut bucket.queue().head);
     let mut previous = ptr::null();
     let mut result = UnparkResult::default();
     while !current.is_null() {
@@ -718,8 +883,8 @@ pub unsafe fn unpark_one(
             // Remove the thread from the queue
             let next = (*current).next_in_queue.get();
             link.set(next);
-            if bucket.queue_tail.get() == current {
-                bucket.queue_tail.set(previous);
+            if bucket.queue().tail == current {
+                bucket.queue().tail = previous;
             } else {
                 // Scan the rest of the queue to see if there are any other
                 // entries with the given key.
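For reference, the `link`/`current` walk that `park` and `unpark_one` now run against a plain `Queue` can be modelled in isolation. This sketch is a simplification — a hypothetical `Node` type stands in for `ThreadData`, and the `previous`/tail bookkeeping is omitted — but it shows why `link` wants to be a `&Cell<_>`: unlinking is a single `link.set(next)` regardless of whether `link` aliases the head pointer or a node's next pointer:

```rust
use std::cell::Cell;

fn cell_from_mut<T>(t: &mut T) -> &Cell<T> {
    unsafe { &*(t as *mut T as *const Cell<T>) }
}

// Hypothetical stand-in for ThreadData with its intrusive next pointer.
struct Node {
    value: u32,
    next: Cell<*const Node>,
}

// Unlink the first node with a matching value. `link` always points at the
// Cell holding `current`, so removal is a single `link.set(next)`.
unsafe fn remove(head: &mut *const Node, value: u32) {
    let mut link = cell_from_mut(head);
    let mut current = link.get();
    while !current.is_null() {
        if (*current).value == value {
            link.set((*current).next.get());
            return;
        }
        link = &(*current).next;
        current = link.get();
    }
}

fn main() {
    let c = Node { value: 3, next: Cell::new(std::ptr::null()) };
    let b = Node { value: 2, next: Cell::new(&c as *const Node) };
    let a = Node { value: 1, next: Cell::new(&b as *const Node) };
    let mut head = &a as *const Node;

    unsafe {
        remove(&mut head, 2);
        assert_eq!((*head).value, 1);
        assert_eq!((*(*head).next.get()).value, 3);
    }
}
```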
@@ -735,7 +900,7 @@ pub unsafe fn unpark_one(
 
             // Invoke the callback before waking up the thread
             result.unparked_threads = 1;
-            result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
+            result.be_fair = (*bucket.bucket().fair_timeout.get()).should_timeout();
             let token = callback(result);
 
             // Set the token for the target thread
@@ -747,8 +912,7 @@ pub unsafe fn unpark_one(
             // the queue locked while we perform a system call. Finally we wake
             // up the parked thread.
             let handle = (*current).parker.unpark_lock();
-            // SAFETY: We hold the lock here, as required
-            bucket.mutex.unlock();
+            drop(bucket);
             handle.unpark();
 
             return result;
@@ -761,8 +925,6 @@ pub unsafe fn unpark_one(
 
     // No threads with a matching key were found in the bucket
     callback(result);
-    // SAFETY: We hold the lock here, as required
-    bucket.mutex.unlock();
 
     result
 }
@@ -780,11 +942,13 @@
 #[inline]
 pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
     // Lock the bucket for the given key
-    let bucket = lock_bucket(key);
+    let mut queue = lock_queue(key);
+    // borrow checker needs a little help here
+    let queue_mut = &mut *queue;
 
     // Remove all threads with the given key in the bucket
-    let mut link = &bucket.queue_head;
-    let mut current = bucket.queue_head.get();
+    let mut current = queue_mut.head;
+    let mut link = cell_from_mut(&mut queue_mut.head);
     let mut previous = ptr::null();
     let mut threads = SmallVec::<[_; 8]>::new();
     while !current.is_null() {
@@ -792,8 +956,8 @@ pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
             // Remove the thread from the queue
             let next = (*current).next_in_queue.get();
             link.set(next);
-            if bucket.queue_tail.get() == current {
-                bucket.queue_tail.set(previous);
+            if queue_mut.tail == current {
+                queue_mut.tail = previous;
             }
 
             // Set the token for the target thread
@@ -811,9 +975,7 @@ pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
         }
     }
 
-    // Unlock the bucket
-    // SAFETY: We hold the lock here, as required
-    bucket.mutex.unlock();
+    drop(queue);
 
     // Now that we are outside the lock, wake up all the threads that we removed
     // from the queue.
@@ -860,20 +1022,23 @@ pub unsafe fn unpark_requeue(
     callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
 ) -> UnparkResult {
     // Lock the two buckets for the given key
-    let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
+    let (mut bucket_from, mut bucket_to) = lock_bucket_pair(key_from, key_to);
 
     // If the validation function fails, just return
     let mut result = UnparkResult::default();
     let op = validate();
     if op == RequeueOp::Abort {
-        // SAFETY: Both buckets are locked, as required.
-        unlock_bucket_pair(bucket_from, bucket_to);
+        drop(bucket_from);
+        drop(bucket_to);
         return result;
     }
 
+    // borrow checker needs a little help here
+    let bucket_from_queue = bucket_from.queue();
+
     // Remove all threads with the given key in the source bucket
-    let mut link = &bucket_from.queue_head;
-    let mut current = bucket_from.queue_head.get();
+    let mut current = bucket_from_queue.head;
+    let mut link = cell_from_mut(&mut bucket_from_queue.head);
     let mut previous = ptr::null();
     let mut requeue_threads: *const ThreadData = ptr::null();
     let mut requeue_threads_tail: *const ThreadData = ptr::null();
@@ -883,8 +1048,8 @@ pub unsafe fn unpark_requeue(
             // Remove the thread from the queue
             let next = (*current).next_in_queue.get();
             link.set(next);
-            if bucket_from.queue_tail.get() == current {
-                bucket_from.queue_tail.set(previous);
+            if bucket_from_queue.tail == current {
+                bucket_from_queue.tail = previous;
             }
 
             // Prepare the first thread for wakeup and requeue the rest.
@@ -927,19 +1092,19 @@ pub unsafe fn unpark_requeue(
     // Add the requeued threads to the destination bucket
     if !requeue_threads.is_null() {
         (*requeue_threads_tail).next_in_queue.set(ptr::null());
-        if !bucket_to.queue_head.get().is_null() {
-            (*bucket_to.queue_tail.get())
+        // A same-bucket requeue means `bucket_to` is `None`; fall back to the source bucket
+        let to_queue = bucket_to.as_mut().unwrap_or(&mut bucket_from).queue();
+        if !to_queue.head.is_null() {
+            (*to_queue.tail)
                 .next_in_queue
                 .set(requeue_threads);
         } else {
-            bucket_to.queue_head.set(requeue_threads);
+            to_queue.head = requeue_threads;
         }
-        bucket_to.queue_tail.set(requeue_threads_tail);
+        to_queue.tail = requeue_threads_tail;
     }
 
     // Invoke the callback before waking up the thread
     if result.unparked_threads != 0 {
-        result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
+        result.be_fair = (*bucket_from.bucket().fair_timeout.get()).should_timeout();
     }
     let token = callback(op, result);
 
@@ -947,12 +1112,12 @@ pub unsafe fn unpark_requeue(
     if let Some(wakeup_thread) = wakeup_thread {
         (*wakeup_thread).unpark_token.set(token);
         let handle = (*wakeup_thread).parker.unpark_lock();
-        // SAFETY: Both buckets are locked, as required.
-        unlock_bucket_pair(bucket_from, bucket_to);
+        drop(bucket_from);
+        drop(bucket_to);
         handle.unpark();
     } else {
-        // SAFETY: Both buckets are locked, as required.
-        unlock_bucket_pair(bucket_from, bucket_to);
+        drop(bucket_from);
+        drop(bucket_to);
     }
 
     result
@@ -991,11 +1156,13 @@ pub unsafe fn unpark_filter(
     callback: impl FnOnce(UnparkResult) -> UnparkToken,
 ) -> UnparkResult {
     // Lock the bucket for the given key
-    let bucket = lock_bucket(key);
+    let mut bucket = lock_bucket(key);
+    // borrow checker needs a little help here
+    let queue = bucket.queue();
 
     // Go through the queue looking for threads with a matching key
-    let mut link = &bucket.queue_head;
-    let mut current = bucket.queue_head.get();
+    let mut current = queue.head;
+    let mut link = cell_from_mut(&mut queue.head);
     let mut previous = ptr::null();
     let mut threads = SmallVec::<[_; 8]>::new();
     let mut result = UnparkResult::default();
@@ -1007,8 +1174,8 @@ pub unsafe fn unpark_filter(
             FilterOp::Unpark => {
                 // Remove the thread from the queue
                 link.set(next);
-                if bucket.queue_tail.get() == current {
-                    bucket.queue_tail.set(previous);
+                if queue.tail == current {
+                    queue.tail = previous;
                 }
 
                 // Add the thread to our list of threads to unpark
@@ -1037,7 +1204,7 @@ pub unsafe fn unpark_filter(
     // Invoke the callback before waking up the threads
     result.unparked_threads = threads.len();
     if result.unparked_threads != 0 {
-        result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
+        result.be_fair = (*bucket.bucket().fair_timeout.get()).should_timeout();
     }
     let token = callback(result);
 
@@ -1048,8 +1215,7 @@ pub unsafe fn unpark_filter(
         t.1 = Some((*t.0).parker.unpark_lock());
     }
 
-    // SAFETY: We hold the lock here, as required
-    bucket.mutex.unlock();
+    drop(bucket);
 
     // Now that we are outside the lock, wake up all the threads that we removed
     // from the queue.
@@ -1409,9 +1575,9 @@ mod tests {
     /// Calls a closure for every `ThreadData` currently parked on a given key
     fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
-        let bucket = super::lock_bucket(key);
+        let mut bucket = super::lock_bucket(key);
 
-        let mut current: *const ThreadData = bucket.queue_head.get();
+        let mut current: *const ThreadData = bucket.queue().head;
         while !current.is_null() {
             let current_ref = unsafe { &*current };
             if current_ref.key.load(Ordering::Relaxed) == key {
@@ -1419,9 +1585,6 @@ mod tests {
             }
             current = current_ref.next_in_queue.get();
         }
-
-        // SAFETY: We hold the lock here, as required
-        unsafe { bucket.mutex.unlock() };
     }
 
     macro_rules! test {
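One design note on the new `lock_bucket_pair` shape used throughout the requeue path: returning `(LockedBucket, Option<LockedBucket>)` encodes the same-bucket case in the type system instead of handing out two aliasing references, while deadlock freedom still comes from acquiring the two locks in hash/index order. A small `std::sync::Mutex` analogue of that ordering rule, purely illustrative:

```rust
use std::sync::{Mutex, MutexGuard};

// Acquire two slot locks in index order, mirroring how lock_bucket_pair orders
// acquisition by bucket hash; a consistent order is what rules out deadlock.
// Returns None for the second slot when both indices name the same lock.
fn lock_pair<'a, T>(
    slots: &'a [Mutex<T>],
    i: usize,
    j: usize,
) -> (MutexGuard<'a, T>, Option<MutexGuard<'a, T>>) {
    if i == j {
        (slots[i].lock().unwrap(), None)
    } else if i < j {
        let first = slots[i].lock().unwrap();
        let second = slots[j].lock().unwrap();
        (first, Some(second))
    } else {
        let second = slots[j].lock().unwrap();
        let first = slots[i].lock().unwrap();
        (first, Some(second))
    }
}

fn main() {
    let slots = [Mutex::new("a"), Mutex::new("b")];
    // The tuple always maps to (lock for i, lock for j), whatever the order
    // the locks were physically taken in.
    let (x, y) = lock_pair(&slots, 1, 0);
    assert_eq!(*x, "b");
    assert_eq!(*y.unwrap(), "a");
}
```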