Generic allocations #11

Merged

merged 4 commits on Dec 27, 2024
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -14,6 +14,7 @@ edition = "2021"
[dependencies]

[features]
default = []
default = ["generic"]
drop = []
tls = ["drop"]
generic = []
19 changes: 18 additions & 1 deletion README.md
@@ -100,7 +100,9 @@ _Note_: `Head` and `Tail` canaries are fixed regular guards, with the exception
4. **Read-Only Buffers**: Cloneable and shareable across threads. They're
efficient but involve extra heap allocation for reference counting, so use
them wisely.
5. **Thread local storage allocator**: allocator instance can be created for
5. **Generic buffers**: `Vec<T>`-like fixed-capacity buffers; these can be
allocated from the same backing store as regular `u8` buffers
6. **Thread local storage allocator**: allocator instance can be created for
each thread, and accessed from anywhere in the code, removing the need to
pass the allocator around

@@ -132,6 +134,21 @@ assert_eq!(buffer.len(), size);
assert_eq!(buffer.spare(), 256 - size);
```

## Generic Buffer
```rust
let mut allocator = RingAl::new(1024); // Create an allocator with initial size
struct MyType {
field1: usize,
field2: u128
}
let mut buffer = allocator.generic::<MyType>(16).unwrap();
buffer.push(MyType { field1: 42, field2: 43 });
assert_eq!(buffer.len(), 1);
let t = buffer.pop().unwrap();
assert_eq!(t.field1, 42);
assert_eq!(t.field2, 43);
```

## Multi-threaded Environment
```rust
let mut allocator = RingAl::new(1024); // Create an allocator with initial size
145 changes: 143 additions & 2 deletions src/buffer.rs
@@ -6,7 +6,7 @@ use std::{

use crate::{
header::{Guard, Header},
RingAl,
RingAl, USIZEALIGN,
};

/// Extendable buffer. It dynamically grows to accommodate any extra data beyond the current
@@ -73,7 +73,7 @@ impl Write for ExtBuf<'_> {
return Ok(count);
}
// extend the buffer with the missing capacity
let Some(header) = self.ringal.alloc(count - available) else {
let Some(header) = self.ringal.alloc(count - available, USIZEALIGN) else {
return Err(io::Error::new(
io::ErrorKind::StorageFull,
"allocator's capacity exhausted",
@@ -163,12 +163,14 @@ impl FixedBufMut {
impl Deref for FixedBufMut {
type Target = [u8];

#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe { self.inner.get_unchecked(..self.initialized) }
}
}

impl DerefMut for FixedBufMut {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { self.inner.get_unchecked_mut(..self.initialized) }
}
@@ -203,7 +205,146 @@ pub struct FixedBuf {
impl Deref for FixedBuf {
type Target = [u8];

#[inline(always)]
fn deref(&self) -> &Self::Target {
self.inner
}
}

/// Mutable fixed buffer, generic over its element type `T`, with a `Vec<T>`-like API
#[cfg(feature = "generic")]
pub struct GenericBufMut<T: Sized + 'static> {
/// the lock release mechanism upon drop
pub(crate) _guard: Guard,
/// exclusively owned slice from backing store
/// properly aligned for T
pub(crate) inner: &'static mut [T],
/// current item count in buffer
pub(crate) initialized: usize,
}

#[cfg(feature = "generic")]
mod generic {
use super::{Deref, DerefMut, GenericBufMut};

impl<T> Deref for GenericBufMut<T> {
type Target = [T];

#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe { self.inner.get_unchecked(..self.initialized) }
}
}

impl<T> DerefMut for GenericBufMut<T> {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { self.inner.get_unchecked_mut(..self.initialized) }
}
}

impl<T> GenericBufMut<T> {
/// Returns the total capacity of the buffer, i.e., the number of elements it can hold.
pub fn capacity(&self) -> usize {
self.inner.len()
}

/// Pushes an element to the end of the buffer.
///
/// If the buffer is full, the element is handed back as `Some(value)`. Otherwise, it returns
/// `None` after successfully appending the element to the end. The operation is O(1).
pub fn push(&mut self, value: T) -> Option<T> {
if self.inner.len() == self.initialized {
return Some(value);
}
unsafe {
let cell = self.inner.get_unchecked_mut(self.initialized) as *mut T;
std::ptr::write(cell, value);
}
self.initialized += 1;
None
}

/// Inserts an element at the specified index in the buffer.
///
/// If the index is greater than the number of initialized elements, the operation fails and
/// `value` is returned. Otherwise, it shifts elements to the right and inserts the new
/// element, returning `None`. This operation is O(N).
pub fn insert(&mut self, mut value: T, mut index: usize) -> Option<T> {
if self.initialized < index {
return Some(value);
}
self.initialized += 1;
unsafe {
while index < self.initialized {
let cell = self.inner.get_unchecked_mut(index) as *mut T;
let temp = std::ptr::read(cell as *const T);
std::ptr::write(cell, value);
value = temp;
index += 1;
}
}
std::mem::forget(value);
None
}

/// Removes and returns the last element from the buffer.
///
/// Returns `None` if the buffer is empty. The operation is O(1)
pub fn pop(&mut self) -> Option<T> {
(self.initialized != 0).then_some(())?;
self.initialized -= 1;
let value = unsafe { self.inner.get_unchecked_mut(self.initialized) };
let owned = unsafe { std::ptr::read(value as *const T) };
Some(owned)
}

/// Removes and returns the element at the specified index from the buffer.
///
/// Shifts all elements following the index to the left to fill the gap.
/// Returns `None` if the index is out of bounds. The operation is O(N)
pub fn remove(&mut self, mut index: usize) -> Option<T> {
(self.initialized > index).then_some(())?;
if self.initialized - 1 == index {
return self.pop();
}
let mut value = unsafe { self.inner.get_unchecked_mut(index) } as *mut T;
let element = unsafe { std::ptr::read(value) };
self.initialized -= 1;
unsafe {
while index < self.initialized {
index += 1;
let next = self.inner.get_unchecked_mut(index) as *mut T;
{
let next = std::ptr::read(next);
std::ptr::write(value, next);
}
value = next;
}
}
Some(element)
}

/// Removes and returns the element at the specified index, replacing it with the
/// last element. Returns `None` if the index is out of bounds. The operation is O(1).
pub fn swap_remove(&mut self, index: usize) -> Option<T> {
(index < self.initialized).then_some(())?;
if self.initialized - 1 == index {
return self.pop();
}

// Swap the element at the given index with the last element
self.initialized -= 1;
let element = unsafe {
let last = self.inner.get_unchecked_mut(self.initialized) as *mut T;
let value = self.inner.get_unchecked_mut(index) as *mut T;
let element = std::ptr::read(value);
let temp = std::ptr::read(last);
std::ptr::write(value, temp);
element
};

Some(element)
}
}
}
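
A usage sketch of the `GenericBufMut` API added above (not taken from the PR; the `u64` element type, the capacity of 4, and the 1 KiB backing store are arbitrary choices for illustration). Note that this `insert` takes the value before the index:

```rust
use ringal::RingAl;

// a 1 KiB backing store, as in the README examples
let mut allocator = RingAl::new(1024);
// room for 4 u64 elements, allocated from the same backing store as byte buffers
let mut buf = allocator.generic::<u64>(4).unwrap();

// push returns None on success and hands the value back once capacity is reached
assert!(buf.push(1).is_none());
assert!(buf.push(2).is_none());
assert!(buf.push(3).is_none());
assert!(buf.push(4).is_none());
assert_eq!(buf.push(5), Some(5));

// pop removes the last element in O(1)
assert_eq!(buf.pop(), Some(4));
// insert(value, index) shifts the tail to the right: [9, 1, 2, 3]
assert!(buf.insert(9, 0).is_none());
// remove(index) shifts the tail to the left: [9, 2, 3]
assert_eq!(buf.remove(1), Some(1));
// swap_remove(index) moves the last element into the gap: [3, 2]
assert_eq!(buf.swap_remove(0), Some(9));
assert_eq!(&*buf, &[3, 2]);
```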
4 changes: 2 additions & 2 deletions src/header.rs
@@ -35,7 +35,7 @@ impl Header {
#[inline(always)]
pub(crate) fn available(&self) -> bool {
let atomic = self.0 as *const AtomicUsize;
(unsafe { &*atomic }).load(Ordering::Relaxed) & 1 == 0
(unsafe { &*atomic }).load(Ordering::Acquire) & 1 == 0
}

/// distance (in `size_of<usize>()`) to given guard
@@ -84,6 +84,6 @@ impl Drop for Guard {
// in the execution timeline. Importantly, the allocator is restricted from write access
// to the region until it is released.
let atomic = self.0 as *mut usize as *const AtomicUsize;
unsafe { &*atomic }.fetch_and(usize::MAX << 1, Ordering::Release);
unsafe { &*atomic }.fetch_and(usize::MAX << 1, Ordering::Relaxed);
}
}
57 changes: 47 additions & 10 deletions src/lib.rs
@@ -85,6 +85,22 @@
//! tx.send(buffer);
//! handle.join().unwrap();
//! ```
//! ### Generic buffers
//! ```rust
//! # use ringal::{ RingAl, GenericBufMut };
//! # let mut allocator = RingAl::new(1024); // Create an allocator with initial size
//!
//! struct MyType {
//! field1: usize,
//! field2: u128
//! }
//! let mut buffer = allocator.generic::<MyType>(16).unwrap();
//! buffer.push(MyType { field1: 42, field2: 43 });
//! assert_eq!(buffer.len(), 1);
//! let t = buffer.pop().unwrap();
//! assert_eq!(t.field1, 42);
//! assert_eq!(t.field2, 43);
//! ```
//!
//! ### Thread Local Storage
//! ```rust
@@ -107,6 +123,7 @@
//! println!("bytes written: {}", fixed.len());
//! ```
//!
//!
//! ## Safety Considerations
//!
//! While `unsafe` code is used for performance reasons, the public API is designed to be safe.
@@ -115,7 +132,7 @@

use std::alloc::{alloc, Layout};

pub use buffer::{ExtBuf, FixedBuf, FixedBufMut};
pub use buffer::{ExtBuf, FixedBuf, FixedBufMut, GenericBufMut};
use header::Header;

/// Ring Allocator, see crate level documentation on features and usage
@@ -129,6 +146,7 @@ const USIZELEN: usize = size_of::<usize>();
/// capacity to multiple of 8 machine words (converted to byte count)
const MINALLOC: usize = 8;
const DEFAULT_ALIGNMENT: usize = 64;
const USIZEALIGN: usize = align_of::<usize>();

impl RingAl {
/// Initialize ring allocator with given capacity and default alignment. Allocated capacity
@@ -169,7 +187,7 @@ impl RingAl {
/// `None`, if backing store doesn't have enough capacity available.
#[inline(always)]
pub fn fixed(&mut self, size: usize) -> Option<FixedBufMut> {
let header = self.alloc(size)?;
let header = self.alloc(size, USIZEALIGN)?;
header.set();
let ptr = header.buffer();
let capacity = header.capacity();
@@ -189,7 +207,7 @@
/// allocator (used to dynamically grow the buffer), which effectively
/// prevents any further allocations while this buffer is around
pub fn extendable(&mut self, size: usize) -> Option<ExtBuf<'_>> {
let header = self.alloc(size)?;
let header = self.alloc(size, USIZEALIGN)?;
header.set();
Some(ExtBuf {
header,
@@ -199,18 +217,36 @@
})
}

/// Try to lock at least `size` bytes from backing store
fn alloc(&mut self, mut size: usize) -> Option<Header> {
/// Allocate a fixed buffer that can accommodate `count` elements of type `T`
pub fn generic<T>(&mut self, count: usize) -> Option<GenericBufMut<T>> {
let tsize = size_of::<T>();
let size = count * tsize;
let header = self.alloc(size, align_of::<T>())?;
header.set();
let buffer = header.buffer();
let offset = buffer.align_offset(align_of::<T>());
let inner = unsafe { buffer.add(offset) } as *mut T;
let inner = unsafe { std::slice::from_raw_parts_mut(inner, count) };
Some(GenericBufMut {
_guard: header.into(),
inner,
initialized: 0,
})
}

/// Try to lock at least `size` bytes from the backing store, aligned to the requested alignment
fn alloc(&mut self, mut size: usize, align: usize) -> Option<Header> {
// we need to convert the size from count of bytes to count of usizes
size = size / USIZELEN + 1;
// begin accumulating capacity
let mut start = Header(self.head);
let mut offset = start.buffer().align_offset(align) / USIZELEN;
let mut accumulated = 0;
let mut next = start;
// when the current guard sequence references a memory region smaller than the requested size,
// an attempt is made to merge subsequent regions. This process continues until the required
// capacity is satisfied or all available capacity is exhausted.
while accumulated < size {
while accumulated < size + offset {
next.available().then_some(())?;
next = next.next();
accumulated = start.distance(&next);
@@ -225,6 +261,7 @@
if next < start {
accumulated = 0;
start = next;
offset = start.buffer().align_offset(align) / USIZELEN;
}
}
// If the difference between the accumulated capacity and the requested size is less than
@@ -310,10 +347,10 @@ impl Drop for RingAl {
let nnext = next.next();
// Check for possible wrap-around
if nnext > next {
capacity += next.capacity() / USIZELEN;
capacity += next.capacity();
}
// Increment capacity to account for guard size
capacity += 1;
capacity += USIZELEN;
next = nnext;
// If we have looped back to the starting point, all allocations
// are released, and the full capacity is recalculated
@@ -323,13 +360,13 @@
let inner = next.0;
head = head.min(inner)
}
let slice = std::ptr::slice_from_raw_parts_mut(head, capacity);
let layout = unsafe { Layout::from_size_align_unchecked(capacity, 64) };
// SAFETY:
// 1. All pointers are guaranteed to lie within the original backing store.
// 2. The initial slice length has been accurately recalculated.
// 3. The starting memory address is determined through wrap-around detection.
// 4. This is a controlled reclamation of a previously leaked boxed slice.
let _ = unsafe { Box::from_raw(slice) };
unsafe { std::alloc::dealloc(head as *mut u8, layout) };
}
}
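
For context on the new `align` parameter of `alloc`: the requested byte count is rounded up to machine words, and the word equivalent of the buffer pointer's `align_offset` for the requested alignment is added on top before the guard-merging loop runs. The sketch below is standalone illustrative code, not part of the crate; `MyType` and all the numbers are assumptions made for the example:

```rust
use std::mem::{align_of, size_of};

const USIZELEN: usize = size_of::<usize>();

// illustrative element type, mirroring the README example
#[allow(dead_code)]
struct MyType {
    field1: usize,
    field2: u128,
}

fn main() {
    // requested capacity: 16 elements of MyType
    let count = 16;
    let bytes = count * size_of::<MyType>();

    // `alloc` works in machine words, so the byte count is rounded up
    let words = bytes / USIZELEN + 1;

    // stand-in for a guard's buffer pointer inside the ring's backing store;
    // a Vec<usize> gives a word-aligned address to demonstrate align_offset
    let backing: Vec<usize> = vec![0; 64];
    let buffer = backing.as_ptr() as *const u8;

    // extra words required so the returned buffer is aligned for MyType
    let offset = buffer.align_offset(align_of::<MyType>()) / USIZELEN;

    // the merging loop keeps accumulating guarded regions until it holds
    // at least `words + offset` machine words
    println!("{words} words requested, {offset} extra for alignment");
}
```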
