From 6c2c0774753df16bcf43f212e07dab0a8e86a971 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sosth=C3=A8ne=20Gu=C3=A9don?= Date: Mon, 1 Jul 2024 16:31:48 +0200 Subject: [PATCH] Queue: implement QueueView on top of #486 --- CHANGELOG.md | 1 + src/spsc.rs | 219 ++++++++++++++++++++++++++++++++++--------------- tests/cpass.rs | 12 ++- 3 files changed, 161 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 146453b708..500c43622f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Added `LinearMapView`, the `!Sized` version of `LinearMap`. - Added `HistoryBufferView`, the `!Sized` version of `HistoryBuffer`. - Added `DequeView`, the `!Sized` version of `Deque`. +- Added `QueueView`, the `!Sized` version of `Queue`. ### Changed diff --git a/src/spsc.rs b/src/spsc.rs index 1b7ab0354b..5311a7897e 100644 --- a/src/spsc.rs +++ b/src/spsc.rs @@ -97,7 +97,7 @@ //! - The numbers reported correspond to the successful path (i.e. `Some` is returned by `dequeue` //! and `Ok` is returned by `enqueue`). -use core::{cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr}; +use core::{borrow::Borrow, cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr}; #[cfg(not(feature = "portable-atomic"))] use core::sync::atomic; @@ -106,28 +106,36 @@ use portable_atomic as atomic; use atomic::{AtomicUsize, Ordering}; -/// A statically allocated single producer single consumer queue with a capacity of `N - 1` elements +use crate::storage::{OwnedStorage, Storage, ViewStorage}; + +/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`]. /// -/// *IMPORTANT*: To get better performance use a value for `N` that is a power of 2 (e.g. `16`, `32`, -/// etc.). -pub struct Queue { +/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this +/// struct if you want to write code that's generic over both. +pub struct QueueInner { // this is from where we dequeue items pub(crate) head: AtomicUsize, // this is where we enqueue new items pub(crate) tail: AtomicUsize, - pub(crate) buffer: [UnsafeCell>; N], + pub(crate) buffer: S::Buffer>>, } -impl Queue { - const INIT: UnsafeCell> = UnsafeCell::new(MaybeUninit::uninit()); +/// A statically allocated single producer single consumer queue with a capacity of `N - 1` elements +/// +/// *IMPORTANT*: To get better performance use a value for `N` that is a power of 2 (e.g. `16`, `32`, +/// etc.). +pub type Queue = QueueInner>; - #[inline] - fn increment(val: usize) -> usize { - (val + 1) % N - } +/// Asingle producer single consumer queue +/// +/// *IMPORTANT*: To get better performance use a value for `N` that is a power of 2 (e.g. `16`, `32`, +/// etc.). +pub type QueueView = QueueInner; +impl Queue { + const INIT: UnsafeCell> = UnsafeCell::new(MaybeUninit::uninit()); /// Creates an empty queue with a fixed capacity of `N - 1` pub const fn new() -> Self { // Const assert N > 1 @@ -141,18 +149,51 @@ impl Queue { } /// Returns the maximum number of elements the queue can hold + /// + /// For the same method on [`QueueView`], see [`storage_capacity`](QueueInner::storage_capacity) #[inline] pub const fn capacity(&self) -> usize { N - 1 } + /// Get a reference to the `Queue`, erasing the `N` const-generic. + pub fn as_view(&self) -> &QueueView { + self + } + + /// Get a mutable reference to the `Queue`, erasing the `N` const-generic. + pub fn as_mut_view(&mut self) -> &mut QueueView { + self + } +} + +impl QueueInner { + #[inline] + fn increment(&self, val: usize) -> usize { + (val + 1) % self.n() + } + + #[inline] + fn n(&self) -> usize { + self.buffer.borrow().len() + } + + /// Returns the maximum number of elements the queue can hold + #[inline] + pub fn storage_capacity(&self) -> usize { + self.n() - 1 + } + /// Returns the number of elements in the queue #[inline] pub fn len(&self) -> usize { let current_head = self.head.load(Ordering::Relaxed); let current_tail = self.tail.load(Ordering::Relaxed); - current_tail.wrapping_sub(current_head).wrapping_add(N) % N + current_tail + .wrapping_sub(current_head) + .wrapping_add(self.n()) + % self.n() } /// Returns `true` if the queue is empty @@ -164,12 +205,12 @@ impl Queue { /// Returns `true` if the queue is full #[inline] pub fn is_full(&self) -> bool { - Self::increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed) + self.increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed) } /// Iterates from the front of the queue to the back - pub fn iter(&self) -> Iter<'_, T, N> { - Iter { + pub fn iter(&self) -> IterInner<'_, T, S> { + IterInner { rb: self, index: 0, len: self.len(), @@ -177,9 +218,9 @@ impl Queue { } /// Returns an iterator that allows modifying each value - pub fn iter_mut(&mut self) -> IterMut<'_, T, N> { + pub fn iter_mut(&mut self) -> IterMutInner<'_, T, S> { let len = self.len(); - IterMut { + IterMutInner { rb: self, index: 0, len, @@ -218,7 +259,7 @@ impl Queue { pub fn peek(&self) -> Option<&T> { if !self.is_empty() { let head = self.head.load(Ordering::Relaxed); - Some(unsafe { &*(self.buffer.get_unchecked(head).get() as *const T) }) + Some(unsafe { &*(self.buffer.borrow().get_unchecked(head).get() as *const T) }) } else { None } @@ -229,10 +270,10 @@ impl Queue { // items without doing pointer arithmetic and accessing internal fields of this type. unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> { let current_tail = self.tail.load(Ordering::Relaxed); - let next_tail = Self::increment(current_tail); + let next_tail = self.increment(current_tail); if next_tail != self.head.load(Ordering::Acquire) { - (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val)); + (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val)); self.tail.store(next_tail, Ordering::Release); Ok(()) @@ -247,9 +288,9 @@ impl Queue { unsafe fn inner_enqueue_unchecked(&self, val: T) { let current_tail = self.tail.load(Ordering::Relaxed); - (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val)); + (self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val)); self.tail - .store(Self::increment(current_tail), Ordering::Release); + .store(self.increment(current_tail), Ordering::Release); } /// Adds an `item` to the end of the queue, without checking if it's full @@ -273,10 +314,10 @@ impl Queue { if current_head == self.tail.load(Ordering::Acquire) { None } else { - let v = (self.buffer.get_unchecked(current_head).get() as *const T).read(); + let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read(); self.head - .store(Self::increment(current_head), Ordering::Release); + .store(self.increment(current_head), Ordering::Release); Some(v) } @@ -287,10 +328,10 @@ impl Queue { // items without doing pointer arithmetic and accessing internal fields of this type. unsafe fn inner_dequeue_unchecked(&self) -> T { let current_head = self.head.load(Ordering::Relaxed); - let v = (self.buffer.get_unchecked(current_head).get() as *const T).read(); + let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read(); self.head - .store(Self::increment(current_head), Ordering::Release); + .store(self.increment(current_head), Ordering::Release); v } @@ -306,8 +347,8 @@ impl Queue { } /// Splits a queue into producer and consumer endpoints - pub fn split(&mut self) -> (Producer<'_, T, N>, Consumer<'_, T, N>) { - (Producer { rb: self }, Consumer { rb: self }) + pub fn split(&mut self) -> (ProducerInner<'_, T, S>, ConsumerInner<'_, T, S>) { + (ProducerInner { rb: self }, ConsumerInner { rb: self }) } } @@ -336,24 +377,35 @@ where } } -impl PartialEq> for Queue +impl PartialEq> for QueueInner where T: PartialEq, + S: Storage, + S2: Storage, { - fn eq(&self, other: &Queue) -> bool { + fn eq(&self, other: &QueueInner) -> bool { self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2) } } -impl Eq for Queue where T: Eq {} +impl Eq for QueueInner where T: Eq {} -/// An iterator over the items of a queue -pub struct Iter<'a, T, const N: usize> { - rb: &'a Queue, +/// Base struct for [`Iter`] and [`IterView`], generic over the [`Storage`]. +/// +/// In most cases you should use [`Iter`] or [`IterView`] directly. Only use this +/// struct if you want to write code that's generic over both. +pub struct IterInner<'a, T, S: Storage> { + rb: &'a QueueInner, index: usize, len: usize, } +/// An iterator over the items of a queue +pub type Iter<'a, T, const N: usize> = IterInner<'a, T, OwnedStorage>; + +/// An iterator over the items of a queue +pub type IterView<'a, T> = IterInner<'a, T, ViewStorage>; + impl<'a, T, const N: usize> Clone for Iter<'a, T, N> { fn clone(&self) -> Self { Self { @@ -364,78 +416,87 @@ impl<'a, T, const N: usize> Clone for Iter<'a, T, N> { } } -/// A mutable iterator over the items of a queue -pub struct IterMut<'a, T, const N: usize> { - rb: &'a mut Queue, +/// Base struct for [`IterMut`] and [`IterMutView`], generic over the [`Storage`]. +/// +/// In most cases you should use [`IterMut`] or [`IterMutView`] directly. Only use this +/// struct if you want to write code that's generic over both. +pub struct IterMutInner<'a, T, S: Storage> { + rb: &'a QueueInner, index: usize, len: usize, } -impl<'a, T, const N: usize> Iterator for Iter<'a, T, N> { +/// An iterator over the items of a queue +pub type IterMut<'a, T, const N: usize> = IterMutInner<'a, T, OwnedStorage>; + +/// An iterator over the items of a queue +pub type IterMutView<'a, T> = IterMutInner<'a, T, ViewStorage>; + +impl<'a, T, S: Storage> Iterator for IterInner<'a, T, S> { type Item = &'a T; fn next(&mut self) -> Option { if self.index < self.len { let head = self.rb.head.load(Ordering::Relaxed); - let i = (head + self.index) % N; + let i = (head + self.index) % self.rb.n(); self.index += 1; - Some(unsafe { &*(self.rb.buffer.get_unchecked(i).get() as *const T) }) + Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) }) } else { None } } } -impl<'a, T, const N: usize> Iterator for IterMut<'a, T, N> { +impl<'a, T, S: Storage> Iterator for IterMutInner<'a, T, S> { type Item = &'a mut T; fn next(&mut self) -> Option { if self.index < self.len { let head = self.rb.head.load(Ordering::Relaxed); - let i = (head + self.index) % N; + let i = (head + self.index) % self.rb.n(); self.index += 1; - Some(unsafe { &mut *(self.rb.buffer.get_unchecked(i).get() as *mut T) }) + Some(unsafe { &mut *(self.rb.buffer.borrow().get_unchecked(i).get() as *mut T) }) } else { None } } } -impl<'a, T, const N: usize> DoubleEndedIterator for Iter<'a, T, N> { +impl<'a, T, S: Storage> DoubleEndedIterator for IterInner<'a, T, S> { fn next_back(&mut self) -> Option { if self.index < self.len { let head = self.rb.head.load(Ordering::Relaxed); // self.len > 0, since it's larger than self.index > 0 - let i = (head + self.len - 1) % N; + let i = (head + self.len - 1) % self.rb.n(); self.len -= 1; - Some(unsafe { &*(self.rb.buffer.get_unchecked(i).get() as *const T) }) + Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) }) } else { None } } } -impl<'a, T, const N: usize> DoubleEndedIterator for IterMut<'a, T, N> { +impl<'a, T, S: Storage> DoubleEndedIterator for IterMutInner<'a, T, S> { fn next_back(&mut self) -> Option { if self.index < self.len { let head = self.rb.head.load(Ordering::Relaxed); // self.len > 0, since it's larger than self.index > 0 - let i = (head + self.len - 1) % N; + let i = (head + self.len - 1) % self.rb.n(); self.len -= 1; - Some(unsafe { &mut *(self.rb.buffer.get_unchecked(i).get() as *mut T) }) + Some(unsafe { &mut *(self.rb.buffer.borrow().get_unchecked(i).get() as *mut T) }) } else { None } } } -impl Drop for Queue { +impl Drop for QueueInner { fn drop(&mut self) { for item in self { unsafe { @@ -445,18 +506,20 @@ impl Drop for Queue { } } -impl fmt::Debug for Queue +impl fmt::Debug for QueueInner where T: fmt::Debug, + S: Storage, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_list().entries(self.iter()).finish() } } -impl hash::Hash for Queue +impl hash::Hash for QueueInner where T: hash::Hash, + S: Storage, { fn hash(&self, state: &mut H) { // iterate over self in order @@ -466,41 +529,61 @@ where } } -impl<'a, T, const N: usize> IntoIterator for &'a Queue { +impl<'a, T, S: Storage> IntoIterator for &'a QueueInner { type Item = &'a T; - type IntoIter = Iter<'a, T, N>; + type IntoIter = IterInner<'a, T, S>; fn into_iter(self) -> Self::IntoIter { self.iter() } } -impl<'a, T, const N: usize> IntoIterator for &'a mut Queue { +impl<'a, T, S: Storage> IntoIterator for &'a mut QueueInner { type Item = &'a mut T; - type IntoIter = IterMut<'a, T, N>; + type IntoIter = IterMutInner<'a, T, S>; fn into_iter(self) -> Self::IntoIter { self.iter_mut() } } +/// Base struct for [`Consumer`] and [`ConsumerView`], generic over the [`Storage`]. +/// +/// In most cases you should use [`Consumer`] or [`ConsumerView`] directly. Only use this +/// struct if you want to write code that's generic over both. +pub struct ConsumerInner<'a, T, S: Storage> { + rb: &'a QueueInner, +} + +/// A queue "consumer"; it can dequeue items from the queue +/// NOTE the consumer semantically owns the `head` pointer of the queue +pub type Consumer<'a, T, const N: usize> = ConsumerInner<'a, T, OwnedStorage>; + /// A queue "consumer"; it can dequeue items from the queue /// NOTE the consumer semantically owns the `head` pointer of the queue -pub struct Consumer<'a, T, const N: usize> { - rb: &'a Queue, +pub type ConsumerView<'a, T> = ConsumerInner<'a, T, ViewStorage>; + +unsafe impl<'a, T, S: Storage> Send for ConsumerInner<'a, T, S> where T: Send {} + +/// Base struct for [`Producer`] and [`ProducerView`], generic over the [`Storage`]. +/// +/// In most cases you should use [`Producer`] or [`ProducerView`] directly. Only use this +/// struct if you want to write code that's generic over both. +pub struct ProducerInner<'a, T, S: Storage> { + rb: &'a QueueInner, } -unsafe impl<'a, T, const N: usize> Send for Consumer<'a, T, N> where T: Send {} +/// A queue "producer"; it can enqueue items into the queue +/// NOTE the producer semantically owns the `tail` pointer of the queue +pub type Producer<'a, T, const N: usize> = ProducerInner<'a, T, OwnedStorage>; /// A queue "producer"; it can enqueue items into the queue /// NOTE the producer semantically owns the `tail` pointer of the queue -pub struct Producer<'a, T, const N: usize> { - rb: &'a Queue, -} +pub type ProducerView<'a, T> = ProducerInner<'a, T, ViewStorage>; -unsafe impl<'a, T, const N: usize> Send for Producer<'a, T, N> where T: Send {} +unsafe impl<'a, T, S: Storage> Send for ProducerInner<'a, T, S> where T: Send {} -impl<'a, T, const N: usize> Consumer<'a, T, N> { +impl<'a, T, S: Storage> ConsumerInner<'a, T, S> { /// Returns the item in the front of the queue, or `None` if the queue is empty #[inline] pub fn dequeue(&mut self) -> Option { @@ -550,7 +633,7 @@ impl<'a, T, const N: usize> Consumer<'a, T, N> { /// Returns the maximum number of elements the queue can hold #[inline] pub fn capacity(&self) -> usize { - self.rb.capacity() + self.rb.storage_capacity() } /// Returns the item in the front of the queue without dequeuing, or `None` if the queue is @@ -575,7 +658,7 @@ impl<'a, T, const N: usize> Consumer<'a, T, N> { } } -impl<'a, T, const N: usize> Producer<'a, T, N> { +impl<'a, T, S: Storage> ProducerInner<'a, T, S> { /// Adds an `item` to the end of the queue, returns back the `item` if the queue is full #[inline] pub fn enqueue(&mut self, val: T) -> Result<(), T> { @@ -624,7 +707,7 @@ impl<'a, T, const N: usize> Producer<'a, T, N> { /// Returns the maximum number of elements the queue can hold #[inline] pub fn capacity(&self) -> usize { - self.rb.capacity() + self.rb.storage_capacity() } } diff --git a/tests/cpass.rs b/tests/cpass.rs index 95f0b7847b..efe1a3b06a 100644 --- a/tests/cpass.rs +++ b/tests/cpass.rs @@ -1,8 +1,9 @@ //! Collections of `Send`-able things are `Send` use heapless::{ - spsc::{Consumer, Producer, Queue}, - HistoryBuffer, Vec, + histbuf::HistoryBufferView, + spsc::{Consumer, ConsumerView, Producer, ProducerView, Queue, QueueView}, + HistoryBuffer, Vec, VecView, }; #[test] @@ -13,13 +14,18 @@ fn send() { fn is_send() where - T: Send, + T: Send + ?Sized, { } is_send::>(); + is_send::>(); is_send::>(); + is_send::>(); is_send::>(); + is_send::>(); is_send::>(); + is_send::>(); is_send::>(); + is_send::>(); }