Skip to content

Commit

Permalink
sync: always drop message in destructor for oneshot receiver (#6558)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored May 15, 2024
1 parent df77063 commit 18e048d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 2 deletions.
22 changes: 20 additions & 2 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,14 @@ impl<T> Receiver<T> {
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if let Some(inner) = self.inner.as_ref() {
inner.close();
let state = inner.close();

if state.is_complete() {
// SAFETY: we have ensured that the `VALUE_SENT` bit has been set,
// so only the receiver can access the value.
drop(unsafe { inner.consume_value() });
}

#[cfg(all(tokio_unstable, feature = "tracing"))]
self.resource_span.in_scope(|| {
tracing::trace!(
Expand Down Expand Up @@ -1202,14 +1209,16 @@ impl<T> Inner<T> {
}

/// Called by `Receiver` to indicate that the value will never be received.
fn close(&self) {
fn close(&self) -> State {
let prev = State::set_closed(&self.state);

if prev.is_tx_task_set() && !prev.is_complete() {
unsafe {
self.tx_task.with_task(Waker::wake_by_ref);
}
}

prev
}

/// Consumes the value. This function does not check `state`.
Expand Down Expand Up @@ -1248,6 +1257,15 @@ impl<T> Drop for Inner<T> {
self.tx_task.drop_task();
}
}

// SAFETY: we have `&mut self`, and therefore we have
// exclusive access to the value.
unsafe {
// Note: the assertion holds because if the value has been sent by sender,
// we must ensure that the value must have been consumed by the receiver before
// dropping the `Inner`.
debug_assert!(self.consume_value().is_none());
}
}
}

Expand Down
47 changes: 47 additions & 0 deletions tokio/src/sync/tests/loom_oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,50 @@ fn changing_tx_task() {
}
});
}

#[test]
fn checking_tx_send_ok_not_drop() {
use std::borrow::Borrow;
use std::cell::Cell;

loom::thread_local! {
static IS_RX: Cell<bool> = Cell::new(true);
}

struct Msg;

impl Drop for Msg {
fn drop(&mut self) {
IS_RX.with(|is_rx: &Cell<_>| {
// On `tx.send(msg)` returning `Err(msg)`,
// we call `std::mem::forget(msg)`, so that
// `drop` is not expected to be called in the
// tx thread.
assert!(is_rx.get());
});
}
}

let mut builder = loom::model::Builder::new();
builder.preemption_bound = Some(2);

builder.check(|| {
let (tx, rx) = oneshot::channel();

// tx thread
let tx_thread_join_handle = thread::spawn(move || {
// Ensure that `Msg::drop` in this thread will see is_rx == false
IS_RX.with(|is_rx: &Cell<_>| {
is_rx.set(false);
});
if let Err(msg) = tx.send(Msg) {
std::mem::forget(msg);
}
});

// main thread is the rx thread
drop(rx);

tx_thread_join_handle.join().unwrap();
});
}

0 comments on commit 18e048d

Please sign in to comment.