diff --git a/compiler/rustc_middle/src/query/job.rs b/compiler/rustc_middle/src/query/job.rs index 923eae045cae2..2f7b12bcc7254 100644 --- a/compiler/rustc_middle/src/query/job.rs +++ b/compiler/rustc_middle/src/query/job.rs @@ -70,22 +70,29 @@ impl<'tcx> QueryJob<'tcx> { } #[derive(Debug)] -pub struct QueryWaiter<'tcx> { +pub struct QueryWaiter { pub query: Option, - pub condvar: Condvar, + pub condvar: Arc, pub span: Span, - pub cycle: Mutex>>>, } #[derive(Clone, Debug)] pub struct QueryLatch<'tcx> { /// The `Option` is `Some(..)` when the job is active, and `None` once completed. - pub waiters: Arc>>>>>, + pub inner: Arc>>>, +} + +#[derive(Debug)] +pub struct QueryLatchState<'tcx> { + pub waiters: Vec, + pub cycle: Option>>, } impl<'tcx> QueryLatch<'tcx> { fn new() -> Self { - QueryLatch { waiters: Arc::new(Mutex::new(Some(Vec::new()))) } + QueryLatch { + inner: Arc::new(Mutex::new(Some(QueryLatchState { waiters: Vec::new(), cycle: None }))), + } } /// Awaits for the query job to complete. @@ -95,19 +102,20 @@ impl<'tcx> QueryLatch<'tcx> { query: Option, span: Span, ) -> Result<(), CycleError>> { - let mut waiters_guard = self.waiters.lock(); - let Some(waiters) = &mut *waiters_guard else { + let mut state_lock = self.inner.lock(); + let Some(state) = &mut *state_lock else { return Ok(()); // already complete }; - let waiter = - Arc::new(QueryWaiter { query, span, cycle: Mutex::new(None), condvar: Condvar::new() }); + let condvar = Arc::new(Condvar::new()); + let waiter = QueryWaiter { query, span, condvar: Arc::clone(&condvar) }; + state.waiters.reserve(state.waiters.len().saturating_sub(tcx.sess.threads())); // We push the waiter on to the `waiters` list. It can be accessed inside // the `wait` call below, by 1) the `set` method or 2) by deadlock detection. // Both of these will remove it from the `waiters` list before resuming // this thread. - waiters.push(Arc::clone(&waiter)); + state.waiters.push(waiter); // Awaits the caller on this latch by blocking the current thread. // If this detects a deadlock and the deadlock handler wants to resume this thread @@ -115,16 +123,15 @@ impl<'tcx> QueryLatch<'tcx> { // getting the self.info lock. rustc_thread_pool::mark_blocked(); tcx.jobserver_proxy.release_thread(); - waiter.condvar.wait(&mut waiters_guard); + condvar.wait(&mut state_lock); + let cycle = state_lock + .as_mut() + .map(|s| s.cycle.take().expect("resumed waiter for unfinished query without a cycle")); // Release the lock before we potentially block in `acquire_thread` - drop(waiters_guard); + drop(state_lock); tcx.jobserver_proxy.acquire_thread(); - // FIXME: Get rid of this lock. We have ownership of the QueryWaiter - // although another thread may still have a Arc reference so we cannot - // use Arc::get_mut - let mut cycle = waiter.cycle.lock(); - match cycle.take() { + match cycle { None => Ok(()), Some(cycle) => Err(cycle), } @@ -132,8 +139,8 @@ impl<'tcx> QueryLatch<'tcx> { /// Sets the latch and resumes all waiters on it fn set(&self) { - let mut waiters_guard = self.waiters.lock(); - let waiters = waiters_guard.take().unwrap(); // mark the latch as complete + let mut state_lock = self.inner.lock(); + let waiters = state_lock.take().unwrap().waiters; // mark the latch as complete let registry = rustc_thread_pool::Registry::current(); for waiter in waiters { rustc_thread_pool::mark_unblocked(®istry); @@ -143,10 +150,10 @@ impl<'tcx> QueryLatch<'tcx> { /// Removes a single waiter from the list of waiters. /// This is used to break query cycles. - pub fn extract_waiter(&self, waiter: usize) -> Arc> { - let mut waiters_guard = self.waiters.lock(); - let waiters = waiters_guard.as_mut().expect("non-empty waiters vec"); + pub fn extract_waiter(&self, waiter: usize) -> QueryWaiter { + let mut state_lock = self.inner.lock(); + let state = state_lock.as_mut().expect("non-empty waiters vec"); // Remove the waiter from the list of waiters - waiters.remove(waiter) + state.waiters.remove(waiter) } } diff --git a/compiler/rustc_query_impl/src/job.rs b/compiler/rustc_query_impl/src/job.rs index 01f36d06c0472..20d7b601e6183 100644 --- a/compiler/rustc_query_impl/src/job.rs +++ b/compiler/rustc_query_impl/src/job.rs @@ -1,7 +1,6 @@ use std::io::Write; use std::iter; use std::ops::ControlFlow; -use std::sync::Arc; use rustc_data_structures::fx::{FxHashMap, FxHashSet}; use rustc_errors::{Diag, DiagCtxtHandle}; @@ -136,7 +135,7 @@ fn visit_waiters<'tcx>( // Visit the explicit waiters which use condvars and are resumable if let Some(latch) = job_map.latch_of(query) { - for (i, waiter) in latch.waiters.lock().as_ref().unwrap().iter().enumerate() { + for (i, waiter) in latch.inner.lock().as_ref().unwrap().waiters.iter().enumerate() { if let Some(waiter_query) = waiter.query { // Return a value which indicates that this waiter can be resumed visit(waiter.span, waiter_query).map_break(|_| Some((query, i)))?; @@ -217,7 +216,7 @@ fn connected_to_root<'tcx>( fn remove_cycle<'tcx>( job_map: &QueryJobMap<'tcx>, jobs: &mut Vec, - wakelist: &mut Vec>>, + wakelist: &mut Vec, ) -> bool { let mut visited = FxHashSet::default(); let mut stack = Vec::new(); @@ -306,11 +305,15 @@ fn remove_cycle<'tcx>( // edge which is resumable / waited using a query latch let (waitee_query, waiter_idx) = waiter.unwrap(); - // Extract the waiter we want to resume - let waiter = job_map.latch_of(waitee_query).unwrap().extract_waiter(waiter_idx); + let latch = job_map.latch_of(waitee_query).unwrap(); + let mut latch_state_lock = latch.inner.lock(); + let latch_state = latch_state_lock.as_mut().expect("non-empty waiters vec"); + + // Remove the waiter from the list of waiters we want to resume + let waiter = latch_state.waiters.remove(waiter_idx); // Set the cycle error so it will be picked up when resumed - *waiter.cycle.lock() = Some(error); + latch_state.cycle = Some(error); // Put the waiter on the list of things to resume wakelist.push(waiter);