From 5c571aa2bbd903a84c0b84cde74b9f2a177bba7b Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Tue, 21 May 2024 11:23:03 +0800 Subject: [PATCH] Add sync primitives Notify and watch::channel --- Cargo.toml | 8 +- Makefile | 7 +- asyncs-sync/Cargo.toml | 17 + asyncs-sync/src/lib.rs | 7 + asyncs-sync/src/notify.rs | 1011 +++++++++++++++++++++++++++++++++++++ asyncs-sync/src/parker.rs | 231 +++++++++ asyncs-sync/src/watch.rs | 685 +++++++++++++++++++++++++ asyncs-test/Cargo.toml | 2 +- src/lib.rs | 3 +- 9 files changed, 1964 insertions(+), 7 deletions(-) create mode 100644 asyncs-sync/Cargo.toml create mode 100644 asyncs-sync/src/lib.rs create mode 100644 asyncs-sync/src/notify.rs create mode 100644 asyncs-sync/src/parker.rs create mode 100644 asyncs-sync/src/watch.rs diff --git a/Cargo.toml b/Cargo.toml index 0246365..b018142 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,11 +1,12 @@ [workspace] resolver = "2" -members = ["asyncs-test"] +members = ["asyncs-test", "asyncs-sync"] [workspace.package] license = "Apache-2.0" version = "0.2.1" edition = "2021" +authors = ["Kezhu Wang "] homepage = "https://github.com/kezhuw/asyncs" repository = "https://github.com/kezhuw/asyncs" documentation = "https://docs.rs/asyncs" @@ -37,8 +38,9 @@ spawns-compat = ["spawns-core/compat"] [dependencies] async-select = "0.2.0" asyncs-test = { version = "0.2.1", path = "./asyncs-test", optional = true } -spawns = { version = "0.2.3", optional = true } -spawns-core = "1.1.0" +asyncs-sync = { version = "0.2.1", path = "./asyncs-sync" } +spawns = { version = "0.2.4", optional = true } +spawns-core = "1.1.1" [dev-dependencies] test-case = "3.3.1" diff --git a/Makefile b/Makefile index 6ce1229..e98ea1c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -verify: check build test +verify: check build-all test check: check_fmt lint doc @@ -12,10 +12,13 @@ lint: cargo clippy --all-features --no-deps -- -D clippy::all build: + cargo build --workspace --all-features + +build-all: cargo build-all-features test: - cargo test --all-features + cargo test --workspace --all-features doc: cargo doc --all-features diff --git a/asyncs-sync/Cargo.toml b/asyncs-sync/Cargo.toml new file mode 100644 index 0000000..020ac48 --- /dev/null +++ b/asyncs-sync/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "asyncs-sync" +description = "Asynchronous runtime agnostic synchronization utilities" +readme.workspace = true +authors.workspace = true +edition.workspace = true +version.workspace = true +homepage.workspace = true +license.workspace = true +repository.workspace = true +documentation.workspace = true + +[dependencies] + +[dev-dependencies] +asyncs = { version = "0.2.1", path = "../", features = ["test"] } +futures = "0.3.30" diff --git a/asyncs-sync/src/lib.rs b/asyncs-sync/src/lib.rs new file mode 100644 index 0000000..95da45e --- /dev/null +++ b/asyncs-sync/src/lib.rs @@ -0,0 +1,7 @@ +//! Utilities to synchronize among asynchronous tasks + +mod notify; +mod parker; +pub mod watch; + +pub use notify::*; diff --git a/asyncs-sync/src/notify.rs b/asyncs-sync/src/notify.rs new file mode 100644 index 0000000..a87d7a7 --- /dev/null +++ b/asyncs-sync/src/notify.rs @@ -0,0 +1,1011 @@ +use std::future::Future; +use std::mem::MaybeUninit; +use std::pin::Pin; +use std::ptr; +use std::ptr::NonNull; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{self, *}; +use std::sync::Mutex; +use std::task::{Context, Poll, Waker}; + +use crate::parker::{Parking, WeakOrdering}; + +trait Node { + fn link(&mut self) -> &mut Link; +} + +struct Link { + next: Option>, + prev: Option>, +} + +impl Default for Link { + fn default() -> Self { + Self { next: None, prev: None } + } +} + +struct List { + head: Option>, + tail: Option>, +} + +impl Default for List { + fn default() -> Self { + Self { head: None, tail: None } + } +} + +impl List { + pub fn push_front(&mut self, node: &mut T) { + let ptr = unsafe { NonNull::new_unchecked(node as *const T as *mut T) }; + if let Some(mut head) = self.head { + unsafe { + head.as_mut().link().prev = Some(ptr); + } + } + let link = node.link(); + link.next = self.head; + link.prev = None; + self.head = Some(ptr); + if self.tail.is_none() { + self.tail = self.head; + } + } + + pub fn pop_back<'a>(&mut self) -> Option<&'a mut T> { + let node = match self.tail { + None => return None, + Some(mut ptr) => unsafe { ptr.as_mut() }, + }; + self.tail = node.link().prev; + match self.tail { + None => self.head = None, + Some(mut ptr) => unsafe { ptr.as_mut().link().next = None }, + } + Some(node) + } + + pub fn unlink(&mut self, node: &mut T) -> bool { + let ptr = unsafe { NonNull::new_unchecked(node as *const T as *mut T) }; + let link = node.link(); + + if let Some(mut next) = link.next { + unsafe { next.as_mut().link().prev = link.prev }; + } else if self.tail == Some(ptr) { + self.tail = link.prev; + } else { + return false; + } + + if let Some(mut prev) = link.prev { + unsafe { prev.as_mut().link().next = link.next }; + } else if self.head == Some(ptr) { + self.head = link.next; + } else { + return false; + } + + link.next = None; + link.prev = None; + + true + } + + pub fn is_empty(&self) -> bool { + self.head.is_none() + } +} + +struct GuardedList<'a, T> { + empty: bool, + guard: &'a mut T, +} + +impl<'a, T: Node> GuardedList<'a, T> { + pub fn new(list: List, guard: &'a mut T) -> Self { + let ptr = unsafe { NonNull::new_unchecked(guard as *mut T) }; + let link = guard.link(); + if list.is_empty() { + link.next = Some(ptr); + link.prev = Some(ptr); + } else { + link.next = list.head; + link.prev = list.tail; + unsafe { + list.head.unwrap_unchecked().as_mut().link().prev = Some(ptr); + list.tail.unwrap_unchecked().as_mut().link().next = Some(ptr); + } + } + Self { empty: false, guard } + } + + pub fn pop_back<'b>(&mut self) -> Option<&'b mut T> { + let addr = self.guard as *mut _; + let link = self.guard.link(); + let last = unsafe { link.prev.unwrap_unchecked().as_mut() }; + if ptr::addr_eq(addr, last) { + self.empty = true; + return None; + } + link.prev = last.link().prev; + last.link().next = unsafe { Some(NonNull::new_unchecked(addr)) }; + Some(last) + } + + pub fn is_empty(&self) -> bool { + self.empty + } +} + +struct WaiterList<'a> { + list: GuardedList<'a, Waiter>, + round: Round, + notify: &'a Notify, +} + +impl<'a> WaiterList<'a> { + pub fn new(list: GuardedList<'a, Waiter>, round: Round, notify: &'a Notify) -> Self { + Self { list, round, notify } + } + + pub fn pop_back<'b>(&mut self, _lock: &mut std::sync::MutexGuard<'_, List>) -> Option<&'b mut Waiter> { + self.list.pop_back() + } +} + +impl Drop for WaiterList<'_> { + fn drop(&mut self) { + if self.list.is_empty() { + return; + } + let _lock = self.notify.list.lock().unwrap(); + while let Some(waiter) = self.list.pop_back() { + waiter.notification.store(self.round.into_notification(NotificationKind::All), Release); + } + } +} + +const STATUS_MASK: usize = 3usize; + +const ROUND_UNIT: usize = STATUS_MASK + 1; +const ROUND_MASK: usize = !STATUS_MASK; + +#[derive(Copy, Clone, Debug, PartialEq)] +struct Round(usize); + +impl Round { + const ZERO: Round = Self(0); + + pub fn new() -> Self { + Self(ROUND_UNIT) + } + + pub fn into_notification(self, kind: NotificationKind) -> Notification { + Notification { kind, round: self } + } + + pub fn next(self) -> Self { + Self(self.0.wrapping_add(ROUND_UNIT)) + } + + pub fn into(self) -> usize { + self.0 + } + + pub fn from(i: usize) -> Self { + Self(i & ROUND_MASK) + } +} + +#[derive(Clone, Copy, Debug, PartialEq)] +struct State { + round: Round, + status: Status, +} + +#[derive(Clone, Copy, Debug, PartialEq)] +#[repr(usize)] +enum Status { + Idle = 0, + Waiting = 1, + Notified = 2, +} + +impl State { + pub fn new() -> Self { + Self { round: Round::new(), status: Status::Idle } + } + + pub fn with_status(self, status: Status) -> Self { + Self { round: self.round, status } + } + + pub fn with_round(self, round: Round) -> Self { + Self { round, status: self.status } + } + + pub fn next_round(self) -> Self { + self.with_round(self.round.next()) + } +} + +struct AtomicState(AtomicUsize); + +impl Default for AtomicState { + fn default() -> Self { + Self::new(State::new()) + } +} + +impl AtomicState { + pub fn new(state: State) -> Self { + Self(AtomicUsize::new(state.into())) + } + + pub fn store(&self, state: State, ordering: Ordering) { + self.0.store(state.into(), ordering) + } + + pub fn load(&self, ordering: Ordering) -> State { + let u = self.0.load(ordering); + State::from(u) + } + + pub fn compare_exchange( + &self, + current: State, + new: State, + success: Ordering, + failure: Ordering, + ) -> Result { + match self.0.compare_exchange(current.into(), new.into(), success, failure) { + Ok(_) => Ok(current), + Err(updated) => Err(State::from(updated)), + } + } +} + +impl From for usize { + fn from(state: State) -> usize { + state.round.into() | state.status as usize + } +} + +impl From for State { + fn from(i: usize) -> Self { + let status = i & STATUS_MASK; + Self { round: Round::from(i), status: unsafe { std::mem::transmute(status) } } + } +} + +#[derive(Clone, Copy, PartialEq)] +#[repr(usize)] +enum NotificationKind { + One = 0, + All = 1, +} + +#[derive(Clone, Copy)] +struct Notification { + kind: NotificationKind, + round: Round, +} + +impl From for usize { + fn from(notification: Notification) -> usize { + notification.round.into() | notification.kind as usize + } +} + +impl From for Notification { + fn from(u: usize) -> Self { + let kind = u & STATUS_MASK; + Self { kind: unsafe { std::mem::transmute(kind) }, round: Round::from(u) } + } +} + +#[derive(Default)] +struct AtomicNotification(AtomicUsize); + +impl AtomicNotification { + pub fn clear(&mut self) { + self.0.store(0, Relaxed) + } + + pub fn take(&mut self) -> Option { + let notification = std::mem::take(self); + notification.load(Relaxed) + } + + pub fn load(&self, ordering: Ordering) -> Option { + match self.0.load(ordering) { + 0 => None, + u => Some(Notification::from(u)), + } + } + + pub fn store(&self, notification: Notification, ordering: Ordering) { + self.0.store(notification.into(), ordering) + } +} + +/// Notifies one task or all attached tasks to wakeup. +/// +/// [notify_one] and [notified().await] behave similar to [Thread::unpark] and [thread::park] +/// except that [notified().await] will not be waked up spuriously. One could assume that there is +/// at most one permit associated with [Notify]. [notified().await] will block current task unless +/// or until the permit is available to consume. [notify_one] release the permit for [notified().await] +/// to acquire, it will wake up [Notified] in FIFO order if there are multiple [Notified]s blocking +/// for the permit. The order of [Notified]s are the order of [notified().await] or [Notified::enable()] +/// whichever first. +/// +/// [notify_all], on the other hand, will wake up all attached [Notified]s and start a fresh new round +/// for [notify_one] with no permit. [Notify::notified()]s are attached by default, one could use +/// [Notified::detach] to detach from rounds of [Notify] until [Notified::enable] or future polling. +/// +/// ## Differences with [tokio] +/// * [tokio::sync::Notify::notify_all()] does not clear permit from [notify_one]. +/// * [tokio] does not have [Notified::detach()]. +/// +/// [thread::park]: std::thread::park +/// [Thread::unpark]: std::thread::Thread::unpark +/// [notified().await]: Notify::notified() +/// [notify_one]: Notify::notify_one() +/// [notify_all]: Notify::notify_all() +/// [tokio]: https://docs.rs/tokio +/// [tokio::sync::Notify::notify_all()]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notify_one +#[derive(Default)] +pub struct Notify { + // All link operations are guarded by this lock including GuardedList which actually is an + // independent list. + list: Mutex>, + state: AtomicState, +} + +unsafe impl Send for Notify {} +unsafe impl Sync for Notify {} + +impl Notify { + /// Constructs a new [Notify]. + pub fn new() -> Self { + Self::default() + } + + /// Constructs a attached [Notified] to consume permit from [Notify::notify_one]. + pub fn notified(&self) -> Notified<'_> { + let round = self.round(); + Notified { notify: self, stage: Stage::default(), round, waiter: Waiter::default() } + } + + /// Notifies one waiting task or stores a permit to consume in case of no waiting task. + pub fn notify_one(&self) { + let state = self.state.load(SeqCst); + self.notify_one_in_round(state.round, state); + } + + /// Notifies all attached [Notified]s and starts a fresh new round with no permit. + pub fn notify_all(&self) { + let mut state = self.state.load(SeqCst); + loop { + while state.status != Status::Waiting { + match self.state.compare_exchange(state, state.next_round().with_status(Status::Idle), Release, Relaxed) + { + Ok(_) => return, + Err(updated) => state = updated, + } + } + let mut list = self.list.lock().unwrap(); + state = self.state.load(Relaxed); + if state.status != Status::Waiting { + drop(list); + continue; + } + + // Release store to publish changes. + self.state.store(state.next_round().with_status(Status::Idle), Release); + + let mut guard = Waiter::default(); + let mut wakers = WakerList::new(); + let mut waiters = + WaiterList::new(GuardedList::new(std::mem::take(&mut list), &mut guard), state.round, self); + + 'list: loop { + while !wakers.is_full() { + let Some(waiter) = waiters.pop_back(&mut list) else { + break 'list; + }; + let waker = unsafe { waiter.parking.unpark() }; + waiter.notification.store(state.round.into_notification(NotificationKind::All), Release); + if let Some(waker) = waker { + wakers.push(waker) + } + } + drop(list); + wakers.wake(); + list = self.list.lock().unwrap(); + } + drop(list); + wakers.wake(); + return; + } + } + + fn remove(&self, waiter: &mut Waiter) { + let notification = match waiter.notification.load(Acquire) { + None => { + let mut list = self.list.lock().unwrap(); + if list.unlink(waiter) && list.is_empty() { + let state = self.state.load(Relaxed); + if state.status == Status::Waiting { + self.state.store(state.with_status(Status::Idle), Relaxed); + } + } + drop(list); + // Relaxed load as nothing is important in case of drop. + let Some(notification) = waiter.notification.load(Relaxed) else { + return; + }; + notification + }, + Some(notification) => notification, + }; + if notification.kind == NotificationKind::One { + self.release_notification(notification.round); + } + } + + fn poll(&self, waiter: &mut Waiter, round: Round) -> Poll { + let mut state = self.state.load(SeqCst); + let round = if round == Round::ZERO { state.round } else { round }; + loop { + if state.round != round { + return Poll::Ready(round.into_notification(NotificationKind::All)); + } + if state.status != Status::Notified { + break; + } + // Acquire load to observe changes in case of `notify_all`. + match self.state.compare_exchange(state, state.with_status(Status::Idle), Acquire, Acquire) { + Ok(_) => return Poll::Ready(state.round.into_notification(NotificationKind::One)), + Err(updated) => state = updated, + } + } + let mut list = self.list.lock().unwrap(); + state = self.state.load(SeqCst); + loop { + if state.round != round { + drop(list); + return Poll::Ready(round.into_notification(NotificationKind::All)); + } + match state.status { + Status::Waiting => break, + Status::Idle => { + match self.state.compare_exchange(state, state.with_status(Status::Waiting), Relaxed, Relaxed) { + Ok(_) => break, + Err(updated) => state = updated, + } + }, + Status::Notified => { + match self.state.compare_exchange(state, state.with_status(Status::Idle), Acquire, Relaxed) { + Ok(_) => { + drop(list); + return Poll::Ready(state.round.into_notification(NotificationKind::One)); + }, + Err(updated) => state = updated, + } + }, + } + } + list.push_front(waiter); + drop(list); + Poll::Pending + } + + fn notify_one_in_round(&self, round: Round, mut state: State) { + loop { + loop { + // There are must be at least one `notify_all`, all waiters from this round must be + // notified. + if state.round != round { + return; + } + if state.status == Status::Waiting { + break; + } + // Release store to transfer happens-before relationship. + match self.state.compare_exchange(state, state.with_status(Status::Notified), Release, Relaxed) { + Ok(_) => return, + Err(updated) => state = updated, + } + } + let mut list = self.list.lock().unwrap(); + let state = self.state.load(Relaxed); + if state.round != round { + return; + } + if state.status != Status::Waiting { + drop(list); + continue; + } + let waiter = list.pop_back().unwrap(); + let waker = unsafe { waiter.parking.unpark() }; + waiter.notification.store(state.round.into_notification(NotificationKind::One), Release); + if list.is_empty() { + self.state.store(state.with_status(Status::Idle), Relaxed); + } + drop(list); + if let Some(waker) = waker { + waker.wake(); + } + return; + } + } + + fn round(&self) -> Round { + self.state.load(SeqCst).round + } + + fn release_notification(&self, round: Round) { + let state = self.state.load(SeqCst); + self.notify_one_in_round(round, state); + } +} + +struct WakerList { + next: usize, + wakers: [MaybeUninit; 32], +} + +impl WakerList { + pub fn new() -> Self { + Self { next: 0, wakers: std::array::from_fn(|_| MaybeUninit::uninit()) } + } + + pub fn is_full(&self) -> bool { + self.next == self.wakers.len() + } + + pub fn push(&mut self, waker: Waker) { + debug_assert!(self.next < self.wakers.len()); + self.wakers[self.next].write(waker); + self.next += 1; + } + + pub fn wake(&mut self) { + while self.next != 0 { + self.next -= 1; + let waker = unsafe { self.wakers[self.next].assume_init_read() }; + waker.wake(); + } + } +} + +impl Drop for WakerList { + fn drop(&mut self) { + while self.next != 0 { + self.next -= 1; + unsafe { + self.wakers[self.next].assume_init_drop(); + } + } + } +} + +struct Waiter { + link: Link, + parking: Parking, + + /// Release store to release connection to `Waiter`. + /// Acquire load to observe all changes. + notification: AtomicNotification, +} + +impl Default for Waiter { + fn default() -> Self { + Self { link: Link::default(), parking: Parking::new(), notification: AtomicNotification::default() } + } +} + +impl Node for Waiter { + fn link(&mut self) -> &mut Link { + &mut self.link + } +} + +#[repr(usize)] +#[derive(Default, Debug, Copy, Clone, PartialEq)] +enum Stage { + #[default] + Init = 0, + Waiting = 1, + Finished = 2, +} + +/// Future created from [Notify::notified()]. +pub struct Notified<'a> { + notify: &'a Notify, + + stage: Stage, + round: Round, + + waiter: Waiter, +} + +unsafe impl Send for Notified<'_> {} +unsafe impl Sync for Notified<'_> {} + +impl<'a> Notified<'a> { + /// Enables to wait for a notification from [Notify::notify_one] or [Notify::notify_all]. + /// + /// If there is permit from [Notify::notify_one], this will consume it temporarily for future + /// polling. If this [Notified] is dropped without further polling, the permit will be handed + /// over to [Notify] in case of no new [Notify::notify_all]. + /// + /// [Notified::poll] will enable this also. + pub fn enable(mut self: Pin<&mut Self>) { + if self.stage != Stage::Init { + return; + } + let round = self.round; + if let Poll::Ready(notification) = self.notify.poll(&mut self.waiter, round) { + self.stage = Stage::Finished; + self.waiter.notification.store(notification, Relaxed); + } else { + self.stage = Stage::Waiting; + } + } + + /// Detaches from rounds of [Notify] so it will not be notified until [Notified::enable] or + /// [Notified::poll]. + pub fn detach(mut self) -> Notified<'a> { + self.round = Round::ZERO; + self + } +} + +impl Future for Notified<'_> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let round = self.round; + match self.stage { + Stage::Init => match self.notify.poll(&mut self.waiter, round) { + Poll::Pending => self.stage = Stage::Waiting, + Poll::Ready(_) => { + self.stage = Stage::Finished; + return Poll::Ready(()); + }, + }, + Stage::Waiting => match self.waiter.notification.load(Acquire) { + None => {}, + Some(_) => { + self.waiter.notification.clear(); + self.stage = Stage::Finished; + return Poll::Ready(()); + }, + }, + Stage::Finished => { + // We could come from `enable`. + self.waiter.notification.clear(); + return Poll::Ready(()); + }, + } + debug_assert_eq!(self.stage, Stage::Waiting); + if unsafe { self.waiter.parking.park(cx.waker()).is_ready() } { + while self.waiter.notification.load(Acquire).is_none() { + std::hint::spin_loop(); + } + self.waiter.notification.clear(); + self.stage = Stage::Finished; + return Poll::Ready(()); + } + Poll::Pending + } +} + +impl Drop for Notified<'_> { + fn drop(&mut self) { + match self.stage { + Stage::Init => {}, + Stage::Waiting => self.notify.remove(&mut self.waiter), + Stage::Finished => { + if let Some(Notification { round, kind: NotificationKind::One }) = self.waiter.notification.take() { + self.notify.release_notification(round); + } + }, + }; + } +} + +#[cfg(test)] +mod tests { + use std::pin::{pin, Pin}; + + use asyncs::select; + + use super::Notify; + + #[asyncs::test] + async fn notify_one_simple() { + let notify = Notify::new(); + + // given: two notifieds polled in order + let mut notified1 = notify.notified(); + let mut notified2 = notify.notified(); + select! { + biased; + default => {}, + _ = &mut notified1 => unreachable!(), + _ = &mut notified2 => unreachable!(), + } + + // when: notify_one + notify.notify_one(); + + // then: only the first polled got notified + select! { + biased; + default => unreachable!(), + _ = &mut notified2 => unreachable!(), + _ = &mut notified1 => {} + } + + // when: another notify_one + notify.notify_one(); + // then: other got notified + select! { + default => unreachable!(), + _ = &mut notified2 => {}, + } + } + + #[asyncs::test] + async fn notify_one_enabled() { + let notify = Notify::new(); + let notified1 = notify.notified(); + let mut notified1 = pin!(notified1); + let mut notified2 = notify.notified(); + + // given: enabled notified + notified1.as_mut().enable(); + select! { + default => {}, + _ = &mut notified2 => unreachable!(), + } + + // when: notify_one + notify.notify_one(); + + // then: enabled notified behaves same as polled notified + notified1.await; + + select! { + default => {}, + _ = &mut notified2 => unreachable!(), + } + } + + #[asyncs::test] + async fn notify_one_permit_does_not_acculumate() { + let notify = Notify::new(); + + // given: two notifieds + let notified1 = notify.notified(); + let notified2 = notify.notified(); + + // when: notify_one twice + notify.notify_one(); + notify.notify_one(); + + // then: only one permit + select! { + default => unreachable!(), + _ = notified1 => {}, + }; + select! { + default => {}, + _ = notified2 => unreachable!(), + }; + } + + #[asyncs::test] + async fn notify_one_permit_consumed_by_poll() { + let notify = Notify::new(); + let mut notified1 = notify.notified(); + let notified2 = notify.notified(); + + // given: notify_one permit + notify.notify_one(); + + // when: poll and drop + select! { + default => unreachable!(), + _ = &mut notified1 => {}, + }; + drop(notified1); + + // then: no permit resumed + select! { + default => {}, + _ = notified2 => unreachable!(), + }; + } + + #[asyncs::test] + async fn notify_one_permit_doesnot_consumed_by_enable() { + let notify = Notify::new(); + let mut notified1 = notify.notified(); + let notified2 = notify.notified(); + + // given: notify_one permit + notify.notify_one(); + + // when: enable and drop notified + unsafe { + Pin::new_unchecked(&mut notified1).enable(); + } + drop(notified1); + + // then: notify_one permit resumed + select! { + default => unreachable!(), + _ = notified2 => {}, + }; + } + + #[asyncs::test] + async fn notify_one_permit_unconsumed_resumed_on_drop() { + let notify = Notify::new(); + + // given: enabled/polled notified + let mut notified1 = notify.notified(); + select! { + default => {}, + _ = &mut notified1 => unreachable!(), + }; + + // when: notify_one and drop with no further poll + notify.notify_one(); + drop(notified1); + + // then: unconsumed notify_one will be resumed + let notified2 = notify.notified(); + select! { + default => unreachable!(), + _ = notified2 => {}, + }; + } + + #[asyncs::test] + async fn notify_one_permit_does_not_resumed_cross_round() { + let notify = Notify::new(); + + // given: enabled/polled notified + let mut notified1 = notify.notified(); + select! { + default => {}, + _ = &mut notified1 => unreachable!(), + }; + + // when: notify_one and drop after notify_all with no further poll + notify.notify_one(); + notify.notify_all(); + drop(notified1); + + // then: unconsumed notify_one will not be resumed cross round + let notified2 = notify.notified(); + select! { + default => {}, + _ = notified2 => unreachable!(), + }; + } + + #[asyncs::test] + async fn notify_all_simple() { + let notify = Notify::new(); + + // given: not enabled notified + let mut notified1 = notify.notified().detach(); + let mut notified2 = notify.notified().detach(); + let mut notified3 = notify.notified(); + + // when: notify_all + notify.notify_all(); + + // then: only attached ones got notified + select! { + // So all notifieds got polled + biased; + default => unreachable!(), + _ = &mut notified1 => unreachable!("not ready"), + _ = &mut notified2 => unreachable!("not ready"), + _ = &mut notified3 => {}, + }; + + // given: polled notified + // when: notify_all + notify.notify_all(); + + // then: notified + select! { + default => unreachable!(), + _ = &mut notified1 => {}, + }; + + select! { + default => unreachable!(), + _ = &mut notified2 => {}, + }; + } + + #[asyncs::test] + async fn notify_all_enabled() { + let notify = Notify::new(); + let notified = notify.notified(); + + // given: enabled notified + let mut notified = pin!(notified); + notified.as_mut().enable(); + + // when: notify_all + notify.notify_all(); + + // then: notified + select! { + default => unreachable!(), + _ = notified => {}, + }; + } + + #[asyncs::test] + async fn notify_all_ruin_permit() { + let notify = Notify::new(); + + // given: a detached Notified + let notified = notify.notified().detach(); + + // when: notify_one and then notify_all + notify.notify_one(); + notify.notify_all(); + + // then: permit got cleared + select! { + default => {}, + _ = notified => unreachable!(), + } + } + + #[asyncs::test] + async fn notify_unlink() { + let notify = Notify::new(); + + let mut notified1 = notify.notified(); + let mut notified2 = notify.notified(); + + select! { + default => {}, + _ = &mut notified1 => unreachable!(), + _ = &mut notified2 => unreachable!(), + } + + let mut notified3 = notify.notified(); + unsafe { Pin::new_unchecked(&mut notified3).enable() }; + + unsafe { + std::ptr::drop_in_place(&mut notified1); + } + unsafe { + std::ptr::drop_in_place(&mut notified2); + } + unsafe { + std::ptr::drop_in_place(&mut notified3); + } + + std::mem::forget(notified1); + std::mem::forget(notified2); + std::mem::forget(notified3); + + notify.notify_all(); + } +} diff --git a/asyncs-sync/src/parker.rs b/asyncs-sync/src/parker.rs new file mode 100644 index 0000000..c848dd1 --- /dev/null +++ b/asyncs-sync/src/parker.rs @@ -0,0 +1,231 @@ +//! Utility to park and unpark task. + +extern crate alloc; + +use core::cell::UnsafeCell; +use core::marker::PhantomData; +use core::sync::atomic::AtomicUsize; +use core::sync::atomic::Ordering::{self, *}; +use core::task::{Poll, Waker}; + +const UNPARKED: usize = usize::MAX; + +#[derive(Clone, Copy)] +enum State { + Parking(usize), + Unparked, +} + +impl State { + unsafe fn from(u: usize) -> State { + match u { + UNPARKED => State::Unparked, + _ => State::Parking(u), + } + } +} + +impl From for usize { + fn from(state: State) -> usize { + match state { + State::Unparked => UNPARKED, + State::Parking(UNPARKED) => panic!("parker state overflow usize"), + State::Parking(index) => index, + } + } +} + +struct AtomicState(AtomicUsize); + +impl Default for AtomicState { + fn default() -> Self { + AtomicState(AtomicUsize::new(State::Parking(0).into())) + } +} + +impl AtomicState { + pub fn load(&self, ordering: Ordering) -> State { + let u = self.0.load(ordering); + unsafe { State::from(u) } + } + + pub fn compare_exchange( + &self, + current: State, + new: State, + success: Ordering, + failure: Ordering, + ) -> Result { + match self.0.compare_exchange(current.into(), new.into(), success, failure) { + Ok(_) => Ok(current), + Err(updated) => unsafe { Err(State::from(updated)) }, + } + } +} + +trait MemoryOrdering { + fn park_ordering() -> Ordering; + fn unpark_ordering() -> Ordering; +} + +/// No happens-before relationship between `unpark` and `park`. +pub enum WeakOrdering {} + +impl MemoryOrdering for WeakOrdering { + fn park_ordering() -> Ordering { + Relaxed + } + + fn unpark_ordering() -> Ordering { + // Acquire load to observe waker. + Acquire + } +} + +impl MemoryOrdering for StrongOrdering { + fn park_ordering() -> Ordering { + // Acquire load to observe all writes before `unpark`. + Acquire + } + + fn unpark_ordering() -> Ordering { + // Acquire load to observe waker. + // Release store to publish all writes before `unpark`. + AcqRel + } +} + +/// Code before `unpark` happens before code after `park` that returns `Poll::Ready`. +pub enum StrongOrdering {} + +#[allow(dead_code)] +pub(crate) type WeakParking = Parking; +#[allow(dead_code)] +pub(crate) type StrongParking = Parking; + +pub(crate) struct Parking { + state: AtomicState, + wakers: UnsafeCell<[Option; 2]>, + _marker: PhantomData, +} + +unsafe impl Send for Parking {} +unsafe impl Sync for Parking {} + +#[allow(private_bounds)] +impl Parking { + pub fn new() -> Self { + Self { state: AtomicState::default(), wakers: UnsafeCell::new(Default::default()), _marker: PhantomData } + } + + /// # Panics + /// Panic if unpark twice. + /// + /// # Safety + /// Unsafe to unpark concurrently + pub unsafe fn unpark(&self) -> Option { + let mut state = self.state.load(Relaxed); + loop { + match state { + State::Unparked => unreachable!("unpark twice"), + State::Parking(index) => { + match self.state.compare_exchange(state, State::Unparked, T::unpark_ordering(), Relaxed) { + Err(updated) => state = updated, + Ok(_) => { + let wakers = unsafe { &mut *self.wakers.get() }; + return wakers[index].take(); + }, + } + }, + } + } + } + + /// # Safety + /// Unsafe to concurrent parks. + pub unsafe fn park(&self, waker: &Waker) -> Poll<()> { + let state = self.state.load(T::park_ordering()); + match state { + State::Unparked => Poll::Ready(()), + State::Parking(index) => { + let index = (index + 1) & 0x01; + let wakers = unsafe { &mut *self.wakers.get() }; + match unsafe { wakers.get_unchecked_mut(index) } { + existing_waker @ None => *existing_waker = Some(waker.clone()), + Some(existing_waker) => existing_waker.clone_from(waker), + } + // Release store to publish waker. + match self.state.compare_exchange(state, State::Parking(index), Release, T::park_ordering()) { + Ok(_) => Poll::Pending, + Err(State::Unparked) => Poll::Ready(()), + Err(_) => unsafe { std::hint::unreachable_unchecked() }, + } + }, + } + } +} + +#[cfg(test)] +mod tests { + use std::future; + use std::sync::Arc; + + use asyncs::task; + + use super::{StrongParking, WeakParking}; + + #[asyncs::test] + async fn unpark_first() { + let parking = WeakParking::new(); + + unsafe { + assert!(parking.unpark().is_none()); + assert!(parking.park(futures::task::noop_waker_ref()).is_ready()); + } + } + + #[asyncs::test] + async fn unpark_waker() { + let parking = WeakParking::new(); + + unsafe { + assert!(parking.park(futures::task::noop_waker_ref()).is_pending()); + assert!(parking.unpark().is_some()); + } + } + + #[asyncs::test] + async fn parking_concurrent() { + let shared = Arc::new(0usize); + let parking = Arc::new(StrongParking::new()); + + let handle = task::spawn({ + let shared = shared.clone(); + let parking = parking.clone(); + async move { + future::poll_fn(|cx| unsafe { parking.park(cx.waker()) }).await; + *shared + } + }); + + #[allow(invalid_reference_casting)] + let mutable = unsafe { &mut *(shared.as_ref() as *const usize as *mut usize) }; + *mutable = 5; + unsafe { + parking.unpark(); + } + + assert_eq!(handle.await.unwrap(), 5); + } + + #[asyncs::test] + #[should_panic(expected = "unpark twice")] + async fn unpark_twice() { + let parking = WeakParking::new(); + + unsafe { + parking.unpark(); + parking.unpark(); + } + } +} diff --git a/asyncs-sync/src/watch.rs b/asyncs-sync/src/watch.rs new file mode 100644 index 0000000..e2f0a6a --- /dev/null +++ b/asyncs-sync/src/watch.rs @@ -0,0 +1,685 @@ +//! Channel to publish and subscribe values. + +use std::cell::{Cell, UnsafeCell}; +use std::cmp::Ordering::*; +use std::fmt::{self, Formatter}; +use std::mem::{ManuallyDrop, MaybeUninit}; +use std::ops::Deref; +use std::ptr; +use std::sync::atomic::Ordering::{self, *}; +use std::sync::atomic::{AtomicPtr, AtomicUsize}; +use std::sync::Arc; + +use crate::Notify; + +/// Error for [Sender::send] to express no receivers alive. +#[derive(Debug)] +pub struct SendError(T); + +impl SendError { + pub fn into_value(self) -> T { + self.0 + } +} + +/// Error for [Receiver::changed()] to express that sender has been dropped. +#[derive(Debug)] +pub struct RecvError(()); + +#[repr(transparent)] +#[derive(Clone, Copy, Default, Debug, PartialEq, PartialOrd, Eq, Ord)] +struct Version(u64); + +impl Version { + pub fn next(self) -> Self { + Self(self.0 + 1) + } +} + +struct Slot { + refs: AtomicUsize, + + frees: AtomicPtr>, + + /// Safety: never changed after published and before reclaimed + value: UnsafeCell>, + version: Cell, +} + +impl Default for Slot { + fn default() -> Self { + Self { + refs: AtomicUsize::new(0), + frees: AtomicPtr::new(ptr::null_mut()), + value: UnsafeCell::new(ManuallyDrop::new(unsafe { MaybeUninit::zeroed().assume_init() })), + version: Cell::new(Version::default()), + } + } +} + +impl Slot { + fn store(&self, value: T) { + debug_assert_eq!(self.refs.load(Relaxed), 0); + unsafe { + std::ptr::write(self.value.get(), ManuallyDrop::new(value)); + } + self.refs.store(1, Relaxed); + } + + /// Retains current or newer version. + unsafe fn retain(&self, version: Version) -> Option<&Slot> { + let mut refs = self.refs.load(Relaxed); + loop { + if refs == 0 { + return None; + } + match self.refs.compare_exchange(refs, refs + 1, Relaxed, Relaxed) { + Ok(_) => {}, + Err(updated) => { + refs = updated; + continue; + }, + } + match self.version.get().cmp(&version) { + Equal | Greater => return Some(self), + Less => panic!( + "BUG: version is monotonic, expect version {:?}, got old version {:?}", + version, + self.version.get() + ), + } + } + } +} + +#[repr(transparent)] +struct UnsafeSlot(Slot); + +impl UnsafeSlot { + pub fn retain(&self, version: Version) -> Option<&Slot> { + unsafe { self.0.retain(version) } + } + + pub unsafe fn slot(&self) -> &Slot { + &self.0 + } +} + +struct Row { + prev: Option>>, + slots: [Slot; 16], +} + +impl Default for Row { + fn default() -> Self { + Self { prev: None, slots: Default::default() } + } +} + +// We could also stamp version into the atomic to filter eagerly, but it will require `AtomicU128`. +struct Latest(AtomicUsize); + +impl Latest { + const CLOSED: usize = 0x01; + const MASK: usize = !Self::CLOSED; + + pub fn new(slot: &Slot) -> Self { + let raw = slot as *const _ as usize; + Self(AtomicUsize::new(raw)) + } + + pub fn load<'a, T>(&self, ordering: Ordering) -> (&'a UnsafeSlot, bool) { + let raw = self.0.load(ordering); + (Self::slot(raw & Self::MASK), raw & Self::CLOSED == Self::CLOSED) + } + + fn slot<'a, T>(raw: usize) -> &'a UnsafeSlot { + unsafe { &*(raw as *const UnsafeSlot) } + } + + fn ptr(slot: &Slot) -> usize { + slot as *const Slot as usize + } + + pub fn compare_exchange<'a, T>( + &self, + current: &'a Slot, + new: &Slot, + success: Ordering, + failure: Ordering, + ) -> Result<&'a Slot, &'a UnsafeSlot> { + match self.0.compare_exchange(Self::ptr(current), Self::ptr(new), success, failure) { + Ok(_) => Ok(current), + Err(updated) => Err(Self::slot(updated)), + } + } + + pub fn close(&self) { + let u = self.0.load(Relaxed); + self.0.store(u | Self::CLOSED, Relaxed); + } +} + +struct Shared { + rows: UnsafeCell>>, + frees: AtomicPtr>, + + latest: Latest, + + closed: Notify, + changes: Notify, + + senders: AtomicUsize, + receivers: AtomicUsize, +} + +impl Drop for Shared { + fn drop(&mut self) { + let slot = self.latest.load(Relaxed).0; + self.release(unsafe { slot.slot() }); + } +} + +impl Shared { + fn new(version: Version, value: T) -> Self { + let row = Box::>::default(); + let slot = &row.slots[0]; + slot.store(value); + slot.version.set(version); + let latest = Latest::new(slot); + let shared = Self { + rows: UnsafeCell::new(row), + frees: AtomicPtr::new(ptr::null_mut()), + latest, + closed: Notify::new(), + changes: Notify::new(), + senders: AtomicUsize::new(1), + receivers: AtomicUsize::new(1), + }; + let row = unsafe { &*shared.rows.get() }; + shared.add_slots(&row.slots[1..]); + shared + } + + fn new_sender(self: &Arc) -> Sender { + self.senders.fetch_add(1, Relaxed); + Sender { shared: self.clone() } + } + + fn drop_sender(&self) { + if self.senders.fetch_sub(1, Relaxed) != 1 { + return; + } + self.latest.close(); + self.changes.notify_all(); + } + + fn new_receiver(self: &Arc, seen: Version) -> Receiver { + self.receivers.fetch_add(1, Relaxed); + Receiver { seen, shared: self.clone() } + } + + fn drop_receiver(&self) { + if self.receivers.fetch_sub(1, Relaxed) == 1 { + self.closed.notify_all(); + } + } + + fn add_slots(&self, slots: &[Slot]) { + for i in 0..slots.len() - 1 { + let curr = unsafe { slots.get_unchecked(i) }; + let next = unsafe { slots.get_unchecked(i + 1) }; + curr.frees.store(next as *const _ as *mut _, Relaxed); + } + let head = unsafe { slots.get_unchecked(0) }; + let tail = unsafe { slots.get_unchecked(slots.len() - 1) }; + self.free_slots(head, tail); + } + + fn alloc_slot(&self) -> &Slot { + // Acquire load to see `slot.frees`. + let mut head = self.frees.load(Acquire); + loop { + if head.is_null() { + break; + } + let slot = unsafe { &*head }; + let next = slot.frees.load(Relaxed); + match self.frees.compare_exchange(head, next, Relaxed, Acquire) { + Ok(_) => { + slot.frees.store(ptr::null_mut(), Relaxed); + return slot; + }, + Err(updated) => head = updated, + } + } + let mut row = ManuallyDrop::new(Box::>::default()); + row.prev = Some(unsafe { ptr::read(self.rows.get() as *const _) }); + unsafe { + ptr::write(self.rows.get(), ManuallyDrop::take(&mut row)); + } + self.add_slots(&row.slots[1..]); + unsafe { std::mem::transmute(row.slots.get_unchecked(0)) } + } + + fn free_slots(&self, head: &Slot, tail: &Slot) { + let mut frees = self.frees.load(Relaxed); + loop { + tail.frees.store(frees, Relaxed); + // Release store to publish `slot.frees`. + match self.frees.compare_exchange(frees, head as *const _ as *mut _, Release, Relaxed) { + Ok(_) => break, + Err(updated) => frees = updated, + } + } + } + + fn free_slot(&self, slot: &Slot) { + self.free_slots(slot, slot); + } + + fn release(&self, slot: &Slot) { + if slot.refs.fetch_sub(1, Relaxed) != 1 { + return; + } + let value = unsafe { &mut *slot.value.get() }; + let value = unsafe { ManuallyDrop::take(value) }; + drop(value); + self.free_slot(slot); + } + + fn publish(&self, value: T) { + let slot = self.alloc_slot(); + slot.store(value); + let mut latest = self.latest(Version(0)); + loop { + let version = latest.version().next(); + slot.version.set(version); + // Release store to publish value and version + // Acquire load to observe version + match self.latest.compare_exchange(latest.slot, slot, Release, Acquire) { + Ok(slot) => { + self.release(slot); + self.changes.notify_all(); + break; + }, + Err(updated) => match updated.retain(version) { + None => latest = self.latest(version), + Some(slot) => latest = Ref { slot, shared: self, closed: false, changed: true }, + }, + } + } + } + + fn latest(&self, seen: Version) -> Ref<'_, T> { + loop { + // Acquire load to observe version and value. + let (slot, closed) = self.latest.load(Acquire); + if let Some(slot) = slot.retain(seen) { + return Ref { slot, shared: self, closed, changed: seen != slot.version.get() }; + } + } + } + + fn try_changed(&self, seen: Version) -> Result>, RecvError> { + let latest = self.latest(seen); + if latest.has_changed() { + Ok(Some(latest)) + } else if latest.closed { + Err(RecvError(())) + } else { + Ok(None) + } + } +} + +/// Constructs a lock free channel to publish and subscribe values. +/// +/// ## Differences with [tokio] +/// * [tokio] holds only a single value, so no allocation. +/// * [Ref] holds no lock but reference to underlying value, which prevent it from reclamation, so +/// it is also crucial to drop it as soon as possible. +/// +/// [tokio]: https://docs.rs/tokio +pub fn channel(value: T) -> (Sender, Receiver) { + let version = Version(1); + let shared = Arc::new(Shared::new(version, value)); + let sender = Sender { shared: shared.clone() }; + let receiver = Receiver { seen: version, shared }; + (sender, receiver) +} + +/// Send part of [Receiver]. +pub struct Sender { + shared: Arc>, +} + +unsafe impl Send for Sender {} +unsafe impl Sync for Sender {} + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let latest = self.shared.latest(Version(0)); + f.debug_struct("Sender") + .field("version", &latest.version()) + .field("value", latest.as_ref()) + .field("closed", &latest.closed) + .finish() + } +} + +impl Sender { + /// Sends value to [Receiver]s. + pub fn send(&self, value: T) -> Result<(), SendError> { + if self.shared.receivers.load(Relaxed) == 0 { + return Err(SendError(value)); + } + self.publish(value); + Ok(()) + } + + /// Publishes value for existing [Receiver]s and possible future [Receiver]s from + /// [Sender::subscribe]. + pub fn publish(&self, value: T) { + self.shared.publish(value); + } + + /// Subscribes to future changes. + pub fn subscribe(&self) -> Receiver { + let latest = self.shared.latest(Version::default()); + self.shared.receivers.fetch_add(1, Relaxed); + Receiver { seen: latest.version(), shared: self.shared.clone() } + } + + /// Receiver count. + pub fn receiver_count(&self) -> usize { + self.shared.receivers.load(Relaxed) + } + + /// Blocks until all receivers dropped. + pub async fn closed(&self) { + // Loop as `subscribe` takes `&self` but not `&mut self`. + while !self.is_closed() { + let notified = self.shared.closed.notified(); + if self.is_closed() { + return; + } + notified.await + } + } + + pub fn is_closed(&self) -> bool { + self.receiver_count() == 0 + } +} + +impl Clone for Sender { + fn clone(&self) -> Self { + self.shared.new_sender() + } +} + +impl Drop for Sender { + fn drop(&mut self) { + self.shared.drop_sender(); + } +} + +/// Receive part of [Sender]. +pub struct Receiver { + seen: Version, + shared: Arc>, +} + +unsafe impl Send for Receiver {} +unsafe impl Sync for Receiver {} + +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let latest = self.borrow(); + f.debug_struct("Receiver") + .field("seen", &self.seen) + .field("version", &latest.version()) + .field("value", latest.as_ref()) + .field("closed", &latest.closed) + .field("changed", &latest.changed) + .finish() + } +} + +/// Reference to borrowed value. +/// +/// Holds reference will prevent it from reclamation so drop it as soon as possible. +pub struct Ref<'a, T> { + slot: &'a Slot, + shared: &'a Shared, + closed: bool, + changed: bool, +} + +unsafe impl Send for Ref<'_, T> {} +unsafe impl Sync for Ref<'_, T> {} + +impl fmt::Debug for Ref<'_, T> { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("Ref") + .field("version", &self.version()) + .field("value", self.as_ref()) + .field("closed", &self.closed) + .field("changed", &self.changed) + .finish() + } +} + +impl<'a, T> Ref<'a, T> { + fn version(&self) -> Version { + self.slot.version.get() + } + + /// Do we ever seen this before from last [Receiver::borrow_and_update] and [Receiver::changed()] ? + pub fn has_changed(&self) -> bool { + self.changed + } +} + +impl Deref for Ref<'_, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + unsafe { &*self.slot.value.get() } + } +} + +impl AsRef for Ref<'_, T> { + fn as_ref(&self) -> &T { + self + } +} + +impl Drop for Ref<'_, T> { + fn drop(&mut self) { + self.shared.release(self.slot); + } +} + +impl Receiver { + /// Borrows the latest value but does not mark it as seen. + pub fn borrow(&self) -> Ref<'_, T> { + self.shared.latest(self.seen) + } + + /// Borrows the latest value and marks it as seen. + pub fn borrow_and_update(&mut self) -> Ref<'_, T> { + let latest = self.shared.latest(self.seen); + self.seen = latest.version(); + latest + } + + /// Blocks and consumes new change since last [Receiver::borrow_and_update] or [Receiver::changed()]. + /// + /// If multiple values are published in the meantime, it is likely that only later one got + /// observed. It is guaranteed that the final value after all [Sender]s dropped is always + /// observed. + /// + /// ## Errors + /// * [RecvError] after all [Sender]s dropped and final value consumed + pub async fn changed(&mut self) -> Result, RecvError> { + loop { + // This serves both luck path and recheck after `notified.await`. + if let Some(changed) = self.shared.try_changed(self.seen)? { + self.seen = changed.version(); + return Ok(changed); + } + let notified = self.shared.changes.notified(); + if let Some(changed) = self.shared.try_changed(self.seen)? { + self.seen = changed.version(); + return Ok(changed); + } + notified.await; + } + } +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + self.shared.new_receiver(self.seen) + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + self.shared.drop_receiver(); + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use asyncs::{select, task}; + + use crate::{watch, Notify}; + + #[asyncs::test] + async fn channel_sequential() { + // given: a watch channel + let (sender, receiver) = watch::channel(5); + + // when: borrow without a send + let latest = receiver.borrow(); + // then: have seen initial value + assert_eq!(*latest, 5); + assert_eq!(latest.has_changed(), false); + drop(latest); + + // when: send + sender.send(6).unwrap(); + // then: receiver will observe that send + let latest = receiver.borrow(); + assert_eq!(*latest, 6); + assert_eq!(latest.has_changed(), true); + drop(latest); + + // when: send after all receivers dropped. + drop(receiver); + assert_eq!(sender.send(7).unwrap_err().into_value(), 7); + + // then: send fails with no side effect. + let receiver = sender.subscribe(); + let latest = receiver.borrow(); + assert_eq!(*latest, 6); + assert_eq!(latest.has_changed(), false); + drop(latest); + drop(receiver); + + // when: publish after all receivers dropped. + sender.publish(7); + // then: new receiver will observe that + let receiver = sender.subscribe(); + let latest = receiver.borrow(); + assert_eq!(*latest, 7); + assert_eq!(latest.has_changed(), false); + } + + #[asyncs::test] + async fn receivers_dropped() { + let (sender, receiver) = watch::channel(5); + task::spawn(async move { + drop(receiver); + }); + select! { + _ = sender.closed() => {}, + } + + let _receiver = sender.subscribe(); + select! { + default => {}, + _ = sender.closed() => unreachable!(), + } + } + + #[asyncs::test] + async fn senders_dropped() { + let (sender, mut receiver) = watch::channel(()); + drop(sender.clone()); + select! { + default => {}, + _ = receiver.changed() => unreachable!(), + } + + drop(sender); + select! { + default => unreachable!(), + Err(_) = receiver.changed() => {}, + } + } + + #[asyncs::test] + async fn changed() { + let notify = Arc::new(Notify::new()); + let (sender, mut receiver) = watch::channel(0); + let handle = task::spawn({ + let notify = notify.clone(); + async move { + let mut values = vec![]; + while let Ok(latest) = receiver.changed().await { + values.push(*latest); + notify.notify_one(); + } + values + } + }); + + sender.send(1).unwrap(); + notify.notified().await; + sender.send(2).unwrap(); + notify.notified().await; + sender.send(3).unwrap(); + notify.notified().await; + + // Final value is guaranteed to be seen before error. + sender.send(4).unwrap(); + drop(sender); + let values = handle.await.unwrap(); + assert_eq!(values, vec![1, 2, 3, 4]); + } + + #[asyncs::test] + async fn ref_drop_release_value() { + let shared = Arc::new(()); + + let (sender, receiver) = watch::channel(shared.clone()); + assert_eq!(Arc::strong_count(&shared), 2); + + let borrowed1 = receiver.borrow(); + let borrowed2 = receiver.borrow(); + assert_eq!(Arc::strong_count(&shared), 2); + sender.send(Arc::new(())).unwrap(); + assert_eq!(Arc::strong_count(&shared), 2); + + drop(borrowed1); + assert_eq!(Arc::strong_count(&shared), 2); + drop(borrowed2); + assert_eq!(Arc::strong_count(&shared), 1); + } +} diff --git a/asyncs-test/Cargo.toml b/asyncs-test/Cargo.toml index e66d23b..4c9ffec 100644 --- a/asyncs-test/Cargo.toml +++ b/asyncs-test/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "asyncs-test" -authors = ["Kezhu Wang "] description = "Macros to bootstrap asynchronous tests" +authors.workspace = true edition.workspace = true version.workspace = true homepage.workspace = true diff --git a/src/lib.rs b/src/lib.rs index 343c7e4..d6307b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,8 +14,9 @@ //! multiple ones. See [spawns] for how to compat with [task::spawn]. pub mod task; - pub use async_select::select; +#[doc(inline)] +pub use asyncs_sync as sync; pub use task::spawn; #[doc(hidden)]