diff --git a/.github/workflows/rust-miri.yml b/.github/workflows/rust-miri.yml index 05e401f..2f4dfcd 100644 --- a/.github/workflows/rust-miri.yml +++ b/.github/workflows/rust-miri.yml @@ -20,4 +20,4 @@ jobs: override: true components: miri - name: Test - run: cargo miri test + run: cargo miri test --all-features diff --git a/Cargo.toml b/Cargo.toml index a826f1f..2ac6548 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,15 +7,31 @@ repository = "https://github.com/embassy-rs/atomic-pool" edition = "2021" readme = "README.md" license = "MIT OR Apache-2.0" -categories = [ - "embedded", - "no-std", - "concurrency", - "memory-management", -] +categories = ["embedded", "no-std", "concurrency", "memory-management"] [dependencies] atomic-polyfill = "1.0" as-slice-01 = { package = "as-slice", version = "0.1.5" } as-slice-02 = { package = "as-slice", version = "0.2.1" } stable_deref_trait = { version = "1.2.0", default-features = false } +embassy-sync = { version = "0.6.0", optional = true } + +# Used by async_std example and async tests +[dev-dependencies] +embassy-executor = { version = "0.6.0", features = ["arch-std","executor-thread","integrated-timers","task-arena-size-32768"] } +embassy-time = { version = "0.3.2", features = ["std"] } +embassy-futures = "0.1.1" +critical-section = { version = "1.1", features = ["std"] } + +[features] +default = [] + +# Allow to asynchronously wait for a pool slot to become available. +# This feature is optional and requires the `AtomicWaker` functionality from +# the `embassy-sync` crate, which in turn requires a critical section +# implementation. Check: https://crates.io/crates/critical-section +async = ["dep:embassy-sync"] + +[[example]] +name = "async_std" +required-features = ["async"] diff --git a/README.md b/README.md index 66fbb16..2e42634 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,10 @@ Statically allocated pool providing a std-like Box. +## Optional Features +- `async`
+Allow to asynchronously wait for a pool slot to become available. This feature requires the `AtomicWaker` functionality from the `embassy-sync` crate, which in turn requires a critical section implementation like [critical-section](https://crates.io/crates/critical-section). + ## License This work is licensed under either of diff --git a/examples/async_std.rs b/examples/async_std.rs new file mode 100644 index 0000000..227d1bc --- /dev/null +++ b/examples/async_std.rs @@ -0,0 +1,79 @@ +use embassy_executor::Spawner; +use embassy_futures::join::join5; +use embassy_time::Timer; + +use std::{mem, process}; + +use atomic_pool::{pool, Box}; + +#[derive(Debug)] +#[allow(dead_code)] +struct Packet(u32); + +// A maximum of 2 Packet instances can be allocated at a time. +// A maximum of 1 future can be waiting at a time. +pool!(PacketPool: [Packet; 2], 1); + +#[embassy_executor::task] +async fn run() { + // Allocate non-blocking + let fut1 = async { + println!("1 - allocating async..."); + let box1 = Box::::new(Packet(1)); + println!("1 - allocated: {:?}", box1); + Timer::after_millis(100).await; + println!("1 - dropping allocation..."); + mem::drop(box1); + }; + + // Allocate asynchronously + let fut2 = async { + Timer::after_millis(5).await; + println!("2 - allocating sync..."); + let box2 = Box::::new_async(Packet(2)).await; + println!("2 - allocated: {:?}", box2); + Timer::after_millis(150).await; + println!("2 - dropping allocation..."); + mem::drop(box2); + }; + + // Allocate non-blocking (fails, data pool is full) + let fut3 = async { + Timer::after_millis(10).await; + println!("3 - allocating sync..."); + let box3 = Box::::new(Packet(3)); + println!( + "3 - allocation fails because the data pool is full: {:?}", + box3 + ); + }; + + // Allocate asynchronously (waits for a deallocation) + let fut4 = async { + Timer::after_millis(15).await; + println!("4 - allocating async..."); + let box4 = Box::::new_async(Packet(4)).await; + println!("4 - allocated: {:?}", box4); + Timer::after_millis(100).await; + println!("4 - dropping allocation..."); + }; + + // Allocate asynchronously (fails, waker pool is full) + let fut5 = async { + Timer::after_millis(20).await; + println!("5 - allocating async..."); + let box5 = Box::::new_async(Packet(5)).await; + println!( + "5 - allocation fails because the waker pool is full: {:?}", + box5 + ); + }; + + join5(fut1, fut2, fut3, fut4, fut5).await; + process::exit(0); +} + +#[embassy_executor::main] +async fn main(spawner: Spawner) { + spawner.spawn(run()).unwrap(); +} diff --git a/examples/simple.rs b/examples/simple.rs index 3e4a706..6c1f0a5 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -3,6 +3,7 @@ use std::mem; use atomic_pool::{pool, Box}; #[derive(Debug)] +#[allow(dead_code)] struct Packet(u32); pool!(PacketPool: [Packet; 4]); diff --git a/src/atomic_bitset.rs b/src/atomic_bitset.rs index 81e8897..6eed9e9 100644 --- a/src/atomic_bitset.rs +++ b/src/atomic_bitset.rs @@ -1,5 +1,9 @@ +#[cfg(feature = "async")] +pub mod droppable_bit; + use atomic_polyfill::{AtomicU32, Ordering}; +/// A bitset that can be used to allocate slots in a pool pub struct AtomicBitset where [AtomicU32; K]: Sized, @@ -16,6 +20,11 @@ where Self { used: [Z; K] } } + #[cfg(feature = "async")] + pub fn alloc_droppable(&self) -> Option> { + self.alloc().map(|i| droppable_bit::DroppableBit::new(self, i)) + } + pub fn alloc(&self) -> Option { for (i, val) in self.used.iter().enumerate() { let mut allocated = 0; diff --git a/src/atomic_bitset/droppable_bit.rs b/src/atomic_bitset/droppable_bit.rs new file mode 100644 index 0000000..bccdcd9 --- /dev/null +++ b/src/atomic_bitset/droppable_bit.rs @@ -0,0 +1,52 @@ +use super::AtomicBitset; + +/// Automatically frees the Bitset slot when DroppableBit is dropped +/// Useful for async environments where the future might be dropped before it completes +pub struct DroppableBit<'a, const N: usize, const K: usize> { + bitset: &'a AtomicBitset, + inner: usize, +} + +impl<'a, const N: usize, const K: usize> DroppableBit<'a, N, K> { + /// Only a single instance of DroppableBit should be created for each slot + /// Restrict it to only be created by AtomicBitset `alloc_droppable` method + pub(super) fn new(bitset: &'a AtomicBitset, inner: usize) -> Self { + Self { bitset, inner } + } + + pub fn inner(&self) -> usize { + self.inner + } +} + +impl Drop for DroppableBit<'_, N, K> { + fn drop(&mut self) { + self.bitset.free(self.inner); + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_16() { + let s = AtomicBitset::<16, 1>::new(); + let mut v = vec![]; + + for _ in 0..16 { + let bit = s.alloc().map(|i| DroppableBit::new(&s, i)); + assert!(bit.is_some()); + + v.push(bit.unwrap()); + } + assert_eq!(s.alloc(), None); + v.pop(); + v.pop(); + assert!(s.alloc().is_some()); + assert!(s.alloc().is_some()); + assert_eq!(s.alloc(), None); + v.pop(); + assert!(s.alloc().is_some()); + } +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 66ad238..c90adcd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,10 +4,16 @@ mod atomic_bitset; use atomic_polyfill::AtomicU32; use core::cell::UnsafeCell; +#[cfg(feature = "async")] +use core::future::{poll_fn, Future}; use core::hash::{Hash, Hasher}; use core::mem::MaybeUninit; use core::ops::{Deref, DerefMut}; +#[cfg(feature = "async")] +use core::task::Poll; use core::{cmp, mem, ptr::NonNull}; +#[cfg(feature = "async")] +use embassy_sync::waitqueue::AtomicWaker; use crate::atomic_bitset::AtomicBitset; @@ -15,46 +21,99 @@ use crate::atomic_bitset::AtomicBitset; #[doc(hidden)] pub trait PoolStorage { fn alloc(&self) -> Option>; + #[cfg(feature = "async")] + fn alloc_async(&self) -> impl Future>>; unsafe fn free(&self, p: NonNull); } /// Implementation detail. Not covered by semver guarantees. #[doc(hidden)] -pub struct PoolStorageImpl +pub struct PoolStorageImpl where [AtomicU32; K]: Sized, + [AtomicU32; WK]: Sized, { used: AtomicBitset, data: [UnsafeCell>; N], + #[cfg(feature = "async")] + wakers_used: AtomicBitset, + #[cfg(feature = "async")] + wakers: [AtomicWaker; WN], } -unsafe impl Send for PoolStorageImpl {} -unsafe impl Sync for PoolStorageImpl {} +unsafe impl Send + for PoolStorageImpl +{ +} +unsafe impl Sync + for PoolStorageImpl +{ +} -impl PoolStorageImpl +impl + PoolStorageImpl where [AtomicU32; K]: Sized, + [AtomicU32; WK]: Sized, { const UNINIT: UnsafeCell> = UnsafeCell::new(MaybeUninit::uninit()); + #[cfg(feature = "async")] + const WAKER: AtomicWaker = AtomicWaker::new(); + pub const fn new() -> Self { Self { used: AtomicBitset::new(), data: [Self::UNINIT; N], + #[cfg(feature = "async")] + wakers_used: AtomicBitset::new(), + #[cfg(feature = "async")] + wakers: [Self::WAKER; WN], } } } -impl PoolStorage for PoolStorageImpl +impl PoolStorage + for PoolStorageImpl where [AtomicU32; K]: Sized, + [AtomicU32; WK]: Sized, { + /// Returns an item from the data pool, if available. + /// Returns None if the data pool is full. fn alloc(&self) -> Option> { let n = self.used.alloc()?; let p = self.data[n].get() as *mut T; Some(unsafe { NonNull::new_unchecked(p) }) } + /// Wait until an item is available in the data pool, then return it. + /// Returns None if the waker pool is full. + #[cfg(feature = "async")] + fn alloc_async(&self) -> impl Future>> { + let mut waker_slot = None; + poll_fn(move |cx| { + // Check if there is a free slot in the data pool + if let Some(n) = self.used.alloc() { + let p = self.data[n].get() as *mut T; + return Poll::Ready(Some(unsafe { NonNull::new_unchecked(p) })); + } + + // Try to allocate a waker slot if necessary + if waker_slot.is_none() { + waker_slot = self.wakers_used.alloc_droppable(); + } + + match &waker_slot { + Some(bit) => { + self.wakers[bit.inner()].register(cx.waker()); + Poll::Pending + } + None => Poll::Ready(None), // No waker slots available + } + }) + } + /// safety: p must be a pointer obtained from self.alloc that hasn't been freed yet. unsafe fn free(&self, p: NonNull) { let origin = self.data.as_ptr() as *mut T; @@ -62,6 +121,12 @@ where assert!(n >= 0); assert!((n as usize) < N); self.used.free(n as usize); + + // Wake up any wakers waiting for a slot + #[cfg(feature = "async")] + for waker in self.wakers.iter() { + waker.wake(); + } } } @@ -82,6 +147,8 @@ pub struct Box { } impl Box

{ + /// Returns an item from the data pool, if available. + /// Returns None if the data pool is full. pub fn new(item: P::Item) -> Option { let p = match P::get().alloc() { Some(p) => p, @@ -91,6 +158,18 @@ impl Box

{ Some(Self { ptr: p }) } + /// Wait until an item is available in the data pool, then return it. + /// Returns None if the waker pool is full. + #[cfg(feature = "async")] + pub async fn new_async(item: P::Item) -> Option { + let p = match P::get().alloc_async().await { + Some(p) => p, + None => return None, + }; + unsafe { p.as_ptr().write(item) }; + Some(Self { ptr: p }) + } + pub fn into_raw(b: Self) -> NonNull { let res = b.ptr; mem::forget(b); @@ -231,25 +310,55 @@ where } } +/// Create a item pool of a given type and size. +/// If the `async` feature is enabled, a waker pool is also created with a given size. +/// The waker pool is used to wake up tasks waiting for an item to become available in the data pool. +/// The waker pool size should be at least the number of tasks that can be waiting for an item at the same time. +/// The default waker pool size is zero, changing it without the `async` feature enabled will have no effect. +/// Example: +/// ``` +/// use atomic_pool::{pool, Box}; +/// +/// #[derive(Debug)] +/// #[allow(dead_code)] +/// struct Packet(u32); +/// +/// // pool!(PacketPool: [Packet; 4], 2); // This would create a waker pool of size 2 +/// pool!(PacketPool: [Packet; 4]); +/// ``` #[macro_export] macro_rules! pool { ($vis:vis $name:ident: [$ty:ty; $n:expr]) => { $vis struct $name { _uninhabited: ::core::convert::Infallible } impl $crate::Pool for $name { type Item = $ty; - type Storage = $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}>; + type Storage = $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}, 0, 0>; fn get() -> &'static Self::Storage { - static POOL: $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}> = $crate::PoolStorageImpl::new(); + static POOL: $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}, 0, 0> = $crate::PoolStorageImpl::new(); + &POOL + } + } + }; + ($vis:vis $name:ident: [$ty:ty; $n:expr], $wn:expr) => { + $vis struct $name { _uninhabited: ::core::convert::Infallible } + impl $crate::Pool for $name { + type Item = $ty; + type Storage = $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}, {$wn}, {($wn+31)/32}>; + fn get() -> &'static Self::Storage { + static POOL: $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}, {$wn}, {($wn+31)/32}> = $crate::PoolStorageImpl::new(); &POOL } } }; } + #[cfg(test)] mod test { use super::*; + use core::mem; pool!(TestPool: [u32; 4]); + pool!(TestPool2: [u32; 4], 2); #[test] fn test_pool() { @@ -270,4 +379,20 @@ mod test { assert_eq!(*b4, 444); assert_eq!(*b5, 555); } + + #[cfg(not(feature = "async"))] + #[test] + fn test_sync_sizes() { + let pool1 = ::get(); + let pool2 = ::get(); + assert_eq!(mem::size_of_val(pool1), mem::size_of_val(pool2)); + } + + #[cfg(feature = "async")] + #[test] + fn test_async_sizes() { + let pool1 = ::get(); + let pool2 = ::get(); + assert!(mem::size_of_val(pool1) < mem::size_of_val(pool2)); + } }