Skip to content

Commit

Permalink
chore: cleanup + improve tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bmuddha committed Dec 27, 2024
1 parent 35e71ab commit eea5ae1
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Write for ExtBuf<'_> {
// wrap around adjustment of pointer
self.header = header;
// lock the new memory region
self.header.set();
self.header.lock();
self.initialized = 0;
// copy existing data from tail to new memory region
let _ = self.write(old)?;
Expand Down
11 changes: 5 additions & 6 deletions src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ use crate::USIZELEN;
pub(crate) struct Header(pub(crate) *mut usize);
/// Allocation guard, used through its Drop implementation, which
/// unlocks (releases) the memory region back to allocator
pub(crate) struct Guard(&'static mut usize);
pub(crate) struct Guard(&'static AtomicUsize);

impl Header {
/// lock the memory region, preventing allocator
/// from reusing it until it's unlocked
#[inline(always)]
pub(crate) fn set(&self) {
pub(crate) fn lock(&self) {
unsafe { *self.0 |= 1 }
}

Expand All @@ -35,7 +35,7 @@ impl Header {
#[inline(always)]
pub(crate) fn available(&self) -> bool {
let atomic = self.0 as *const AtomicUsize;
(unsafe { &*atomic }).load(Ordering::Acquire) & 1 == 0
(unsafe { &*atomic }).load(Ordering::Relaxed) & 1 == 0
}

/// distance (in `size_of<usize>()`) to given guard
Expand Down Expand Up @@ -64,7 +64,7 @@ impl Header {
impl From<Header> for Guard {
#[inline(always)]
fn from(value: Header) -> Self {
Self(unsafe { &mut *value.0 })
Self(unsafe { &*(value.0 as *const AtomicUsize) })
}
}

Expand All @@ -83,7 +83,6 @@ impl Drop for Guard {
// The caller of `drop` is the sole entity capable of writing to the region at this point
// in the execution timeline. Importantly, the allocator is restricted from write access
// the region until it is released.
let atomic = self.0 as *mut usize as *const AtomicUsize;
unsafe { &*atomic }.fetch_and(usize::MAX << 1, Ordering::Relaxed);
self.0.fetch_and(usize::MAX << 1, Ordering::Relaxed);
}
}
12 changes: 6 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl RingAl {
///
/// NOTE: not entire capacity will be available for allocation due to guard headers being part
/// of allocations and taking up space as well
pub fn new_with_align(mut size: usize, align: usize) -> Self {
fn new_with_align(mut size: usize, align: usize) -> Self {
assert!(size > USIZELEN && size < usize::MAX >> 1 && align.is_power_of_two());
size = size.next_power_of_two();
let inner = unsafe { alloc(Layout::from_size_align_unchecked(size, align)) };
Expand All @@ -188,7 +188,7 @@ impl RingAl {
#[inline(always)]
pub fn fixed(&mut self, size: usize) -> Option<FixedBufMut> {
let header = self.alloc(size, USIZEALIGN)?;
header.set();
header.lock();
let ptr = header.buffer();
let capacity = header.capacity();
let inner = unsafe { std::slice::from_raw_parts_mut(ptr, capacity) };
Expand All @@ -208,7 +208,7 @@ impl RingAl {
/// prevents any further allocations while this buffer is around
pub fn extendable(&mut self, size: usize) -> Option<ExtBuf<'_>> {
let header = self.alloc(size, USIZEALIGN)?;
header.set();
header.lock();
Some(ExtBuf {
header,
initialized: 0,
Expand All @@ -222,7 +222,7 @@ impl RingAl {
let tsize = size_of::<T>();
let size = count * tsize;
let header = self.alloc(size, align_of::<T>())?;
header.set();
header.lock();
let buffer = header.buffer();
let offset = buffer.align_offset(align_of::<T>());
let inner = unsafe { buffer.add(offset) } as *mut T;
Expand Down Expand Up @@ -340,7 +340,7 @@ impl Drop for RingAl {
loop {
// Busy wait until the current allocation is marked as available
if !next.available() {
std::thread::sleep(Duration::from_millis(100));
std::thread::sleep(Duration::from_millis(50));
continue;
}
// Reconstruct the original capacity used for the backing store
Expand All @@ -360,7 +360,7 @@ impl Drop for RingAl {
let inner = next.0;
head = head.min(inner)
}
let layout = unsafe { Layout::from_size_align_unchecked(capacity, 64) };
let layout = unsafe { Layout::from_size_align_unchecked(capacity, DEFAULT_ALIGNMENT) };
// SAFETY:
// 1. All pointers are guaranteed to lie within the original backing store.
// 2. The initial slice length has been accurately recalculated.
Expand Down
15 changes: 10 additions & 5 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
io::Write,
mem::transmute,
sync::mpsc::sync_channel,
time::Instant,
time::{Duration, Instant},
};

use crate::{
Expand Down Expand Up @@ -87,7 +87,7 @@ fn test_multi_alloc() {
"should have enough capacity for all allocations"
);
let header = header.unwrap();
header.set();
header.lock();
allocations.push(header.into());
}
let header = ringal.alloc(SIZE / COUNT - USIZELEN, USIZEALIGN);
Expand All @@ -110,7 +110,7 @@ fn test_continuous_alloc() {
let header = ringal.alloc(size as usize, USIZEALIGN);
assert!(header.is_some(), "should always have capacity");
let header = header.unwrap();
header.set();
header.lock();
allocations.push_back(header.into());
if allocations.len() == 4 {
allocations.pop_front();
Expand Down Expand Up @@ -299,17 +299,19 @@ fn test_fixed_buf_continuous_alloc_multi_thread() {
let (mut ringal, _g) = setup!();
let mut string = Vec::<u8>::new();
let mut threads = Vec::with_capacity(4);
let mut handles = Vec::with_capacity(4);
const ITERS: usize = 8192;
for i in 0..4 {
let (tx, rx) = sync_channel::<FixedBuf>(2);
std::thread::spawn(move || {
let h = std::thread::spawn(move || {
let mut number = 0;
while let Ok(s) = rx.recv() {
number += s.len() * i;
}
number
});
threads.push(tx);
handles.push(h);
}
for i in 0..ITERS {
let random = unsafe { transmute::<Instant, (usize, u32)>(Instant::now()).0 };
Expand All @@ -335,6 +337,10 @@ fn test_fixed_buf_continuous_alloc_multi_thread() {
tx.send(buffer.clone()).unwrap();
}
}
drop(threads);
for h in handles {
let _ = h.join();
}
}

#[cfg(feature = "tls")]
Expand Down Expand Up @@ -422,7 +428,6 @@ macro_rules! test_generic_buf {
"buffer should swap remove all pushed elements"
);
let removed = &elem.unwrap().i;
println!("{} removing {removed}", stringify!($int));
assert!(indices.remove(&removed));
}
assert_eq!(buffer.len(), 0);
Expand Down

0 comments on commit eea5ae1

Please sign in to comment.