Skip to content

Commit

Permalink
Add run_until_cancelled_owned to CancellationToken
Browse files Browse the repository at this point in the history
  • Loading branch information
tysen committed Jan 9, 2025
1 parent 5c8cd33 commit a99e2bc
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 0 deletions.
48 changes: 48 additions & 0 deletions tokio-util/src/sync/cancellation_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,54 @@ impl CancellationToken {
}
.await
}

/// Runs a future to completion and returns its result wrapped inside of an `Option`
/// unless the `CancellationToken` is cancelled. In that case the function returns
/// `None` and the future gets dropped.
///
/// The function takes self by value.
///
/// # Cancel safety
///
/// This method is only cancel safe if `fut` is cancel safe.
pub async fn run_until_cancelled_owned<F>(self, fut: F) -> Option<F::Output>
where
F: Future,
{
pin_project! {
/// A Future that is resolved once the corresponding [`CancellationToken`]
/// is cancelled or a given Future gets resolved. It is biased towards the
/// Future completion.
#[must_use = "futures do nothing unless polled"]
struct RunUntilCancelledFuture<F: Future> {
#[pin]
cancellation: WaitForCancellationFutureOwned,
#[pin]
future: F,
}
}

impl<F: Future> Future for RunUntilCancelledFuture<F> {
type Output = Option<F::Output>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(res) = this.future.poll(cx) {
Poll::Ready(Some(res))
} else if this.cancellation.poll(cx).is_ready() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}

RunUntilCancelledFuture {
cancellation: self.cancelled_owned(),
future: fut,
}
.await
}
}

// ===== impl WaitForCancellationFuture =====
Expand Down
48 changes: 48 additions & 0 deletions tokio-util/tests/sync_cancellation_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,51 @@ fn run_until_cancelled_test() {
);
}
}

#[test]
fn run_until_cancelled_owned_test() {
let (waker, _) = new_count_waker();

{
let token = CancellationToken::new();
let to_cancel = token.clone();

let fut = token.run_until_cancelled_owned(std::future::pending::<()>());
pin!(fut);

assert_eq!(
Poll::Pending,
fut.as_mut().poll(&mut Context::from_waker(&waker))
);

to_cancel.cancel();

assert_eq!(
Poll::Ready(None),
fut.as_mut().poll(&mut Context::from_waker(&waker))
);
}

{
let (tx, rx) = oneshot::channel::<()>();

let token = CancellationToken::new();
let fut = token.run_until_cancelled_owned(async move {
rx.await.unwrap();
42
});
pin!(fut);

assert_eq!(
Poll::Pending,
fut.as_mut().poll(&mut Context::from_waker(&waker))
);

tx.send(()).unwrap();

assert_eq!(
Poll::Ready(Some(42)),
fut.as_mut().poll(&mut Context::from_waker(&waker))
);
}
}

0 comments on commit a99e2bc

Please sign in to comment.