From c7c22a0ff887140d75e246895c46fb18e88e9ba5 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 14 May 2024 12:46:21 +0800 Subject: [PATCH 01/12] feat(oneshot channel): ensure msg won't be dropped on oneshot sender when returning ok --- tokio/src/sync/oneshot.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 9e8c3fcb7f7..57a6c3dee6b 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -627,7 +627,22 @@ impl Sender { ) }); - Ok(()) + if let Some(inner) = Arc::into_inner(inner) { + if let Some(value) = inner.value.with_mut(|ptr| unsafe { + // SAFETY: we have successfully returned with `Some`, which means we are the + // only accessor the ptr. + // + // Note: value can be `None` even though we have previously set it as `Some`, + // because the value may have been consumed by receiver before we reach here. + (*ptr).take() + }) { + Err(value) + } else { + Ok(()) + } + } else { + Ok(()) + } } /// Waits for the associated [`Receiver`] handle to close. From 97e0afc371e80a5e0f3748a9bfcf545202650afc Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 14 May 2024 14:18:03 +0800 Subject: [PATCH 02/12] rename and update comment --- tokio/src/sync/oneshot.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 57a6c3dee6b..1c0dd838769 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -628,15 +628,15 @@ impl Sender { }); if let Some(inner) = Arc::into_inner(inner) { - if let Some(value) = inner.value.with_mut(|ptr| unsafe { + if let Some(t) = inner.value.with_mut(|ptr| unsafe { // SAFETY: we have successfully returned with `Some`, which means we are the - // only accessor the ptr. + // only accessor to `ptr`. // // Note: value can be `None` even though we have previously set it as `Some`, // because the value may have been consumed by receiver before we reach here. (*ptr).take() }) { - Err(value) + Err(t) } else { Ok(()) } From 60bc7263f3853a1818e821200762b672ff2356e9 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 14 May 2024 15:10:50 +0800 Subject: [PATCH 03/12] fix compile before 1.70 --- tokio/Cargo.toml | 3 ++- tokio/src/sync/oneshot.rs | 32 +++++++++++++++++++++----------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 020cc1e4ac2..9dc0cb40257 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -84,7 +84,7 @@ signal = [ "windows-sys/Win32_Foundation", "windows-sys/Win32_System_Console", ] -sync = [] +sync = ["rustversion"] test-util = ["rt", "sync", "time"] time = [] @@ -97,6 +97,7 @@ pin-project-lite = "0.2.11" bytes = { version = "1.0.0", optional = true } mio = { version = "0.8.9", optional = true, default-features = false } num_cpus = { version = "1.8.0", optional = true } +rustversion = { version = "1.0.16", optional = true } parking_lot = { version = "0.12.0", optional = true } [target.'cfg(not(target_family = "wasm"))'.dependencies] diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 1c0dd838769..6b8c5675a19 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -627,22 +627,32 @@ impl Sender { ) }); - if let Some(inner) = Arc::into_inner(inner) { - if let Some(t) = inner.value.with_mut(|ptr| unsafe { - // SAFETY: we have successfully returned with `Some`, which means we are the - // only accessor to `ptr`. - // - // Note: value can be `None` even though we have previously set it as `Some`, - // because the value may have been consumed by receiver before we reach here. - (*ptr).take() - }) { - Err(t) + #[rustversion::since(1.70)] + fn consume_inner(inner: Arc>) -> Result<(), T> { + if let Some(inner) = Arc::into_inner(inner) { + if let Some(t) = inner.value.with_mut(|ptr| unsafe { + // SAFETY: we have successfully returned with `Some`, which means we are the + // only accessor to `ptr`. + // + // Note: value can be `None` even though we have previously set it as `Some`, + // because the value may have been consumed by receiver before we reach here. + (*ptr).take() + }) { + Err(t) + } else { + Ok(()) + } } else { Ok(()) } - } else { + } + + #[rustversion::before(1.70)] + fn consume_inner(_inner: Arc>) -> Result<(), T> { Ok(()) } + + consume_inner(inner) } /// Waits for the associated [`Receiver`] handle to close. From b7c5c8c350eec28dda5c6644b461c5e96a7fcdda Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 14 May 2024 15:30:04 +0800 Subject: [PATCH 04/12] use rustversion anyway --- tokio/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 9dc0cb40257..ae90677402b 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -84,7 +84,7 @@ signal = [ "windows-sys/Win32_Foundation", "windows-sys/Win32_System_Console", ] -sync = ["rustversion"] +sync = [] test-util = ["rt", "sync", "time"] time = [] @@ -92,12 +92,12 @@ time = [] tokio-macros = { version = "~2.2.0", path = "../tokio-macros", optional = true } pin-project-lite = "0.2.11" +rustversion = { version = "1.0.16" } # Everything else is optional... bytes = { version = "1.0.0", optional = true } mio = { version = "0.8.9", optional = true, default-features = false } num_cpus = { version = "1.8.0", optional = true } -rustversion = { version = "1.0.16", optional = true } parking_lot = { version = "0.12.0", optional = true } [target.'cfg(not(target_family = "wasm"))'.dependencies] From 605315a517d3db1f8cc6fcc000f2abbae1a88e90 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 14 May 2024 15:39:30 +0800 Subject: [PATCH 05/12] disable new feature for loom --- tokio/src/sync/oneshot.rs | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 6b8c5675a19..dfaadcf38a9 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -629,20 +629,29 @@ impl Sender { #[rustversion::since(1.70)] fn consume_inner(inner: Arc>) -> Result<(), T> { - if let Some(inner) = Arc::into_inner(inner) { - if let Some(t) = inner.value.with_mut(|ptr| unsafe { - // SAFETY: we have successfully returned with `Some`, which means we are the - // only accessor to `ptr`. - // - // Note: value can be `None` even though we have previously set it as `Some`, - // because the value may have been consumed by receiver before we reach here. - (*ptr).take() - }) { - Err(t) + #[cfg(not(loom))] + { + if let Some(inner) = Arc::into_inner(inner) { + if let Some(t) = inner.value.with_mut(|ptr| unsafe { + // SAFETY: we have successfully returned with `Some`, which means we are the + // only accessor to `ptr`. + // + // Note: value can be `None` even though we have previously set it as `Some`, + // because the value may have been consumed by receiver before we reach here. + (*ptr).take() + }) { + Err(t) + } else { + Ok(()) + } } else { Ok(()) } - } else { + } + + #[cfg(loom)] + { + // The `loom::sync::Arc` does not implement `into_inner` yet. Ok(()) } } From 71d2f484e0e2382e85f985b8d0cde67e74ff3e2d Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 14 May 2024 15:41:49 +0800 Subject: [PATCH 06/12] make loom happy --- tokio/src/sync/oneshot.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index dfaadcf38a9..5fdcd3765fe 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -651,6 +651,7 @@ impl Sender { #[cfg(loom)] { + drop(inner); // The `loom::sync::Arc` does not implement `into_inner` yet. Ok(()) } From 4d6e04e69688f8b14887e90453f95859cf8b693a Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 14 May 2024 16:19:39 +0800 Subject: [PATCH 07/12] temp use patched rustversion --- tokio/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index ae90677402b..f02a57d8111 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -92,7 +92,7 @@ time = [] tokio-macros = { version = "~2.2.0", path = "../tokio-macros", optional = true } pin-project-lite = "0.2.11" -rustversion = { version = "1.0.16" } +rustversion = { git = "https://github.com/wenym1/rustversion", rev = "0b11410" } # Everything else is optional... bytes = { version = "1.0.0", optional = true } From f5acc3cec1c8cf08bdfce8bd5bc6b30b0b773632 Mon Sep 17 00:00:00 2001 From: William Wen Date: Tue, 14 May 2024 19:24:25 +0800 Subject: [PATCH 08/12] add loom test --- tokio/Cargo.toml | 3 +- tokio/src/sync/oneshot.rs | 32 +++++++----------- tokio/src/sync/tests/loom_oneshot.rs | 49 +++++++++++++++++++++++++++- 3 files changed, 61 insertions(+), 23 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index f02a57d8111..c0018d4c2d8 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -153,7 +153,8 @@ wasm-bindgen-test = "0.3.0" mio-aio = { version = "0.8.0", features = ["tokio"] } [target.'cfg(loom)'.dev-dependencies] -loom = { version = "0.7", features = ["futures", "checkpoint"] } +loom = { git = "https://github.com/wenym1/loom", rev = "4edf9435", features = ["futures", "checkpoint"] } +#loom = { path = "/Users/william/repo/loom", features = ["futures", "checkpoint"] } [package.metadata.docs.rs] all-features = true diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 5fdcd3765fe..6b8c5675a19 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -629,30 +629,20 @@ impl Sender { #[rustversion::since(1.70)] fn consume_inner(inner: Arc>) -> Result<(), T> { - #[cfg(not(loom))] - { - if let Some(inner) = Arc::into_inner(inner) { - if let Some(t) = inner.value.with_mut(|ptr| unsafe { - // SAFETY: we have successfully returned with `Some`, which means we are the - // only accessor to `ptr`. - // - // Note: value can be `None` even though we have previously set it as `Some`, - // because the value may have been consumed by receiver before we reach here. - (*ptr).take() - }) { - Err(t) - } else { - Ok(()) - } + if let Some(inner) = Arc::into_inner(inner) { + if let Some(t) = inner.value.with_mut(|ptr| unsafe { + // SAFETY: we have successfully returned with `Some`, which means we are the + // only accessor to `ptr`. + // + // Note: value can be `None` even though we have previously set it as `Some`, + // because the value may have been consumed by receiver before we reach here. + (*ptr).take() + }) { + Err(t) } else { Ok(()) } - } - - #[cfg(loom)] - { - drop(inner); - // The `loom::sync::Arc` does not implement `into_inner` yet. + } else { Ok(()) } } diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index c5f79720794..8a2cd86e171 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -1,4 +1,4 @@ -use crate::sync::oneshot; +use crate::sync::{mpsc, oneshot}; use futures::future::poll_fn; use loom::future::block_on; @@ -86,6 +86,8 @@ fn recv_closed() { // TODO: Move this into `oneshot` proper. +use crate::loom; +use crate::sync::oneshot::{Receiver, Sender}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -138,3 +140,48 @@ fn changing_tx_task() { } }); } + +#[test] +fn checking_tx_send_ok_not_drop() { + use std::cell::RefCell; + loom::thread_local! { + static IS_RX: RefCell = RefCell::new(true); + } + + struct Msg; + + impl Drop for Msg { + fn drop(&mut self) { + IS_RX.with(|is_rx: &RefCell<_>| { + // On `tx.send(msg)` returning `Err(msg)`, + // we call `std::mem::forget(msg)`, so that + // `drop` is not expected to be called in the + // tx thread. + assert!(*is_rx.borrow()); + }); + } + } + + loom::model(|| { + let (tx, rx) = oneshot::channel(); + + // tx thread + let tx_thread_join_handle = thread::spawn(move || { + // Ensure that `Msg::drop` in this thread will see is_rx == false + IS_RX.with(|is_rx: &RefCell<_>| { + *is_rx.borrow_mut() = false; + }); + if let Err(msg) = tx.send(Msg) { + std::mem::forget(msg); + } + }); + + // rx thread + let rx_thread_join_handle = thread::spawn(move || { + drop(rx); + }); + + tx_thread_join_handle.join().unwrap(); + rx_thread_join_handle.join().unwrap(); + }); +} From 76f4b49f98de8c7815da6e89e215bd7b6f6b0737 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 15 May 2024 00:47:21 +0800 Subject: [PATCH 09/12] drop value in rx --- tokio/Cargo.toml | 4 +-- tokio/src/sync/oneshot.rs | 52 ++++++++++++++++++--------------------- 2 files changed, 25 insertions(+), 31 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index c0018d4c2d8..020cc1e4ac2 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -92,7 +92,6 @@ time = [] tokio-macros = { version = "~2.2.0", path = "../tokio-macros", optional = true } pin-project-lite = "0.2.11" -rustversion = { git = "https://github.com/wenym1/rustversion", rev = "0b11410" } # Everything else is optional... bytes = { version = "1.0.0", optional = true } @@ -153,8 +152,7 @@ wasm-bindgen-test = "0.3.0" mio-aio = { version = "0.8.0", features = ["tokio"] } [target.'cfg(loom)'.dev-dependencies] -loom = { git = "https://github.com/wenym1/loom", rev = "4edf9435", features = ["futures", "checkpoint"] } -#loom = { path = "/Users/william/repo/loom", features = ["futures", "checkpoint"] } +loom = { version = "0.7", features = ["futures", "checkpoint"] } [package.metadata.docs.rs] all-features = true diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 6b8c5675a19..f954811f0ab 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -627,32 +627,7 @@ impl Sender { ) }); - #[rustversion::since(1.70)] - fn consume_inner(inner: Arc>) -> Result<(), T> { - if let Some(inner) = Arc::into_inner(inner) { - if let Some(t) = inner.value.with_mut(|ptr| unsafe { - // SAFETY: we have successfully returned with `Some`, which means we are the - // only accessor to `ptr`. - // - // Note: value can be `None` even though we have previously set it as `Some`, - // because the value may have been consumed by receiver before we reach here. - (*ptr).take() - }) { - Err(t) - } else { - Ok(()) - } - } else { - Ok(()) - } - } - - #[rustversion::before(1.70)] - fn consume_inner(_inner: Arc>) -> Result<(), T> { - Ok(()) - } - - consume_inner(inner) + Ok(()) } /// Waits for the associated [`Receiver`] handle to close. @@ -1097,7 +1072,16 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { if let Some(inner) = self.inner.as_ref() { - inner.close(); + let state = inner.close(); + + if state.is_complete() { + drop(unsafe { + // SAFETY: we have ensured that the `VALUE_SENT` bit has been set, + // so only the receiver can access the value. + inner.consume_value() + }); + } + #[cfg(all(tokio_unstable, feature = "tracing"))] self.resource_span.in_scope(|| { tracing::trace!( @@ -1227,7 +1211,7 @@ impl Inner { } /// Called by `Receiver` to indicate that the value will never be received. - fn close(&self) { + fn close(&self) -> State { let prev = State::set_closed(&self.state); if prev.is_tx_task_set() && !prev.is_complete() { @@ -1235,6 +1219,8 @@ impl Inner { self.tx_task.with_task(Waker::wake_by_ref); } } + + prev } /// Consumes the value. This function does not check `state`. @@ -1273,6 +1259,16 @@ impl Drop for Inner { self.tx_task.drop_task(); } } + + unsafe { + // SAFETY: we have `&mut self`, and therefore we have + // exclusive access to the value. + // + // Note: the assertion holds because if the value has been sent by sender, + // we must ensure that the value must have been consumed by the receiver before + // dropping the `Inner`. + debug_assert!(self.consume_value().is_none()); + } } } From d94f3735cece41d965d29cce95de9546d14e2304 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 15 May 2024 18:17:21 +0800 Subject: [PATCH 10/12] fix unsafe doc and apply comment --- tokio/src/sync/oneshot.rs | 13 +++++-------- tokio/src/sync/tests/loom_oneshot.rs | 18 +++++++++--------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index f954811f0ab..ab29b3e3edd 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -1075,11 +1075,9 @@ impl Drop for Receiver { let state = inner.close(); if state.is_complete() { - drop(unsafe { - // SAFETY: we have ensured that the `VALUE_SENT` bit has been set, - // so only the receiver can access the value. - inner.consume_value() - }); + // SAFETY: we have ensured that the `VALUE_SENT` bit has been set, + // so only the receiver can access the value. + drop(unsafe { inner.consume_value() }); } #[cfg(all(tokio_unstable, feature = "tracing"))] @@ -1260,10 +1258,9 @@ impl Drop for Inner { } } + // SAFETY: we have `&mut self`, and therefore we have + // exclusive access to the value. unsafe { - // SAFETY: we have `&mut self`, and therefore we have - // exclusive access to the value. - // // Note: the assertion holds because if the value has been sent by sender, // we must ensure that the value must have been consumed by the receiver before // dropping the `Inner`. diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index 8a2cd86e171..c2c6523ee92 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -1,4 +1,4 @@ -use crate::sync::{mpsc, oneshot}; +use crate::sync::oneshot; use futures::future::poll_fn; use loom::future::block_on; @@ -86,8 +86,6 @@ fn recv_closed() { // TODO: Move this into `oneshot` proper. -use crate::loom; -use crate::sync::oneshot::{Receiver, Sender}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -143,21 +141,23 @@ fn changing_tx_task() { #[test] fn checking_tx_send_ok_not_drop() { - use std::cell::RefCell; + use std::borrow::Borrow; + use std::cell::Cell; + loom::thread_local! { - static IS_RX: RefCell = RefCell::new(true); + static IS_RX: Cell = Cell::new(true); } struct Msg; impl Drop for Msg { fn drop(&mut self) { - IS_RX.with(|is_rx: &RefCell<_>| { + IS_RX.with(|is_rx: &Cell<_>| { // On `tx.send(msg)` returning `Err(msg)`, // we call `std::mem::forget(msg)`, so that // `drop` is not expected to be called in the // tx thread. - assert!(*is_rx.borrow()); + assert!(is_rx.get()); }); } } @@ -168,8 +168,8 @@ fn checking_tx_send_ok_not_drop() { // tx thread let tx_thread_join_handle = thread::spawn(move || { // Ensure that `Msg::drop` in this thread will see is_rx == false - IS_RX.with(|is_rx: &RefCell<_>| { - *is_rx.borrow_mut() = false; + IS_RX.with(|is_rx: &Cell<_>| { + is_rx.set(false); }); if let Err(msg) = tx.send(Msg) { std::mem::forget(msg); From aaf09f01a020c6a9e93ddcb845bd381f658b47bd Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 15 May 2024 18:45:26 +0800 Subject: [PATCH 11/12] drop rx in main --- tokio/src/sync/tests/loom_oneshot.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index c2c6523ee92..b2a67e3a009 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -176,12 +176,9 @@ fn checking_tx_send_ok_not_drop() { } }); - // rx thread - let rx_thread_join_handle = thread::spawn(move || { - drop(rx); - }); + // main thread is the rx thread + drop(rx); tx_thread_join_handle.join().unwrap(); - rx_thread_join_handle.join().unwrap(); }); } From 4a8f1ff66ea5cfed5c43152b93fe7057a2ae4981 Mon Sep 17 00:00:00 2001 From: William Wen Date: Wed, 15 May 2024 19:08:34 +0800 Subject: [PATCH 12/12] use test builder --- tokio/src/sync/tests/loom_oneshot.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/tests/loom_oneshot.rs b/tokio/src/sync/tests/loom_oneshot.rs index b2a67e3a009..717edcfd2a3 100644 --- a/tokio/src/sync/tests/loom_oneshot.rs +++ b/tokio/src/sync/tests/loom_oneshot.rs @@ -162,7 +162,10 @@ fn checking_tx_send_ok_not_drop() { } } - loom::model(|| { + let mut builder = loom::model::Builder::new(); + builder.preemption_bound = Some(2); + + builder.check(|| { let (tx, rx) = oneshot::channel(); // tx thread