Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 30 additions & 23 deletions compiler/rustc_middle/src/query/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,29 @@ impl<'tcx> QueryJob<'tcx> {
}

#[derive(Debug)]
pub struct QueryWaiter<'tcx> {
pub struct QueryWaiter {
pub query: Option<QueryJobId>,
pub condvar: Condvar,
pub condvar: Arc<Condvar>,
pub span: Span,
pub cycle: Mutex<Option<CycleError<QueryStackDeferred<'tcx>>>>,
}

#[derive(Clone, Debug)]
pub struct QueryLatch<'tcx> {
/// The `Option` is `Some(..)` when the job is active, and `None` once completed.
pub waiters: Arc<Mutex<Option<Vec<Arc<QueryWaiter<'tcx>>>>>>,
pub inner: Arc<Mutex<Option<QueryLatchState<'tcx>>>>,
}

#[derive(Debug)]
pub struct QueryLatchState<'tcx> {
pub waiters: Vec<QueryWaiter>,
pub cycle: Option<CycleError<QueryStackDeferred<'tcx>>>,
}

impl<'tcx> QueryLatch<'tcx> {
fn new() -> Self {
QueryLatch { waiters: Arc::new(Mutex::new(Some(Vec::new()))) }
QueryLatch {
inner: Arc::new(Mutex::new(Some(QueryLatchState { waiters: Vec::new(), cycle: None }))),
}
}

/// Awaits for the query job to complete.
Expand All @@ -95,45 +102,45 @@ impl<'tcx> QueryLatch<'tcx> {
query: Option<QueryJobId>,
span: Span,
) -> Result<(), CycleError<QueryStackDeferred<'tcx>>> {
let mut waiters_guard = self.waiters.lock();
let Some(waiters) = &mut *waiters_guard else {
let mut state_lock = self.inner.lock();
let Some(state) = &mut *state_lock else {
return Ok(()); // already complete
};

let waiter =
Arc::new(QueryWaiter { query, span, cycle: Mutex::new(None), condvar: Condvar::new() });
let condvar = Arc::new(Condvar::new());
let waiter = QueryWaiter { query, span, condvar: Arc::clone(&condvar) };

state.waiters.reserve(state.waiters.len().saturating_sub(tcx.sess.threads()));
// We push the waiter on to the `waiters` list. It can be accessed inside
// the `wait` call below, by 1) the `set` method or 2) by deadlock detection.
// Both of these will remove it from the `waiters` list before resuming
// this thread.
waiters.push(Arc::clone(&waiter));
state.waiters.push(waiter);

// Awaits the caller on this latch by blocking the current thread.
// If this detects a deadlock and the deadlock handler wants to resume this thread
// we have to be in the `wait` call. This is ensured by the deadlock handler
// getting the self.info lock.
rustc_thread_pool::mark_blocked();
tcx.jobserver_proxy.release_thread();
waiter.condvar.wait(&mut waiters_guard);
condvar.wait(&mut state_lock);
let cycle = state_lock
.as_mut()
.map(|s| s.cycle.take().expect("resumed waiter for unfinished query without a cycle"));
// Release the lock before we potentially block in `acquire_thread`
drop(waiters_guard);
drop(state_lock);
tcx.jobserver_proxy.acquire_thread();

// FIXME: Get rid of this lock. We have ownership of the QueryWaiter
// although another thread may still have a Arc reference so we cannot
// use Arc::get_mut
let mut cycle = waiter.cycle.lock();
match cycle.take() {
match cycle {
None => Ok(()),
Some(cycle) => Err(cycle),
}
}

/// Sets the latch and resumes all waiters on it
fn set(&self) {
let mut waiters_guard = self.waiters.lock();
let waiters = waiters_guard.take().unwrap(); // mark the latch as complete
let mut state_lock = self.inner.lock();
let waiters = state_lock.take().unwrap().waiters; // mark the latch as complete
let registry = rustc_thread_pool::Registry::current();
for waiter in waiters {
rustc_thread_pool::mark_unblocked(&registry);
Expand All @@ -143,10 +150,10 @@ impl<'tcx> QueryLatch<'tcx> {

/// Removes a single waiter from the list of waiters.
/// This is used to break query cycles.
pub fn extract_waiter(&self, waiter: usize) -> Arc<QueryWaiter<'tcx>> {
let mut waiters_guard = self.waiters.lock();
let waiters = waiters_guard.as_mut().expect("non-empty waiters vec");
pub fn extract_waiter(&self, waiter: usize) -> QueryWaiter {
let mut state_lock = self.inner.lock();
let state = state_lock.as_mut().expect("non-empty waiters vec");
// Remove the waiter from the list of waiters
waiters.remove(waiter)
state.waiters.remove(waiter)
}
}
15 changes: 9 additions & 6 deletions compiler/rustc_query_impl/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::io::Write;
use std::iter;
use std::ops::ControlFlow;
use std::sync::Arc;

use rustc_data_structures::fx::{FxHashMap, FxHashSet};
use rustc_errors::{Diag, DiagCtxtHandle};
Expand Down Expand Up @@ -136,7 +135,7 @@ fn visit_waiters<'tcx>(

// Visit the explicit waiters which use condvars and are resumable
if let Some(latch) = job_map.latch_of(query) {
for (i, waiter) in latch.waiters.lock().as_ref().unwrap().iter().enumerate() {
for (i, waiter) in latch.inner.lock().as_ref().unwrap().waiters.iter().enumerate() {
if let Some(waiter_query) = waiter.query {
// Return a value which indicates that this waiter can be resumed
visit(waiter.span, waiter_query).map_break(|_| Some((query, i)))?;
Expand Down Expand Up @@ -217,7 +216,7 @@ fn connected_to_root<'tcx>(
fn remove_cycle<'tcx>(
job_map: &QueryJobMap<'tcx>,
jobs: &mut Vec<QueryJobId>,
wakelist: &mut Vec<Arc<QueryWaiter<'tcx>>>,
wakelist: &mut Vec<QueryWaiter>,
) -> bool {
let mut visited = FxHashSet::default();
let mut stack = Vec::new();
Expand Down Expand Up @@ -306,11 +305,15 @@ fn remove_cycle<'tcx>(
// edge which is resumable / waited using a query latch
let (waitee_query, waiter_idx) = waiter.unwrap();

// Extract the waiter we want to resume
let waiter = job_map.latch_of(waitee_query).unwrap().extract_waiter(waiter_idx);
let latch = job_map.latch_of(waitee_query).unwrap();
let mut latch_state_lock = latch.inner.lock();
let latch_state = latch_state_lock.as_mut().expect("non-empty waiters vec");

// Remove the waiter from the list of waiters we want to resume
let waiter = latch_state.waiters.remove(waiter_idx);

// Set the cycle error so it will be picked up when resumed
*waiter.cycle.lock() = Some(error);
latch_state.cycle = Some(error);

// Put the waiter on the list of things to resume
wakelist.push(waiter);
Expand Down
Loading