Skip to content

Commit

Permalink
Add functions to take/release just producer or consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ithinuel committed Nov 26, 2023
1 parent 7446101 commit 5684713
Showing 1 changed file with 18 additions and 21 deletions.
39 changes: 18 additions & 21 deletions core/src/bbbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ pub struct BBBuffer<const N: usize> {
/// Is there an active write grant?
write_in_progress: AtomicBool,

/// Have we already split?
already_split: AtomicBool,

/// Whether we have split the producer and/or consumer parts off.
///
/// See the `BIT_PRODUCER` and `BIT_CONSUMER` bits which define what parts have been split off.
Expand Down Expand Up @@ -160,7 +157,7 @@ impl<'a, const N: usize> BBBuffer<N> {
/// while splitting.
pub fn try_take_producer(&'a self) -> Result<Producer<'a, N>> {
// Set producer taken bit, error if already set
if self.0.split_prod_cons.fetch_or(BIT_PRODUCER, AcqRel) & BIT_PRODUCER > 0 {
if self.split_prod_cons.fetch_or(BIT_PRODUCER, AcqRel) & BIT_PRODUCER > 0 {
return Err(Error::AlreadySplit);
}

Expand All @@ -169,7 +166,7 @@ impl<'a, const N: usize> BBBuffer<N> {
// // Explicitly zero the data to avoid undefined behavior.
// // This is required, because we hand out references to the buffers,
// // which mean that creating them as references is technically UB for now
// let mu_ptr = self.0.buf.get();
// let mu_ptr = self.buf.get();
// (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);

let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);
Expand All @@ -194,7 +191,7 @@ impl<'a, const N: usize> BBBuffer<N> {
/// while splitting.
pub fn try_take_consumer(&'a self) -> Result<Consumer<'a, N>> {
// Set producer taken bit, error if already set
if self.0.split_prod_cons.fetch_or(BIT_CONSUMER, AcqRel) & BIT_CONSUMER > 0 {
if self.split_prod_cons.fetch_or(BIT_CONSUMER, AcqRel) & BIT_CONSUMER > 0 {
return Err(Error::AlreadySplit);
}

Expand All @@ -203,7 +200,7 @@ impl<'a, const N: usize> BBBuffer<N> {
// // Explicitly zero the data to avoid undefined behavior.
// // This is required, because we hand out references to the buffers,
// // which mean that creating them as references is technically UB for now
// let mu_ptr = self.0.buf.get();
// let mu_ptr = self.buf.get();
// (*mu_ptr).as_mut_ptr().write_bytes(0u8, 1);

let nn1 = NonNull::new_unchecked(self as *const _ as *mut _);
Expand Down Expand Up @@ -361,8 +358,8 @@ impl<'a, const N: usize> BBBuffer<N> {
return Err(prod);
}

let wr_in_progress = self.0.write_in_progress.load(Acquire);
let rd_in_progress = self.0.read_in_progress.load(Acquire);
let wr_in_progress = self.write_in_progress.load(Acquire);
let rd_in_progress = self.read_in_progress.load(Acquire);

if wr_in_progress || rd_in_progress {
// Can't release, active grant(s) in progress
Expand All @@ -373,13 +370,13 @@ impl<'a, const N: usize> BBBuffer<N> {
drop(prod);

// Re-initialize the buffer (not totally needed, but nice to do)
self.0.write.store(0, Release);
self.0.read.store(0, Release);
self.0.reserve.store(0, Release);
self.0.last.store(0, Release);
self.write.store(0, Release);
self.read.store(0, Release);
self.reserve.store(0, Release);
self.last.store(0, Release);

// Mark the buffer as ready to retake producer
self.0.split_prod_cons.fetch_and(!BIT_PRODUCER, Release);
self.split_prod_cons.fetch_and(!BIT_PRODUCER, Release);

Ok(())
}
Expand Down Expand Up @@ -432,8 +429,8 @@ impl<'a, const N: usize> BBBuffer<N> {
return Err(cons);
}

let wr_in_progress = self.0.write_in_progress.load(Acquire);
let rd_in_progress = self.0.read_in_progress.load(Acquire);
let wr_in_progress = self.write_in_progress.load(Acquire);
let rd_in_progress = self.read_in_progress.load(Acquire);

if wr_in_progress || rd_in_progress {
// Can't release, active grant(s) in progress
Expand All @@ -444,13 +441,13 @@ impl<'a, const N: usize> BBBuffer<N> {
drop(cons);

// Re-initialize the buffer (not totally needed, but nice to do)
self.0.write.store(0, Release);
self.0.read.store(0, Release);
self.0.reserve.store(0, Release);
self.0.last.store(0, Release);
self.write.store(0, Release);
self.read.store(0, Release);
self.reserve.store(0, Release);
self.last.store(0, Release);

// Mark the buffer as ready to retake consumer
self.0.split_prod_cons.fetch_and(!BIT_CONSUMER, Release);
self.split_prod_cons.fetch_and(!BIT_CONSUMER, Release);

Ok(())
}
Expand Down

0 comments on commit 5684713

Please sign in to comment.