Generic allocations #12

Merged · 3 commits · Dec 27, 2024
src/buffer.rs (2 changes: 1 addition & 1 deletion)

@@ -97,7 +97,7 @@ impl Write for ExtBuf<'_> {
         // wrap around adjustment of pointer
         self.header = header;
         // lock the new memory region
-        self.header.set();
+        self.header.lock();
         self.initialized = 0;
         // copy existing data from tail to new memory region
         let _ = self.write(old)?;
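The only change here is the `set` → `lock` rename, which makes the call read the same as the comment above it. The locking scheme itself is a one-bit flag in the header word; below is a minimal standalone illustration of that convention, with the semantics inferred from the `lock`, `available`, and `Drop` implementations in src/header.rs further down:

```rust
// Illustration only: the low bit of a header word is the lock flag, and the
// remaining bits (allocation metadata) survive a lock/unlock round trip.
fn main() {
    let mut header_word: usize = 42 << 1; // upper bits: allocation metadata
    header_word |= 1; // Header::lock sets the low bit
    assert_eq!(header_word & 1, 1); // region is now unavailable
    header_word &= usize::MAX << 1; // Guard::drop clears only the low bit
    assert_eq!(header_word >> 1, 42); // metadata is untouched
}
```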
src/header.rs (11 changes: 5 additions & 6 deletions)

@@ -7,13 +7,13 @@ use crate::USIZELEN;
 pub(crate) struct Header(pub(crate) *mut usize);
 /// Allocation guard, used through its Drop implementation, which
 /// unlocks (releases) the memory region back to allocator
-pub(crate) struct Guard(&'static mut usize);
+pub(crate) struct Guard(&'static AtomicUsize);

 impl Header {
     /// lock the memory region, preventing allocator
     /// from reusing it until it's unlocked
     #[inline(always)]
-    pub(crate) fn set(&self) {
+    pub(crate) fn lock(&self) {
         unsafe { *self.0 |= 1 }
     }

@@ -35,7 +35,7 @@ impl Header {
     #[inline(always)]
     pub(crate) fn available(&self) -> bool {
         let atomic = self.0 as *const AtomicUsize;
-        (unsafe { &*atomic }).load(Ordering::Acquire) & 1 == 0
+        (unsafe { &*atomic }).load(Ordering::Relaxed) & 1 == 0
     }

     /// distance (in `size_of<usize>()`) to given guard

@@ -64,7 +64,7 @@
 impl From<Header> for Guard {
     #[inline(always)]
     fn from(value: Header) -> Self {
-        Self(unsafe { &mut *value.0 })
+        Self(unsafe { &*(value.0 as *const AtomicUsize) })
     }
 }

@@ -83,7 +83,6 @@ impl Drop for Guard {
         // The caller of `drop` is the sole entity capable of writing to the region at this
         // point in the execution timeline. Importantly, the allocator is restricted from
         // write access to the region until it is released.
-        let atomic = self.0 as *mut usize as *const AtomicUsize;
-        unsafe { &*atomic }.fetch_and(usize::MAX << 1, Ordering::Relaxed);
+        self.0.fetch_and(usize::MAX << 1, Ordering::Relaxed);
     }
 }
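The substantive change in this file: `Guard` now holds `&'static AtomicUsize` instead of `&'static mut usize`, so no unique reference ever aliases the header word that `available()` loads concurrently, and `Drop` loses its raw-pointer cast (the `Acquire` → `Relaxed` change is justified by the comment in the hunk above: only the dropping thread can write the region while it is locked). A standalone sketch of the revised guard, simplified from the diff (the real `Guard` is built from a `Header` pointing into the ring's backing store, stood in for here by a leaked box):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Simplified RAII guard: clearing the low bit on drop releases the region.
struct DemoGuard(&'static AtomicUsize);

impl Drop for DemoGuard {
    fn drop(&mut self) {
        // Clear only the low (lock) bit; metadata bits are preserved.
        self.0.fetch_and(usize::MAX << 1, Ordering::Relaxed);
    }
}

fn main() {
    // Box::leak stands in for a header slot inside the ring buffer.
    let header: &'static AtomicUsize = Box::leak(Box::new(AtomicUsize::new(1)));
    drop(DemoGuard(header));
    assert_eq!(header.load(Ordering::Relaxed) & 1, 0); // unlocked again
}
```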
src/lib.rs (12 changes: 6 additions & 6 deletions)

@@ -164,7 +164,7 @@ impl RingAl {
     ///
     /// NOTE: not the entire capacity will be available for allocation, since guard headers
     /// are part of allocations and take up space as well
-    pub fn new_with_align(mut size: usize, align: usize) -> Self {
+    fn new_with_align(mut size: usize, align: usize) -> Self {
         assert!(size > USIZELEN && size < usize::MAX >> 1 && align.is_power_of_two());
         size = size.next_power_of_two();
         let inner = unsafe { alloc(Layout::from_size_align_unchecked(size, align)) };
@@ -188,7 +188,7 @@ impl RingAl {
     #[inline(always)]
     pub fn fixed(&mut self, size: usize) -> Option<FixedBufMut> {
         let header = self.alloc(size, USIZEALIGN)?;
-        header.set();
+        header.lock();
         let ptr = header.buffer();
         let capacity = header.capacity();
         let inner = unsafe { std::slice::from_raw_parts_mut(ptr, capacity) };
@@ -208,7 +208,7 @@ impl RingAl {
     /// prevents any further allocations while this buffer is around
     pub fn extendable(&mut self, size: usize) -> Option<ExtBuf<'_>> {
         let header = self.alloc(size, USIZEALIGN)?;
-        header.set();
+        header.lock();
         Some(ExtBuf {
             header,
             initialized: 0,
@@ -222,7 +222,7 @@
         let tsize = size_of::<T>();
         let size = count * tsize;
         let header = self.alloc(size, align_of::<T>())?;
-        header.set();
+        header.lock();
         let buffer = header.buffer();
         let offset = buffer.align_offset(align_of::<T>());
         let inner = unsafe { buffer.add(offset) } as *mut T;
@@ -340,7 +340,7 @@ impl Drop for RingAl {
         loop {
             // Busy wait until the current allocation is marked as available
             if !next.available() {
-                std::thread::sleep(Duration::from_millis(100));
+                std::thread::sleep(Duration::from_millis(50));
                 continue;
             }
             // Reconstruct the original capacity used for the backing store
@@ -360,7 +360,7 @@
             let inner = next.0;
             head = head.min(inner)
         }
-        let layout = unsafe { Layout::from_size_align_unchecked(capacity, 64) };
+        let layout = unsafe { Layout::from_size_align_unchecked(capacity, DEFAULT_ALIGNMENT) };
         // SAFETY:
         // 1. All pointers are guaranteed to lie within the original backing store.
         // 2. The initial slice length has been accurately recalculated.
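Taken together, the lib.rs changes rename the lock call at every allocation site, make `new_with_align` private, and replace the magic alignment `64` with the named `DEFAULT_ALIGNMENT` constant. A hypothetical usage sketch of the public API touched here; `RingAl::new` and the `Write` impl on `FixedBufMut` are assumptions (only `fixed` and `extendable` appear in the hunks above, and `ExtBuf`'s `Write` impl is in src/buffer.rs):

```rust
use std::io::Write;

// Hypothetical usage sketch, not the crate's documented API.
fn demo() -> Option<()> {
    // Assumed size-only constructor, plausible now that new_with_align is private.
    let mut ringal = RingAl::new(1024);

    // Fixed buffer: the backing region stays locked until the buffer drops.
    let mut fixed = ringal.fixed(64)?;
    fixed.write_all(b"fixed-size data").ok()?;

    // Extendable buffer: re-allocates (and re-locks) on wrap-around, and per
    // its doc comment prevents further allocations while it is alive.
    let mut ext = ringal.extendable(16)?;
    ext.write_all(b"may grow past its initial size").ok()?;
    Some(())
}
```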
src/tests.rs (15 changes: 10 additions & 5 deletions)

@@ -5,7 +5,7 @@ use std::{
     io::Write,
     mem::transmute,
     sync::mpsc::sync_channel,
-    time::Instant,
+    time::{Duration, Instant},
 };

 use crate::{
@@ -87,7 +87,7 @@ fn test_multi_alloc() {
             "should have enough capacity for all allocations"
         );
         let header = header.unwrap();
-        header.set();
+        header.lock();
         allocations.push(header.into());
     }
     let header = ringal.alloc(SIZE / COUNT - USIZELEN, USIZEALIGN);
@@ -110,7 +110,7 @@ fn test_continuous_alloc() {
         let header = ringal.alloc(size as usize, USIZEALIGN);
         assert!(header.is_some(), "should always have capacity");
         let header = header.unwrap();
-        header.set();
+        header.lock();
         allocations.push_back(header.into());
         if allocations.len() == 4 {
             allocations.pop_front();
@@ -299,17 +299,19 @@ fn test_fixed_buf_continuous_alloc_multi_thread() {
     let (mut ringal, _g) = setup!();
     let mut string = Vec::<u8>::new();
     let mut threads = Vec::with_capacity(4);
+    let mut handles = Vec::with_capacity(4);
     const ITERS: usize = 8192;
     for i in 0..4 {
         let (tx, rx) = sync_channel::<FixedBuf>(2);
-        std::thread::spawn(move || {
+        let h = std::thread::spawn(move || {
             let mut number = 0;
             while let Ok(s) = rx.recv() {
                 number += s.len() * i;
             }
             number
         });
         threads.push(tx);
+        handles.push(h);
     }
     for i in 0..ITERS {
         let random = unsafe { transmute::<Instant, (usize, u32)>(Instant::now()).0 };
@@ -335,6 +337,10 @@
             tx.send(buffer.clone()).unwrap();
         }
     }
+    drop(threads);
+    for h in handles {
+        let _ = h.join();
+    }
 }
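Previously this test leaked its worker threads; the fix keeps the `JoinHandle`s and, crucially, drops the senders before joining, so each `rx.recv()` returns `Err` and the worker loops terminate. A self-contained sketch of that shutdown pattern:

```rust
use std::sync::mpsc::sync_channel;

// Dropping every sender closes the channel, which ends the worker's recv
// loop and lets join() complete instead of hanging.
fn main() {
    let (tx, rx) = sync_channel::<usize>(2);
    let handle = std::thread::spawn(move || {
        let mut total = 0;
        while let Ok(v) = rx.recv() {
            total += v;
        }
        total // returned once the channel is closed
    });
    tx.send(21).unwrap();
    tx.send(21).unwrap();
    drop(tx); // close the channel so the worker can exit
    assert_eq!(handle.join().unwrap(), 42);
}
```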

 #[cfg(feature = "tls")]
@@ -422,7 +428,6 @@ macro_rules! test_generic_buf {
"buffer should swap remove all pushed elements"
);
let removed = &elem.unwrap().i;
println!("{} removing {removed}", stringify!($int));
assert!(indices.remove(&removed));
}
assert_eq!(buffer.len(), 0);