Skip to content

Commit

Permalink
Add async functionality
Browse files Browse the repository at this point in the history
This new feature is gated behind the `async` feature flag.

It allows to asynchronously await for a pool slot to become available,
making it easier to share constrained resources between multiple tasks.

It requires the `AtomicWaker` functionality from the `embassy-sync`
crate, which in turn requires a critical section implementation.
  • Loading branch information
danielstuart14 committed Aug 19, 2024
1 parent d93c469 commit fc64881
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 13 deletions.
28 changes: 22 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,31 @@ repository = "https://github.com/embassy-rs/atomic-pool"
edition = "2021"
readme = "README.md"
license = "MIT OR Apache-2.0"
categories = [
"embedded",
"no-std",
"concurrency",
"memory-management",
]
categories = ["embedded", "no-std", "concurrency", "memory-management"]

[dependencies]
atomic-polyfill = "1.0"
as-slice-01 = { package = "as-slice", version = "0.1.5" }
as-slice-02 = { package = "as-slice", version = "0.2.1" }
stable_deref_trait = { version = "1.2.0", default-features = false }
embassy-sync = { version = "0.6.0", optional = true }

# Used by async_std example
[dev-dependencies]
embassy-executor = { version = "0.6.0", features = ["arch-std","executor-thread","integrated-timers","task-arena-size-32768"] }
embassy-time = { version = "0.3.2", features = ["std"] }
embassy-futures = "0.1.1"
critical-section = { version = "1.1", features = ["std"] }

[features]
default = []

# Allow to asynchronously wait for a pool slot to become available.
# This feature is optional and requires the `AtomicWaker` functionality from
# the `embassy-sync` crate, which in turn requires a critical section
# implementation. Check: https://crates.io/crates/critical-section
async = ["dep:embassy-sync"]

[[example]]
name = "async_std"
required-features = ["async"]
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

Statically allocated pool providing a std-like Box.

## Optional Features
- `async`<br>
Allow to asynchronously wait for a pool slot to become available. This feature requires the `AtomicWaker` functionality from the `embassy-sync` crate, which in turn requires a critical section implementation like [critical-section](https://crates.io/crates/critical-section).

## License

This work is licensed under either of
Expand Down
79 changes: 79 additions & 0 deletions examples/async_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use embassy_executor::Spawner;
use embassy_futures::join::join5;
use embassy_time::Timer;

use std::{mem, process};

use atomic_pool::{pool, Box};

#[derive(Debug)]
#[allow(dead_code)]
struct Packet(u32);

// A maximum of 2 Packet instances can be allocated at a time.
// A maximum of 1 future can be waiting at a time.
pool!(PacketPool: [Packet; 2], 1);

#[embassy_executor::task]
async fn run() {
// Allocate non-blocking
let fut1 = async {
println!("1 - allocating async...");
let box1 = Box::<PacketPool>::new(Packet(1));
println!("1 - allocated: {:?}", box1);
Timer::after_millis(100).await;
println!("1 - dropping allocation...");
mem::drop(box1);
};

// Allocate asynchronously
let fut2 = async {
Timer::after_millis(5).await;
println!("2 - allocating sync...");
let box2 = Box::<PacketPool>::new_async(Packet(2)).await;
println!("2 - allocated: {:?}", box2);
Timer::after_millis(150).await;
println!("2 - dropping allocation...");
mem::drop(box2);
};

// Allocate non-blocking (fails, data pool is full)
let fut3 = async {
Timer::after_millis(10).await;
println!("3 - allocating sync...");
let box3 = Box::<PacketPool>::new(Packet(3));
println!(
"3 - allocation fails because the data pool is full: {:?}",
box3
);
};

// Allocate asynchronously (waits for a deallocation)
let fut4 = async {
Timer::after_millis(15).await;
println!("4 - allocating async...");
let box4 = Box::<PacketPool>::new_async(Packet(4)).await;
println!("4 - allocated: {:?}", box4);
Timer::after_millis(100).await;
println!("4 - dropping allocation...");
};

// Allocate asynchronously (fails, waker pool is full)
let fut5 = async {
Timer::after_millis(20).await;
println!("5 - allocating async...");
let box5 = Box::<PacketPool>::new_async(Packet(5)).await;
println!(
"5 - allocation fails because the waker pool is full: {:?}",
box5
);
};

join5(fut1, fut2, fut3, fut4, fut5).await;
process::exit(0);
}

#[embassy_executor::main]
async fn main(spawner: Spawner) {
spawner.spawn(run()).unwrap();
}
1 change: 1 addition & 0 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::mem;
use atomic_pool::{pool, Box};

#[derive(Debug)]
#[allow(dead_code)]
struct Packet(u32);

pool!(PacketPool: [Packet; 4]);
Expand Down
117 changes: 110 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,125 @@ mod atomic_bitset;

use atomic_polyfill::AtomicU32;
use core::cell::UnsafeCell;
#[cfg(feature = "async")]
use core::future::{poll_fn, Future};
use core::hash::{Hash, Hasher};
use core::mem::MaybeUninit;
use core::ops::{Deref, DerefMut};
#[cfg(feature = "async")]
use core::task::Poll;
use core::{cmp, mem, ptr::NonNull};

#[cfg(feature = "async")]
use embassy_sync::waitqueue::AtomicWaker;

use crate::atomic_bitset::AtomicBitset;

/// Implementation detail. Not covered by semver guarantees.
#[doc(hidden)]
pub trait PoolStorage<T> {
fn alloc(&self) -> Option<NonNull<T>>;
#[cfg(feature = "async")]
fn alloc_async(&self) -> impl Future<Output = Option<NonNull<T>>>;
unsafe fn free(&self, p: NonNull<T>);
}

/// Implementation detail. Not covered by semver guarantees.
#[doc(hidden)]
pub struct PoolStorageImpl<T, const N: usize, const K: usize>
pub struct PoolStorageImpl<T, const N: usize, const K: usize, const WN: usize, const WK: usize>
where
[AtomicU32; K]: Sized,
[AtomicU32; WK]: Sized,
{
used: AtomicBitset<N, K>,
data: [UnsafeCell<MaybeUninit<T>>; N],
#[cfg(feature = "async")]
wakers_used: AtomicBitset<WN, WK>,
#[cfg(feature = "async")]
wakers: [AtomicWaker; WN],
}

unsafe impl<T, const N: usize, const K: usize> Send for PoolStorageImpl<T, N, K> {}
unsafe impl<T, const N: usize, const K: usize> Sync for PoolStorageImpl<T, N, K> {}
unsafe impl<T, const N: usize, const K: usize, const WN: usize, const WK: usize> Send
for PoolStorageImpl<T, N, K, WN, WK>
{
}
unsafe impl<T, const N: usize, const K: usize, const WN: usize, const WK: usize> Sync
for PoolStorageImpl<T, N, K, WN, WK>
{
}

impl<T, const N: usize, const K: usize> PoolStorageImpl<T, N, K>
impl<T, const N: usize, const K: usize, const WN: usize, const WK: usize>
PoolStorageImpl<T, N, K, WN, WK>
where
[AtomicU32; K]: Sized,
[AtomicU32; WK]: Sized,
{
const UNINIT: UnsafeCell<MaybeUninit<T>> = UnsafeCell::new(MaybeUninit::uninit());

#[cfg(feature = "async")]
const WAKER: AtomicWaker = AtomicWaker::new();

pub const fn new() -> Self {
Self {
used: AtomicBitset::new(),
data: [Self::UNINIT; N],
#[cfg(feature = "async")]
wakers_used: AtomicBitset::new(),
#[cfg(feature = "async")]
wakers: [Self::WAKER; WN],
}
}
}

impl<T, const N: usize, const K: usize> PoolStorage<T> for PoolStorageImpl<T, N, K>
impl<T, const N: usize, const K: usize, const WN: usize, const WK: usize> PoolStorage<T>
for PoolStorageImpl<T, N, K, WN, WK>
where
[AtomicU32; K]: Sized,
[AtomicU32; WK]: Sized,
{
/// Returns an item from the data pool, if available.
/// Returns None if the data pool is full.
fn alloc(&self) -> Option<NonNull<T>> {
let n = self.used.alloc()?;
let p = self.data[n].get() as *mut T;
Some(unsafe { NonNull::new_unchecked(p) })
}

/// Wait until an item is available in the data pool, then return it.
/// Returns None if the waker pool is full.
#[cfg(feature = "async")]
fn alloc_async(&self) -> impl Future<Output = Option<NonNull<T>>> {
poll_fn(move |cx| {
// Check if there is a free slot in the data pool
if let Some(n) = self.used.alloc() {
let p = self.data[n].get() as *mut T;
return Poll::Ready(Some(unsafe { NonNull::new_unchecked(p) }));
}

// Try to allocate a waker slot
match self.wakers_used.alloc() {
Some(n) => {
self.wakers[n].register(cx.waker());
Poll::Pending
}
None => Poll::Ready(None), // No waker slots available
}
})
}

/// safety: p must be a pointer obtained from self.alloc that hasn't been freed yet.
unsafe fn free(&self, p: NonNull<T>) {
let origin = self.data.as_ptr() as *mut T;
let n = p.as_ptr().offset_from(origin);
assert!(n >= 0);
assert!((n as usize) < N);
self.used.free(n as usize);

// Wake up any wakers waiting for a slot
#[cfg(feature = "async")]
for waker in self.wakers.iter() {
waker.wake();
}
}
}

Expand All @@ -82,6 +143,8 @@ pub struct Box<P: Pool> {
}

impl<P: Pool> Box<P> {
/// Returns an item from the data pool, if available.
/// Returns None if the data pool is full.
pub fn new(item: P::Item) -> Option<Self> {
let p = match P::get().alloc() {
Some(p) => p,
Expand All @@ -91,6 +154,18 @@ impl<P: Pool> Box<P> {
Some(Self { ptr: p })
}

/// Wait until an item is available in the data pool, then return it.
/// Returns None if the waker pool is full.
#[cfg(feature = "async")]
pub async fn new_async(item: P::Item) -> Option<Self> {
let p = match P::get().alloc_async().await {
Some(p) => p,
None => return None,
};
unsafe { p.as_ptr().write(item) };
Some(Self { ptr: p })
}

pub fn into_raw(b: Self) -> NonNull<P::Item> {
let res = b.ptr;
mem::forget(b);
Expand Down Expand Up @@ -231,20 +306,48 @@ where
}
}

/// Create a item pool of a given type and size.
/// If the `async` feature is enabled, a waker pool is also created with a given size.
/// The waker pool is used to wake up tasks waiting for an item to become available in the data pool.
/// The waker pool size should be at least the number of tasks that can be waiting for an item at the same time.
/// The default waker pool size is zero, changing it without the `async` feature enabled will have no effect.
/// Example:
/// ```
/// use atomic_pool::{pool, Box};
///
/// #[derive(Debug)]
/// #[allow(dead_code)]
/// struct Packet(u32);
///
/// // pool!(PacketPool: [Packet; 4], 2); // This would create a waker pool of size 2
/// pool!(PacketPool: [Packet; 4]);
/// ```
#[macro_export]
macro_rules! pool {
($vis:vis $name:ident: [$ty:ty; $n:expr]) => {
$vis struct $name { _uninhabited: ::core::convert::Infallible }
impl $crate::Pool for $name {
type Item = $ty;
type Storage = $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}>;
type Storage = $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}, 0, 0>;
fn get() -> &'static Self::Storage {
static POOL: $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}, 0, 0> = $crate::PoolStorageImpl::new();
&POOL
}
}
};
($vis:vis $name:ident: [$ty:ty; $n:expr], $wn:expr) => {
$vis struct $name { _uninhabited: ::core::convert::Infallible }
impl $crate::Pool for $name {
type Item = $ty;
type Storage = $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}, {$wn}, {($wn+31)/32}>;
fn get() -> &'static Self::Storage {
static POOL: $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}> = $crate::PoolStorageImpl::new();
static POOL: $crate::PoolStorageImpl<$ty, {$n}, {($n+31)/32}, {$wn}, {($wn+31)/32}> = $crate::PoolStorageImpl::new();
&POOL
}
}
};
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit fc64881

Please sign in to comment.