Skip to content

Commit e8ddb4d

Browse files
authored
pref: Add SyncTable::peek_claim fast path for function::Ingredient::wait_for (#1011)
1 parent 25b3ef1 commit e8ddb4d

File tree

2 files changed

+78
-8
lines changed

2 files changed

+78
-8
lines changed

src/function.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,11 +428,11 @@ where
428428
fn wait_for<'me>(&'me self, zalsa: &'me Zalsa, key_index: Id) -> WaitForResult<'me> {
429429
match self
430430
.sync_table
431-
.try_claim(zalsa, key_index, Reentrancy::Deny)
431+
.peek_claim(zalsa, key_index, Reentrancy::Deny)
432432
{
433433
ClaimResult::Running(blocked_on) => WaitForResult::Running(blocked_on),
434434
ClaimResult::Cycle { inner } => WaitForResult::Cycle { inner },
435-
ClaimResult::Claimed(_) => WaitForResult::Available,
435+
ClaimResult::Claimed(()) => WaitForResult::Available,
436436
}
437437
}
438438

src/function/sync.rs

Lines changed: 76 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ pub(crate) struct SyncTable {
2020
ingredient: IngredientIndex,
2121
}
2222

23-
pub(crate) enum ClaimResult<'a> {
23+
pub(crate) enum ClaimResult<'a, Guard = ClaimGuard<'a>> {
2424
/// Can't claim the query because it is running on an other thread.
2525
Running(Running<'a>),
2626
/// Claiming the query results in a cycle.
@@ -31,7 +31,7 @@ pub(crate) enum ClaimResult<'a> {
3131
inner: bool,
3232
},
3333
/// Successfully claimed the query.
34-
Claimed(ClaimGuard<'a>),
34+
Claimed(Guard),
3535
}
3636

3737
pub(crate) struct SyncState {
@@ -87,10 +87,7 @@ impl SyncTable {
8787
}
8888
};
8989

90-
let &mut SyncState {
91-
ref mut anyone_waiting,
92-
..
93-
} = occupied_entry.into_mut();
90+
let SyncState { anyone_waiting, .. } = occupied_entry.into_mut();
9491

9592
// NB: `Ordering::Relaxed` is sufficient here,
9693
// as there are no loads that are "gated" on this
@@ -125,6 +122,51 @@ impl SyncTable {
125122
}
126123
}
127124

125+
/// Claims the given key index, or blocks if it is running on another thread.
126+
pub(crate) fn peek_claim<'me>(
127+
&'me self,
128+
zalsa: &'me Zalsa,
129+
key_index: Id,
130+
reentrant: Reentrancy,
131+
) -> ClaimResult<'me, ()> {
132+
let mut write = self.syncs.lock();
133+
match write.entry(key_index) {
134+
std::collections::hash_map::Entry::Occupied(occupied_entry) => {
135+
let id = match occupied_entry.get().id {
136+
SyncOwner::Thread(id) => id,
137+
SyncOwner::Transferred => {
138+
return match self.peek_claim_transferred(zalsa, occupied_entry, reentrant) {
139+
Ok(claimed) => claimed,
140+
Err(other_thread) => match other_thread.block(write) {
141+
BlockResult::Cycle => ClaimResult::Cycle { inner: false },
142+
BlockResult::Running(running) => ClaimResult::Running(running),
143+
},
144+
}
145+
}
146+
};
147+
148+
let SyncState { anyone_waiting, .. } = occupied_entry.into_mut();
149+
150+
// NB: `Ordering::Relaxed` is sufficient here,
151+
// as there are no loads that are "gated" on this
152+
// value. Everything that is written is also protected
153+
// by a lock that must be acquired. The role of this
154+
// boolean is to decide *whether* to acquire the lock,
155+
// not to gate future atomic reads.
156+
*anyone_waiting = true;
157+
match zalsa.runtime().block(
158+
DatabaseKeyIndex::new(self.ingredient, key_index),
159+
id,
160+
write,
161+
) {
162+
BlockResult::Running(blocked_on) => ClaimResult::Running(blocked_on),
163+
BlockResult::Cycle => ClaimResult::Cycle { inner: false },
164+
}
165+
}
166+
std::collections::hash_map::Entry::Vacant(_) => ClaimResult::Claimed(()),
167+
}
168+
}
169+
128170
#[cold]
129171
#[inline(never)]
130172
fn try_claim_transferred<'me>(
@@ -179,6 +221,34 @@ impl SyncTable {
179221
}
180222
}
181223

224+
#[cold]
225+
#[inline(never)]
226+
fn peek_claim_transferred<'me>(
227+
&'me self,
228+
zalsa: &'me Zalsa,
229+
mut entry: OccupiedEntry<Id, SyncState>,
230+
reentrant: Reentrancy,
231+
) -> Result<ClaimResult<'me, ()>, Box<BlockOnTransferredOwner<'me>>> {
232+
let key_index = *entry.key();
233+
let database_key_index = DatabaseKeyIndex::new(self.ingredient, key_index);
234+
let thread_id = thread::current().id();
235+
236+
match zalsa
237+
.runtime()
238+
.block_transferred(database_key_index, thread_id)
239+
{
240+
BlockTransferredResult::ImTheOwner if reentrant.is_allow() => {
241+
Ok(ClaimResult::Claimed(()))
242+
}
243+
BlockTransferredResult::ImTheOwner => Ok(ClaimResult::Cycle { inner: true }),
244+
BlockTransferredResult::OwnedBy(other_thread) => {
245+
entry.get_mut().anyone_waiting = true;
246+
Err(other_thread)
247+
}
248+
BlockTransferredResult::Released => Ok(ClaimResult::Claimed(())),
249+
}
250+
}
251+
182252
/// Marks `key_index` as a transfer target.
183253
///
184254
/// Returns the `SyncOwnerId` of the thread that currently owns this query.

0 commit comments

Comments
 (0)