Skip to content

Commit

Permalink
Add async functionality
Browse files Browse the repository at this point in the history
This new feature is gated behind the `async` feature flag.

It allows to asynchronously await for a pool slot to become available,
making it easier to share constrained resources between multiple tasks.

It requires the `AtomicWaker` functionality from the `embassy-sync`
crate, which in turn requires a critical section implementation.
  • Loading branch information
danielstuart14 committed Aug 19, 2024
1 parent d93c469 commit 8a7ca7f
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust-miri.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ jobs:
override: true
components: miri
- name: Test
run: cargo miri test
run: cargo miri test --all-features
28 changes: 22 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,31 @@ repository = "https://github.com/embassy-rs/atomic-pool"
edition = "2021"
readme = "README.md"
license = "MIT OR Apache-2.0"
categories = [
"embedded",
"no-std",
"concurrency",
"memory-management",
]
categories = ["embedded", "no-std", "concurrency", "memory-management"]

[dependencies]
atomic-polyfill = "1.0"
as-slice-01 = { package = "as-slice", version = "0.1.5" }
as-slice-02 = { package = "as-slice", version = "0.2.1" }
stable_deref_trait = { version = "1.2.0", default-features = false }
embassy-sync = { version = "0.6.0", optional = true }

# Used by async_std example and async tests
[dev-dependencies]
embassy-executor = { version = "0.6.0", features = ["arch-std","executor-thread","integrated-timers","task-arena-size-32768"] }
embassy-time = { version = "0.3.2", features = ["std"] }
embassy-futures = "0.1.1"
critical-section = { version = "1.1", features = ["std"] }

[features]
default = []

# Allow to asynchronously wait for a pool slot to become available.
# This feature is optional and requires the `AtomicWaker` functionality from
# the `embassy-sync` crate, which in turn requires a critical section
# implementation. Check: https://crates.io/crates/critical-section
async = ["dep:embassy-sync"]

[[example]]
name = "async_std"
required-features = ["async"]
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

Statically allocated pool providing a std-like Box.

## Optional Features
- `async`<br>
Allow to asynchronously wait for a pool slot to become available. This feature requires the `AtomicWaker` functionality from the `embassy-sync` crate, which in turn requires a critical section implementation like [critical-section](https://crates.io/crates/critical-section).

## License

This work is licensed under either of
Expand Down
79 changes: 79 additions & 0 deletions examples/async_std.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use embassy_executor::Spawner;
use embassy_futures::join::join5;
use embassy_time::Timer;

use std::{mem, process};

use atomic_pool::{pool, Box};

#[derive(Debug)]
#[allow(dead_code)]
struct Packet(u32);

// A maximum of 2 Packet instances can be allocated at a time.
// A maximum of 1 future can be waiting at a time.
pool!(PacketPool: [Packet; 2], 1);

#[embassy_executor::task]
async fn run() {
// Allocate non-blocking
let fut1 = async {
println!("1 - allocating async...");
let box1 = Box::<PacketPool>::new(Packet(1));
println!("1 - allocated: {:?}", box1);
Timer::after_millis(100).await;
println!("1 - dropping allocation...");
mem::drop(box1);
};

// Allocate asynchronously
let fut2 = async {
Timer::after_millis(5).await;
println!("2 - allocating sync...");
let box2 = Box::<PacketPool>::new_async(Packet(2)).await;
println!("2 - allocated: {:?}", box2);
Timer::after_millis(150).await;
println!("2 - dropping allocation...");
mem::drop(box2);
};

// Allocate non-blocking (fails, data pool is full)
let fut3 = async {
Timer::after_millis(10).await;
println!("3 - allocating sync...");
let box3 = Box::<PacketPool>::new(Packet(3));
println!(
"3 - allocation fails because the data pool is full: {:?}",
box3
);
};

// Allocate asynchronously (waits for a deallocation)
let fut4 = async {
Timer::after_millis(15).await;
println!("4 - allocating async...");
let box4 = Box::<PacketPool>::new_async(Packet(4)).await;
println!("4 - allocated: {:?}", box4);
Timer::after_millis(100).await;
println!("4 - dropping allocation...");
};

// Allocate asynchronously (fails, waker pool is full)
let fut5 = async {
Timer::after_millis(20).await;
println!("5 - allocating async...");
let box5 = Box::<PacketPool>::new_async(Packet(5)).await;
println!(
"5 - allocation fails because the waker pool is full: {:?}",
box5
);
};

join5(fut1, fut2, fut3, fut4, fut5).await;
process::exit(0);
}

#[embassy_executor::main]
async fn main(spawner: Spawner) {
spawner.spawn(run()).unwrap();
}
1 change: 1 addition & 0 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::mem;
use atomic_pool::{pool, Box};

#[derive(Debug)]
#[allow(dead_code)]
struct Packet(u32);

pool!(PacketPool: [Packet; 4]);
Expand Down
9 changes: 9 additions & 0 deletions src/atomic_bitset.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#[cfg(feature = "async")]
pub mod droppable_bit;

use atomic_polyfill::{AtomicU32, Ordering};

/// A bitset that can be used to allocate slots in a pool
pub struct AtomicBitset<const N: usize, const K: usize>
where
[AtomicU32; K]: Sized,
Expand All @@ -16,6 +20,11 @@ where
Self { used: [Z; K] }
}

#[cfg(feature = "async")]
pub fn alloc_droppable(&self) -> Option<droppable_bit::DroppableBit<N, K>> {
self.alloc().map(|i| droppable_bit::DroppableBit::new(self, i))
}

pub fn alloc(&self) -> Option<usize> {
for (i, val) in self.used.iter().enumerate() {
let mut allocated = 0;
Expand Down
52 changes: 52 additions & 0 deletions src/atomic_bitset/droppable_bit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use super::AtomicBitset;

/// Automatically frees the Bitset slot when DroppableBit is dropped
/// Useful for async environments where the future might be dropped before it completes
pub struct DroppableBit<'a, const N: usize, const K: usize> {
bitset: &'a AtomicBitset<N, K>,
inner: usize,
}

impl<'a, const N: usize, const K: usize> DroppableBit<'a, N, K> {
/// Only a single instance of DroppableBit should be created for each slot
/// Restrict it to only be created by AtomicBitset `alloc_droppable` method
pub(super) fn new(bitset: &'a AtomicBitset<N, K>, inner: usize) -> Self {
Self { bitset, inner }
}

pub fn inner(&self) -> usize {
self.inner
}
}

impl<const N: usize, const K: usize> Drop for DroppableBit<'_, N, K> {
fn drop(&mut self) {
self.bitset.free(self.inner);
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_16() {
let s = AtomicBitset::<16, 1>::new();
let mut v = vec![];

for _ in 0..16 {
let bit = s.alloc().map(|i| DroppableBit::new(&s, i));
assert!(bit.is_some());

v.push(bit.unwrap());
}
assert_eq!(s.alloc(), None);
v.pop();
v.pop();
assert!(s.alloc().is_some());
assert!(s.alloc().is_some());
assert_eq!(s.alloc(), None);
v.pop();
assert!(s.alloc().is_some());
}
}
Loading

0 comments on commit 8a7ca7f

Please sign in to comment.