Skip to content

Commit

Permalink
Add functions to take/release just producer or consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
timvisee committed Nov 12, 2020
1 parent cb51821 commit 59a040e
Showing 1 changed file with 236 additions and 12 deletions.
248 changes: 236 additions & 12 deletions core/src/bbbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ use core::{
result::Result as CoreResult,
slice::from_raw_parts_mut,
sync::atomic::{
AtomicBool, AtomicUsize,
AtomicBool, AtomicU8, AtomicUsize,
Ordering::{AcqRel, Acquire, Release},
},
};
pub use generic_array::typenum::consts;
use generic_array::{ArrayLength, GenericArray};

/// Bit to define producer taken.
const BIT_PRODUCER: u8 = 1 << 0;

/// Bit to define consumer taken.
const BIT_CONSUMER: u8 = 1 << 1;

/// A backing structure for a BBQueue. Can be used to create either
/// a BBQueue or a split Producer/Consumer pair
pub struct BBBuffer<N: ArrayLength<u8>>(
Expand All @@ -41,7 +47,7 @@ where
/// is placed at `static` scope within the `.bss` region, the explicit initialization
/// will be elided (as it is already performed as part of memory initialization)
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
/// while splitting.
///
/// ```rust
Expand All @@ -64,7 +70,13 @@ where
/// # }
/// ```
pub fn try_split(&'a self) -> Result<(Producer<'a, N>, Consumer<'a, N>)> {
if atomic::swap(&self.0.already_split, true, AcqRel) {
// Set producer/consumer taken bit, error and reset if one was already set
let prev = self
.0
.split_prod_cons
.fetch_or(BIT_PRODUCER | BIT_CONSUMER, AcqRel);
if prev > 0 {
self.0.split_prod_cons.store(prev, Release);
return Err(Error::AlreadySplit);
}

Expand Down Expand Up @@ -101,14 +113,82 @@ where
/// is placed at `static` scope within the `.bss` region, the explicit initialization
/// will be elided (as it is already performed as part of memory initialization)
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// section while splitting.
pub fn try_split_framed(&'a self) -> Result<(FrameProducer<'a, N>, FrameConsumer<'a, N>)> {
let (producer, consumer) = self.try_split()?;
Ok((FrameProducer { producer }, FrameConsumer { consumer }))
}

/// Attempt to release the Producer and Consumer
/// Attempt to take a `Producer` from the `BBBuffer` to gain access to the
/// buffer. If a producer has already been taken, an error will be returned.
///
/// NOTE: When splitting, the underlying buffer will be explicitly initialized
/// to zero. This may take a measurable amount of time, depending on the size
/// of the buffer. This is necessary to prevent undefined behavior. If the buffer
/// is placed at `static` scope within the `.bss` region, the explicit initialization
/// will be elided (as it is already performed as part of memory initialization)
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
/// while splitting.
pub fn try_take_producer(&'a self) -> Result<Producer<'a, N>> {
// Set producer taken bit, error if already set
if self.0.split_prod_cons.fetch_or(BIT_PRODUCER, AcqRel) & BIT_PRODUCER > 0 {
return Err(Error::AlreadySplit);
}

unsafe {
// TODO: do we need to zero buffer here, like try_split?
// // Explicitly zero the data to avoid undefined behavior.
// // This is required, because we hand out references to the buffers,
// // which mean that creating them as references is technically UB for now
// let mu_ptr = self.0.buf.get();
// (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);

let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);

Ok(Producer {
bbq: nn1,
pd: PhantomData,
})
}
}

/// Attempt to take a `Consumer` from the `BBBuffer` to gain access to the
/// buffer. If a consumer has already been taken, an error will be returned.
///
/// NOTE: When splitting, the underlying buffer will be explicitly initialized
/// to zero. This may take a measurable amount of time, depending on the size
/// of the buffer. This is necessary to prevent undefined behavior. If the buffer
/// is placed at `static` scope within the `.bss` region, the explicit initialization
/// will be elided (as it is already performed as part of memory initialization)
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical section
/// while splitting.
pub fn try_take_consumer(&'a self) -> Result<Consumer<'a, N>> {
// Set producer taken bit, error if already set
if self.0.split_prod_cons.fetch_or(BIT_CONSUMER, AcqRel) & BIT_CONSUMER > 0 {
return Err(Error::AlreadySplit);
}

unsafe {
// TODO: do we need to zero buffer here, like try_split?
// // Explicitly zero the data to avoid undefined behavior.
// // This is required, because we hand out references to the buffers,
// // which mean that creating them as references is technically UB for now
// let mu_ptr = self.0.buf.get();
// (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);

let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);

Ok(Consumer {
bbq: nn1,
pd: PhantomData,
})
}
}

/// Attempt to release the `Producer` and `Consumer`
///
/// This re-initializes the buffer so it may be split in a different mode at a later
/// time. There must be no read or write grants active, or an error will be returned.
Expand Down Expand Up @@ -178,7 +258,7 @@ where
self.0.last.store(0, Release);

// Mark the buffer as ready to be split
self.0.already_split.store(false, Release);
self.0.split_prod_cons.store(0, Release);

Ok(())
}
Expand All @@ -201,6 +281,148 @@ where
(FrameProducer { producer }, FrameConsumer { consumer })
})
}

/// Attempt to release the `Producer`.
///
/// This re-initializes the buffer so it may be split in a different mode at a later
/// time. There must be no read or write grants active, or an error will be returned.
///
/// The `Producer` ust be from THIS `BBBuffer`, or an error will be returned.
///
/// ```rust
/// # // bbqueue test shim!
/// # fn bbqtest() {
/// use bbqueue::{BBBuffer, consts::*};
///
/// // Create and split a new buffer
/// let buffer: BBBuffer<U6> = BBBuffer::new();
/// let (prod, cons) = buffer.try_split().unwrap();
///
/// // Not possible to split twice
/// assert!(buffer.try_split().is_err());
///
/// // Release the producer and consumer
/// assert!(buffer.try_release(prod, cons).is_ok());
///
/// // Split the buffer in framed mode
/// let (fprod, fcons) = buffer.try_split_framed().unwrap();
/// # // bbqueue test shim!
/// # }
/// #
/// # fn main() {
/// # #[cfg(not(feature = "thumbv6"))]
/// # bbqtest();
/// # }
/// ```
pub fn try_release_producer(
&'a self,
prod: Producer<'a, N>,
) -> CoreResult<(), Producer<'a, N>> {
// Note: Re-entrancy is not possible because we require ownership
// of the producer, which are not cloneable. We also
// can assume the buffer has been split, because

// Is this our producer?
let our_prod = prod.bbq.as_ptr() as *const Self == self;

if !(our_prod) {
// Can't release, not our producer
return Err(prod);
}

let wr_in_progress = self.0.write_in_progress.load(Acquire);
let rd_in_progress = self.0.read_in_progress.load(Acquire);

if wr_in_progress || rd_in_progress {
// Can't release, active grant(s) in progress
return Err(prod);
}

// Drop the producer
drop(prod);

// Re-initialize the buffer (not totally needed, but nice to do)
self.0.write.store(0, Release);
self.0.read.store(0, Release);
self.0.reserve.store(0, Release);
self.0.last.store(0, Release);

// Mark the buffer as ready to retake producer
self.0.split_prod_cons.fetch_and(!BIT_PRODUCER, Release);

Ok(())
}

/// Attempt to release the `Consumer`.
///
/// This re-initializes the buffer so it may be split in a different mode at a later
/// time. There must be no read or write grants active, or an error will be returned.
///
/// The `Consumer` must be from THIS `BBBuffer`, or an error will be returned.
///
/// ```rust
/// # // bbqueue test shim!
/// # fn bbqtest() {
/// use bbqueue::{BBBuffer, consts::*};
///
/// // Create and split a new buffer
/// let buffer: BBBuffer<U6> = BBBuffer::new();
/// let (prod, cons) = buffer.try_split().unwrap();
///
/// // Not possible to split twice
/// assert!(buffer.try_split().is_err());
///
/// // Release the producer and consumer
/// assert!(buffer.try_release(prod, cons).is_ok());
///
/// // Split the buffer in framed mode
/// let (fprod, fcons) = buffer.try_split_framed().unwrap();
/// # // bbqueue test shim!
/// # }
/// #
/// # fn main() {
/// # #[cfg(not(feature = "thumbv6"))]
/// # bbqtest();
/// # }
/// ```
pub fn try_release_consumer(
&'a self,
cons: Consumer<'a, N>,
) -> CoreResult<(), Consumer<'a, N>> {
// Note: Re-entrancy is not possible because we require ownership
// of the consumer, which are not cloneable. We also
// can assume the buffer has been split, because

// Is this our consumer?
let our_cons = cons.bbq.as_ptr() as *const Self == self;

if !(our_cons) {
// Can't release, not our consumer
return Err(cons);
}

let wr_in_progress = self.0.write_in_progress.load(Acquire);
let rd_in_progress = self.0.read_in_progress.load(Acquire);

if wr_in_progress || rd_in_progress {
// Can't release, active grant(s) in progress
return Err(cons);
}

// Drop the consumer
drop(cons);

// Re-initialize the buffer (not totally needed, but nice to do)
self.0.write.store(0, Release);
self.0.read.store(0, Release);
self.0.reserve.store(0, Release);
self.0.last.store(0, Release);

// Mark the buffer as ready to retake consumer
self.0.split_prod_cons.fetch_and(!BIT_CONSUMER, Release);

Ok(())
}
}

/// `const-fn` version BBBuffer
Expand Down Expand Up @@ -237,8 +459,10 @@ pub struct ConstBBBuffer<A> {
/// Is there an active write grant?
write_in_progress: AtomicBool,

/// Have we already split?
already_split: AtomicBool,
/// Whether we have split the producer and/or consumer parts off.
///
/// See the `BIT_PRODUCER` and `BIT_CONSUMER` bits which define what parts have been split off.
split_prod_cons: AtomicU8,
}

impl<A> ConstBBBuffer<A> {
Expand Down Expand Up @@ -292,7 +516,7 @@ impl<A> ConstBBBuffer<A> {
write_in_progress: AtomicBool::new(false),

/// We haven't split at the start
already_split: AtomicBool::new(false),
split_prod_cons: AtomicU8::new(0),
}
}
}
Expand Down Expand Up @@ -820,7 +1044,7 @@ where
/// If `used` is larger than the given grant, the maximum amount will
/// be commited
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// section while committing.
pub fn commit(mut self, used: usize) {
self.commit_inner(used);
Expand Down Expand Up @@ -947,7 +1171,7 @@ where
/// If `used` is larger than the given grant, the full grant will
/// be released.
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// section while releasing.
pub fn release(mut self, used: usize) {
// Saturate the grant release
Expand Down Expand Up @@ -1065,7 +1289,7 @@ where
/// If `used` is larger than the given grant, the full grant will
/// be released.
///
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// NOTE: If the `thumbv6` feature is selected, this function takes a short critical
/// section while releasing.
pub fn release(mut self, used: usize) {
// Saturate the grant release
Expand Down

0 comments on commit 59a040e

Please sign in to comment.