Skip to content

Commit

Permalink
Merge pull request #1 from asynchronics/feature/wait-until-or-timeout
Browse files Browse the repository at this point in the history
Make it possible to time out after a deadline
  • Loading branch information
sbarral authored May 15, 2024
2 parents b1596a3 + 7cde07e commit 84ea682
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
matrix:
rust:
- stable
- 1.56.1
- 1.77.1
steps:
- name: Checkout sources
uses: actions/checkout@v3
Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ name = "async-event"
version = "0.1.0"
authors = ["Serge Barral <[email protected]>"]
edition = "2021"
rust-version = "1.56"
rust-version = "1.77"
license = "MIT OR Apache-2.0"
repository = "https://github.com/asynchronics/async-event"
readme = "README.md"
Expand All @@ -18,6 +18,9 @@ An efficient async condition variable for lock-free algorithms.
categories = ["asynchronous", "concurrency"]
keywords = ["async", "event", "atomic", "futures"]

[dependencies]
pin-project-lite = "0.2"

[dev-dependencies]
tokio = { version = "1", features = ["full"] }
futures-executor = "0.3"
Expand Down
76 changes: 75 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
//! });
//! ```

// Temporary workaround until the `async_event_loom` flag can be whitelisted
// without a `build.rs` [1].
//
// [1]: (https://github.com/rust-lang/rust/issues/124800).
#![allow(unexpected_cfgs)]

mod loom_exports;

use std::future::Future;
Expand All @@ -81,6 +87,7 @@ use std::task::{Context, Poll, Waker};
use loom_exports::cell::UnsafeCell;
use loom_exports::sync::atomic::{self, AtomicBool};
use loom_exports::sync::Mutex;
use pin_project_lite::pin_project;

/// An object that can receive or send notifications.
pub struct Event {
Expand Down Expand Up @@ -130,9 +137,29 @@ impl Event {

/// Returns a future that can be `await`ed until the provided predicate is
/// satisfied.
pub fn wait_until<F: FnMut() -> Option<T>, T>(&self, predicate: F) -> WaitUntil<F, T> {
pub fn wait_until<F, T>(&self, predicate: F) -> WaitUntil<F, T>
where
F: FnMut() -> Option<T>,
{
WaitUntil::new(&self.wait_set, predicate)
}

/// Returns a future that can be `await`ed until the provided predicate is
/// satisfied or until the provided future completes.
///
/// The deadline is specified as a `Future` that is expected to resolves to
/// `()` after some duration, such as a `tokio::time::Sleep` future.
pub fn wait_until_or_timeout<F, T, D>(
&self,
predicate: F,
deadline: D,
) -> WaitUntilOrTimeout<F, T, D>
where
F: FnMut() -> Option<T>,
D: Future<Output = ()>,
{
WaitUntilOrTimeout::new(&self.wait_set, predicate, deadline)
}
}

impl Default for Event {
Expand Down Expand Up @@ -378,6 +405,53 @@ enum WaitUntilState {
Completed,
}

pin_project! {
/// A future that can be `await`ed until a predicate is satisfied or until a
/// deadline elapses.
pub struct WaitUntilOrTimeout<'a, F: FnMut() -> Option<T>, T, D: Future<Output = ()>> {
wait_until: WaitUntil<'a, F, T>,
#[pin]
deadline: D,
}
}

impl<'a, F, T, D> WaitUntilOrTimeout<'a, F, T, D>
where
F: FnMut() -> Option<T>,
D: Future<Output = ()>,
{
/// Creates a future associated with the specified event sink that can be
/// `await`ed until the specified predicate is satisfied, or until the
/// specified timeout future completes.
fn new(wait_set: &'a WaitSet, predicate: F, deadline: D) -> Self {
Self {
wait_until: WaitUntil::new(wait_set, predicate),
deadline,
}
}
}

impl<'a, F, T, D> Future for WaitUntilOrTimeout<'a, F, T, D>
where
F: FnMut() -> Option<T>,
D: Future<Output = ()>,
{
type Output = Option<T>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

if let Poll::Ready(value) = Pin::new(this.wait_until).poll(cx) {
Poll::Ready(Some(value))
} else if this.deadline.poll(cx).is_ready() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}

/// A set of notifiers.
///
/// The set wraps a Mutex-protected list of notifiers and manages a flag for
Expand Down

0 comments on commit 84ea682

Please sign in to comment.