From c07257f99f92c5d6773fc0827bcfd77e43f0fd46 Mon Sep 17 00:00:00 2001 From: Nur Date: Thu, 21 Nov 2024 22:31:39 +0600 Subject: [PATCH] io: simplify io readiness logic (#6966) --- tokio/src/runtime/io/scheduled_io.rs | 77 +++++++++------------------- 1 file changed, 23 insertions(+), 54 deletions(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index ee6977c00e7..af57caed460 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -206,43 +206,23 @@ impl ScheduledIo { /// specific tick. /// - `f`: a closure returning a new readiness value given the previous /// readiness. - pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) { - let mut current = self.readiness.load(Acquire); - - // If the io driver is shut down, then you are only allowed to clear readiness. - debug_assert!(SHUTDOWN.unpack(current) == 0 || matches!(tick, Tick::Clear(_))); - - loop { - // Mask out the tick bits so that the modifying function doesn't see - // them. - let current_readiness = Ready::from_usize(current); - let new = f(current_readiness); - - let new_tick = match tick { - Tick::Set => { - let current = TICK.unpack(current); - current.wrapping_add(1) % (TICK.max_value() + 1) - } - Tick::Clear(t) => { - if TICK.unpack(current) as u8 != t { - // Trying to clear readiness with an old event! - return; - } - - t as usize - } + pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) { + let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| { + // If the io driver is shut down, then you are only allowed to clear readiness. + debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_))); + + const MAX_TICK: usize = TICK.max_value() + 1; + let tick = TICK.unpack(curr); + + let new_tick = match tick_op { + // Trying to clear readiness with an old event! + Tick::Clear(t) if tick as u8 != t => return None, + Tick::Clear(t) => t as usize, + Tick::Set => tick.wrapping_add(1) % MAX_TICK, }; - let next = TICK.pack(new_tick, new.as_usize()); - - match self - .readiness - .compare_exchange(current, next, AcqRel, Acquire) - { - Ok(_) => return, - // we lost the race, retry! - Err(actual) => current = actual, - } - } + let ready = Ready::from_usize(READINESS.unpack(curr)); + Some(TICK.pack(new_tick, f(ready).as_usize())) + }); } /// Notifies all pending waiters that have registered interest in `ready`. @@ -335,22 +315,16 @@ impl ScheduledIo { if ready.is_empty() && !is_shutdown { // Update the task info let mut waiters = self.waiters.lock(); - let slot = match direction { + let waker = match direction { Direction::Read => &mut waiters.reader, Direction::Write => &mut waiters.writer, }; // Avoid cloning the waker if one is already stored that matches the // current task. - match slot { - Some(existing) => { - if !existing.will_wake(cx.waker()) { - existing.clone_from(cx.waker()); - } - } - None => { - *slot = Some(cx.waker().clone()); - } + match waker { + Some(waker) => waker.clone_from(cx.waker()), + None => *waker = Some(cx.waker().clone()), } // Try again, in case the readiness was changed while we were @@ -465,12 +439,11 @@ impl Future for Readiness<'_> { State::Init => { // Optimistically check existing readiness let curr = scheduled_io.readiness.load(SeqCst); - let ready = Ready::from_usize(READINESS.unpack(curr)); let is_shutdown = SHUTDOWN.unpack(curr) != 0; // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; - let ready = ready.intersection(interest); + let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest); if !ready.is_empty() || is_shutdown { // Currently ready! @@ -538,10 +511,7 @@ impl Future for Readiness<'_> { *state = State::Done; } else { // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); - } - + w.waker.as_mut().unwrap().clone_from(cx.waker()); return Poll::Pending; } @@ -566,8 +536,7 @@ impl Future for Readiness<'_> { // The readiness state could have been cleared in the meantime, // but we allow the returned ready set to be empty. - let curr_ready = Ready::from_usize(READINESS.unpack(curr)); - let ready = curr_ready.intersection(w.interest); + let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest); return Poll::Ready(ReadyEvent { tick,