Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/active_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,6 @@ impl ActiveQuery {
}
}

pub(super) fn iteration_count(&self) -> IterationCount {
self.iteration_count
}

pub(crate) fn tracked_struct_ids(&self) -> &IdentityMap {
&self.tracked_struct_ids
}
Expand Down
19 changes: 15 additions & 4 deletions src/function/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,19 @@ where
/// * `db`, the database.
/// * `active_query`, the active stack frame for the query to execute.
/// * `opt_old_memo`, the older memo, if any existed. Used for backdating.
///
/// # Returns
/// The newly computed memo or `None` if this query is part of a larger cycle
/// and `execute` blocked on a cycle head running on another thread. In this case,
/// the memo is potentially outdated and needs to be refetched.
#[inline(never)]
pub(super) fn execute<'db>(
&'db self,
db: &'db C::DbView,
mut claim_guard: ClaimGuard<'db>,
zalsa_local: &'db ZalsaLocal,
opt_old_memo: Option<&Memo<'db, C>>,
) -> &'db Memo<'db, C> {
) -> Option<&'db Memo<'db, C>> {
let database_key_index = claim_guard.database_key_index();
let zalsa = claim_guard.zalsa();

Expand Down Expand Up @@ -80,7 +85,7 @@ where
// We need to mark the memo as finalized so other cycle participants that have fallbacks
// will be verified (participants that don't have fallbacks will not be verified).
memo.revisions.verified_final.store(true, Ordering::Release);
return memo;
return Some(memo);
}

// If we're in the middle of a cycle and we have a fallback, use it instead.
Expand Down Expand Up @@ -125,7 +130,7 @@ where
self.diff_outputs(zalsa, database_key_index, old_memo, &completed_query);
}

self.insert_memo(
let memo = self.insert_memo(
zalsa,
id,
Memo::new(
Expand All @@ -134,7 +139,13 @@ where
completed_query.revisions,
),
memo_ingredient_index,
)
);

if claim_guard.drop() {
None
} else {
Some(memo)
}
}

fn execute_maybe_iterate<'db>(
Expand Down
46 changes: 4 additions & 42 deletions src/function/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,20 +58,11 @@ where
id: Id,
) -> &'db Memo<'db, C> {
let memo_ingredient_index = self.memo_ingredient_index(zalsa, id);
let mut retry_count = 0;

loop {
if let Some(memo) = self
.fetch_hot(zalsa, id, memo_ingredient_index)
.or_else(|| {
self.fetch_cold_with_retry(
zalsa,
zalsa_local,
db,
id,
memo_ingredient_index,
&mut retry_count,
)
})
.or_else(|| self.fetch_cold(zalsa, zalsa_local, db, id, memo_ingredient_index))
{
return memo;
}
Expand Down Expand Up @@ -104,33 +95,6 @@ where
}
}

fn fetch_cold_with_retry<'db>(
&'db self,
zalsa: &'db Zalsa,
zalsa_local: &'db ZalsaLocal,
db: &'db C::DbView,
id: Id,
memo_ingredient_index: MemoIngredientIndex,
retry_count: &mut u32,
) -> Option<&'db Memo<'db, C>> {
let memo = self.fetch_cold(zalsa, zalsa_local, db, id, memo_ingredient_index)?;

// If we get back a provisional cycle memo, and it's provisional on any cycle heads
// that are claimed by a different thread, we can't propagate the provisional memo
// any further (it could escape outside the cycle); we need to block on the other
// thread completing fixpoint iteration of the cycle, and then we can re-query for
// our no-longer-provisional memo.
// That is only correct for fixpoint cycles, though: `FallbackImmediate` cycles
// never have provisional entries.
if C::CYCLE_STRATEGY == CycleRecoveryStrategy::FallbackImmediate
|| !memo.provisional_retry(zalsa, zalsa_local, self.database_key_index(id), retry_count)
{
Some(memo)
} else {
None
}
}

fn fetch_cold<'db>(
&'db self,
zalsa: &'db Zalsa,
Expand All @@ -151,7 +115,7 @@ where

if let Some(memo) = memo {
if memo.value.is_some() {
memo.block_on_heads(zalsa, zalsa_local);
memo.block_on_heads(zalsa);
}
}
}
Expand Down Expand Up @@ -212,9 +176,7 @@ where
}
}

let memo = self.execute(db, claim_guard, zalsa_local, opt_old_memo);

Some(memo)
self.execute(db, claim_guard, zalsa_local, opt_old_memo)
}

#[cold]
Expand Down
4 changes: 2 additions & 2 deletions src/function/maybe_changed_after.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ where
// `in_cycle` tracks if the enclosing query is in a cycle. `deep_verify.cycle_heads` tracks
// if **this query** encountered a cycle (which means there's some provisional value somewhere floating around).
if old_memo.value.is_some() && !cycle_heads.has_any() {
let memo = self.execute(db, claim_guard, zalsa_local, Some(old_memo));
let memo = self.execute(db, claim_guard, zalsa_local, Some(old_memo))?;
let changed_at = memo.revisions.changed_at;

// Always assume that a provisional value has changed.
Expand Down Expand Up @@ -500,7 +500,7 @@ where
return on_stack;
}

let cycle_heads_iter = TryClaimCycleHeadsIter::new(zalsa, zalsa_local, cycle_heads);
let cycle_heads_iter = TryClaimCycleHeadsIter::new(zalsa, cycle_heads);

for cycle_head in cycle_heads_iter {
match cycle_head {
Expand Down
89 changes: 10 additions & 79 deletions src/function/memo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::runtime::Running;
use crate::sync::atomic::Ordering;
use crate::table::memo::MemoTableWithTypesMut;
use crate::zalsa::{MemoIngredientIndex, Zalsa};
use crate::zalsa_local::{QueryOriginRef, QueryRevisions, ZalsaLocal};
use crate::zalsa_local::{QueryOriginRef, QueryRevisions};
use crate::{Event, EventKind, Id, Revision};

impl<C: Configuration> IngredientImpl<C> {
Expand Down Expand Up @@ -132,50 +132,12 @@ impl<'db, C: Configuration> Memo<'db, C> {
!self.revisions.verified_final.load(Ordering::Relaxed)
}

/// Invoked when `refresh_memo` is about to return a memo to the caller; if that memo is
/// provisional, and its cycle head is claimed by another thread, we need to wait for that
/// other thread to complete the fixpoint iteration, and then retry fetching our own memo.
///
/// Return `true` if the caller should retry, `false` if the caller should go ahead and return
/// this memo to the caller.
#[inline(always)]
pub(super) fn provisional_retry(
&self,
zalsa: &Zalsa,
zalsa_local: &ZalsaLocal,
database_key_index: DatabaseKeyIndex,
retry_count: &mut u32,
) -> bool {
if self.block_on_heads(zalsa, zalsa_local) {
// If we get here, we are a provisional value of
// the cycle head (either initial value, or from a later iteration) and should be
// returned to caller to allow fixpoint iteration to proceed.
false
} else {
assert!(
*retry_count <= 20000,
"Provisional memo retry limit exceeded for {database_key_index:?}; \
this usually indicates a bug in salsa's cycle caching/locking. \
(retried {retry_count} times)",
);

*retry_count += 1;

// all our cycle heads are complete; re-fetch
// and we should get a non-provisional memo.
crate::tracing::debug!(
"Retrying provisional memo {database_key_index:?} after awaiting cycle heads."
);
true
}
}

/// Blocks on all cycle heads (recursively) that this memo depends on.
///
/// Returns `true` if awaiting all cycle heads results in a cycle. This means, they're all waiting
/// for us to make progress.
#[inline(always)]
pub(super) fn block_on_heads(&self, zalsa: &Zalsa, zalsa_local: &ZalsaLocal) -> bool {
pub(super) fn block_on_heads(&self, zalsa: &Zalsa) -> bool {
// IMPORTANT: If you make changes to this function, make sure to run `cycle_nested_deep` with
// shuttle with at least 10k iterations.

Expand All @@ -184,16 +146,12 @@ impl<'db, C: Configuration> Memo<'db, C> {
return true;
}

return block_on_heads_cold(zalsa, zalsa_local, cycle_heads);
return block_on_heads_cold(zalsa, cycle_heads);

#[inline(never)]
fn block_on_heads_cold(
zalsa: &Zalsa,
zalsa_local: &ZalsaLocal,
heads: &CycleHeads,
) -> bool {
fn block_on_heads_cold(zalsa: &Zalsa, heads: &CycleHeads) -> bool {
let _entered = crate::tracing::debug_span!("block_on_heads").entered();
let cycle_heads = TryClaimCycleHeadsIter::new(zalsa, zalsa_local, heads);
let cycle_heads = TryClaimCycleHeadsIter::new(zalsa, heads);
let mut all_cycles = true;

for claim_result in cycle_heads {
Expand Down Expand Up @@ -447,6 +405,7 @@ mod persistence {
}
}

#[derive(Debug)]
pub(super) enum TryClaimHeadsResult<'me> {
/// Claiming the cycle head results in a cycle.
Cycle {
Expand All @@ -465,19 +424,15 @@ pub(super) enum TryClaimHeadsResult<'me> {
/// Iterator to try claiming the transitive cycle heads of a memo.
pub(super) struct TryClaimCycleHeadsIter<'a> {
zalsa: &'a Zalsa,
zalsa_local: &'a ZalsaLocal,

cycle_heads: CycleHeadsIterator<'a>,
}

impl<'a> TryClaimCycleHeadsIter<'a> {
pub(super) fn new(
zalsa: &'a Zalsa,
zalsa_local: &'a ZalsaLocal,
cycle_heads: &'a CycleHeads,
) -> Self {
pub(super) fn new(zalsa: &'a Zalsa, cycle_heads: &'a CycleHeads) -> Self {
Self {
zalsa,
zalsa_local,

cycle_heads: cycle_heads.iter(),
}
}
Expand All @@ -488,31 +443,7 @@ impl<'me> Iterator for TryClaimCycleHeadsIter<'me> {

fn next(&mut self) -> Option<Self::Item> {
let head = self.cycle_heads.next()?;

let head_database_key = head.database_key_index;
let head_iteration_count = head.iteration_count.load();

// The most common case is that the head is already in the query stack. So let's check that first.
// SAFETY: We do not access the query stack reentrantly.
if let Some(current_iteration_count) = unsafe {
self.zalsa_local.with_query_stack_unchecked(|stack| {
stack
.iter()
.rev()
.find(|query| query.database_key_index == head_database_key)
.map(|query| query.iteration_count())
})
} {
crate::tracing::trace!(
"Waiting for {head_database_key:?} results in a cycle (because it is already in the query stack)"
);
return Some(TryClaimHeadsResult::Cycle {
head_iteration_count,
memo_iteration_count: current_iteration_count,
verified_at: self.zalsa.current_revision(),
});
}

let head_key_index = head_database_key.key_index();
let ingredient = self
.zalsa
Expand Down Expand Up @@ -543,7 +474,7 @@ impl<'me> Iterator for TryClaimCycleHeadsIter<'me> {

Some(TryClaimHeadsResult::Cycle {
memo_iteration_count: current_iteration_count,
head_iteration_count,
head_iteration_count: head.iteration_count.load(),
verified_at,
})
}
Expand Down
Loading
Loading