diff --git a/Cargo.toml b/Cargo.toml
index cb728b7..a6d8d0d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,7 @@ edition = "2021"
 [dependencies]
 
 [features]
-default = []
+default = ["generic"]
 drop = []
 tls = ["drop"]
+generic = []
diff --git a/README.md b/README.md
index 763ec0a..2a1c92f 100644
--- a/README.md
+++ b/README.md
@@ -100,7 +100,9 @@ _Note_: `Head` and `Tail` canaries are fixed regular guards, with the exception
 4. **Read-Only Buffers**: Cloneable and shareable across threads. They're
    efficient but involve extra heap allocation for reference counting, so use
    them wisely.
-5. **Thread local storage allocator**: allocator instance can be created for
+5. **Generic Buffers**: `Vec`-like fixed-capacity buffers, generic over their
+   element type, allocated from the same backing store as regular `u8` buffers
+6. **Thread local storage allocator**: allocator instance can be created for
    each thread, and accessed from anywhere in the code, removeing the need to
    pass the allocator around
 
@@ -132,6 +134,21 @@ assert_eq!(buffer.len(), size);
 assert_eq!(buffer.spare(), 256 - size);
 ```
 
+## Generic Buffer
+```rust
+let mut allocator = RingAl::new(1024); // Create an allocator with initial size
+struct MyType {
+    field1: usize,
+    field2: u128
+}
+let mut buffer = allocator.generic::<MyType>(16).unwrap();
+buffer.push(MyType { field1: 42, field2: 43 });
+assert_eq!(buffer.len(), 1);
+let t = buffer.pop().unwrap();
+assert_eq!(t.field1, 42);
+assert_eq!(t.field2, 43);
+```
+
 ## Multi-threaded Environment
 ```rust
 let mut allocator = RingAl::new(1024); // Create an allocator with initial size
diff --git a/src/buffer.rs b/src/buffer.rs
index 01bff4b..3ba193e 100644
--- a/src/buffer.rs
+++ b/src/buffer.rs
@@ -6,7 +6,7 @@ use std::{
 
 use crate::{
     header::{Guard, Header},
-    RingAl,
+    RingAl, USIZEALIGN,
 };
 
 /// Extendable buffer. It dynamically grows to accomodate any extra data beyond the current
@@ -73,7 +73,7 @@ impl Write for ExtBuf<'_> {
             return Ok(count);
         }
         // extend the buffer with the missing capacity
-        let Some(header) = self.ringal.alloc(count - available) else {
+        let Some(header) = self.ringal.alloc(count - available, USIZEALIGN) else {
             return Err(io::Error::new(
                 io::ErrorKind::StorageFull,
                 "allocator's capacity exhausted",
@@ -163,12 +163,14 @@ impl FixedBufMut {
 impl Deref for FixedBufMut {
     type Target = [u8];
 
+    #[inline(always)]
     fn deref(&self) -> &Self::Target {
         unsafe { self.inner.get_unchecked(..self.initialized) }
     }
 }
 
 impl DerefMut for FixedBufMut {
+    #[inline(always)]
     fn deref_mut(&mut self) -> &mut Self::Target {
         unsafe { self.inner.get_unchecked_mut(..self.initialized) }
     }
@@ -203,7 +205,146 @@ pub struct FixedBuf {
 impl Deref for FixedBuf {
     type Target = [u8];
 
+    #[inline(always)]
     fn deref(&self) -> &Self::Target {
         self.inner
     }
 }
+
+/// Mutable fixed buffer, generic over its element type T, with a `Vec`-like API
+#[cfg(feature = "generic")]
+pub struct GenericBufMut<T> {
+    /// the lock release mechanism upon drop
+    pub(crate) _guard: Guard,
+    /// exclusively owned slice from backing store
+    /// properly aligned for T
+    pub(crate) inner: &'static mut [T],
+    /// current item count in buffer
+    pub(crate) initialized: usize,
+}
+
+#[cfg(feature = "generic")]
+mod generic {
+    use super::{Deref, DerefMut, GenericBufMut};
+
+    impl<T> Deref for GenericBufMut<T> {
+        type Target = [T];
+
+        #[inline(always)]
+        fn deref(&self) -> &Self::Target {
+            unsafe { self.inner.get_unchecked(..self.initialized) }
+        }
+    }
+
+    impl<T> DerefMut for GenericBufMut<T> {
+        #[inline(always)]
+        fn deref_mut(&mut self) -> &mut Self::Target {
+            unsafe { self.inner.get_unchecked_mut(..self.initialized) }
+        }
+    }
+
+    impl<T> GenericBufMut<T> {
+        /// Returns the total capacity of the buffer, i.e., the number of elements it can hold.
+        pub fn capacity(&self) -> usize {
+            self.inner.len()
+        }
+
+        /// Pushes an element to the end of the buffer.
+        ///
+        /// If the buffer is full, the element is handed back as `Some(value)`. Otherwise, it is
+        /// written to the end of the buffer and `None` is returned. The operation is O(1).
+        pub fn push(&mut self, value: T) -> Option<T> {
+            if self.inner.len() == self.initialized {
+                return Some(value);
+            }
+            unsafe {
+                let cell = self.inner.get_unchecked_mut(self.initialized) as *mut T;
+                std::ptr::write(cell, value);
+            }
+            self.initialized += 1;
+            None
+        }
+
+        /// Inserts an element at the specified index in the buffer.
+        ///
+        /// If the index is greater than the number of initialized elements, or the buffer is
+        /// already full, the operation fails and `value` is returned. Otherwise, the following
+        /// elements are shifted right, the new element is inserted, and `None` is returned. O(N).
+        pub fn insert(&mut self, mut value: T, mut index: usize) -> Option<T> {
+            if self.initialized < index || self.initialized == self.inner.len() {
+                return Some(value);
+            }
+            self.initialized += 1;
+            unsafe {
+                while index < self.initialized {
+                    let cell = self.inner.get_unchecked_mut(index) as *mut T;
+                    let temp = std::ptr::read(cell as *const T);
+                    std::ptr::write(cell, value);
+                    value = temp;
+                    index += 1;
+                }
+            }
+            std::mem::forget(value);
+            None
+        }
+
+        /// Removes and returns the last element from the buffer.
+        ///
+        /// Returns `None` if the buffer is empty. The operation is O(1).
+        pub fn pop(&mut self) -> Option<T> {
+            (self.initialized != 0).then_some(())?;
+            self.initialized -= 1;
+            let value = unsafe { self.inner.get_unchecked_mut(self.initialized) };
+            let owned = unsafe { std::ptr::read(value as *const T) };
+            Some(owned)
+        }
+
+        /// Removes and returns the element at the specified index from the buffer.
+        ///
+        /// Shifts all elements following the index to the left to fill the gap.
+        /// Returns `None` if the index is out of bounds. The operation is O(N).
+        pub fn remove(&mut self, mut index: usize) -> Option<T> {
+            (self.initialized > index).then_some(())?;
+            if self.initialized - 1 == index {
+                return self.pop();
+            }
+            let mut value = unsafe { self.inner.get_unchecked_mut(index) } as *mut T;
+            let element = unsafe { std::ptr::read(value) };
+            self.initialized -= 1;
+            unsafe {
+                while index < self.initialized {
+                    index += 1;
+                    let next = self.inner.get_unchecked_mut(index) as *mut T;
+                    {
+                        let next = std::ptr::read(next);
+                        std::ptr::write(value, next);
+                    }
+                    value = next;
+                }
+            }
+            Some(element)
+        }
+
+        /// Tries to remove and return the element at the specified index, replacing
+        /// it with the last element. The operation is O(1).
+        pub fn swap_remove(&mut self, index: usize) -> Option<T> {
+            (index < self.initialized).then_some(())?;
+            if self.initialized - 1 == index {
+                return self.pop();
+            }
+
+            // Swap the element at the given index with the last element
+            self.initialized -= 1;
+            let element = unsafe {
+                let last = self.inner.get_unchecked_mut(self.initialized) as *mut T;
+                let value = self.inner.get_unchecked_mut(index) as *mut T;
+                let element = std::ptr::read(value);
+                let temp = std::ptr::read(last);
+                std::ptr::write(value, temp);
+                element
+            };
+
+            Some(element)
+        }
+    }
+}
diff --git a/src/header.rs b/src/header.rs
index df235f1..361be4f 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -35,7 +35,7 @@ impl Header {
     #[inline(always)]
     pub(crate) fn available(&self) -> bool {
         let atomic = self.0 as *const AtomicUsize;
-        (unsafe { &*atomic }).load(Ordering::Relaxed) & 1 == 0
+        (unsafe { &*atomic }).load(Ordering::Acquire) & 1 == 0
     }
 
     /// distance (in `size_of()`) to given guard
@@ -84,6 +84,6 @@ impl Drop for Guard {
         // in the execution timeline. Importantly, the allocator is restricted from write access
         // the region until it is released.
         let atomic = self.0 as *mut usize as *const AtomicUsize;
-        unsafe { &*atomic }.fetch_and(usize::MAX << 1, Ordering::Release);
+        unsafe { &*atomic }.fetch_and(usize::MAX << 1, Ordering::Relaxed);
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 5cec9f0..bc74178 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -85,6 +85,22 @@
 //! tx.send(buffer);
 //! handle.join().unwrap();
 //! ```
+//! ### Generic buffers
+//! ```rust
+//! # use ringal::{ RingAl, GenericBufMut };
+//! # let mut allocator = RingAl::new(1024); // Create an allocator with initial size
+//!
+//! struct MyType {
+//!     field1: usize,
+//!     field2: u128
+//! }
+//! let mut buffer = allocator.generic::<MyType>(16).unwrap();
+//! buffer.push(MyType { field1: 42, field2: 43 });
+//! assert_eq!(buffer.len(), 1);
+//! let t = buffer.pop().unwrap();
+//! assert_eq!(t.field1, 42);
+//! assert_eq!(t.field2, 43);
+//! ```
 //!
 //! ### Thread Local Storage
 //! ```rust
@@ -107,6 +123,7 @@
 //! println!("bytes written: {}", fixed.len());
 //! ```
 //!
+//!
 //! ## Safety Considerations
 //!
 //! While `unsafe` code is used for performance reasons, the public API is designed to be safe.
@@ -115,7 +132,7 @@
 
 use std::alloc::{alloc, Layout};
 
-pub use buffer::{ExtBuf, FixedBuf, FixedBufMut};
+pub use buffer::{ExtBuf, FixedBuf, FixedBufMut, GenericBufMut};
 use header::Header;
 
 /// Ring Allocator, see crate level documentation on features and usage
@@ -129,6 +146,7 @@ const USIZELEN: usize = size_of::<usize>();
 /// capacity to multiple of 8 machine words (converted to byte count)
 const MINALLOC: usize = 8;
 const DEFAULT_ALIGNMENT: usize = 64;
+const USIZEALIGN: usize = align_of::<usize>();
 
 impl RingAl {
     /// Initialize ring allocator with given capacity and default alignment. Allocated capacity
@@ -169,7 +187,7 @@ impl RingAl {
     /// `None`, if backing store doesn't have enough capacity available.
     #[inline(always)]
     pub fn fixed(&mut self, size: usize) -> Option<FixedBufMut> {
-        let header = self.alloc(size)?;
+        let header = self.alloc(size, USIZEALIGN)?;
         header.set();
         let ptr = header.buffer();
         let capacity = header.capacity();
@@ -189,7 +207,7 @@ impl RingAl {
     /// allocator (used to dynamically grow the buffer), which effectively
    /// prevents any further allocations while this buffer is around
     pub fn extendable(&mut self, size: usize) -> Option<ExtBuf<'_>> {
-        let header = self.alloc(size)?;
+        let header = self.alloc(size, USIZEALIGN)?;
         header.set();
         Some(ExtBuf {
             header,
@@ -199,18 +217,36 @@
-    /// Try to lock at least `size` bytes from backing store
-    fn alloc(&mut self, mut size: usize) -> Option<Header> {
+    /// Allocate a fixed buffer that can accommodate `count` elements of type T
+    pub fn generic<T>(&mut self, count: usize) -> Option<GenericBufMut<T>> {
+        let tsize = size_of::<T>();
+        let size = count * tsize;
+        let header = self.alloc(size, align_of::<T>())?;
+        header.set();
+        let buffer = header.buffer();
+        let offset = buffer.align_offset(align_of::<T>());
+        let inner = unsafe { buffer.add(offset) } as *mut T;
+        let inner = unsafe { std::slice::from_raw_parts_mut(inner, count) };
+        Some(GenericBufMut {
+            _guard: header.into(),
+            inner,
+            initialized: 0,
+        })
+    }
+
+    /// Try to lock at least `size` bytes from the backing store, aligning the allocation to the requested address
+    fn alloc(&mut self, mut size: usize, align: usize) -> Option<Header> {
         // we need to convert the size from count of bytes to count of usizes
         size = size / USIZELEN + 1;
         // begin accumulating capacity
         let mut start = Header(self.head);
+        let mut offset = start.buffer().align_offset(align) / USIZELEN;
         let mut accumulated = 0;
         let mut next = start;
         // when the current guard sequence references a memory region smaller than the requested size,
         // an attempt is made to merge subsequent regions. This process continues until the required
         // capacity is satisfied or all available capacity is exhausted.
-        while accumulated < size {
+        while accumulated < size + offset {
             next.available().then_some(())?;
             next = next.next();
             accumulated = start.distance(&next);
@@ -225,6 +261,7 @@
             if next < start {
                 accumulated = 0;
                 start = next;
+                offset = start.buffer().align_offset(align) / USIZELEN;
             }
         }
         // If the difference between the accumulated capacity and the requested size is less than
@@ -310,10 +347,10 @@
             let nnext = next.next();
             // Check for possible wrap-around
             if nnext > next {
-                capacity += next.capacity() / USIZELEN;
+                capacity += next.capacity();
             }
             // Increment capacity to account for guard size
-            capacity += 1;
+            capacity += USIZELEN;
             next = nnext;
             // If we have looped back to the starting point, all allocations
             // are released, and the full capacity is recalculated
@@ -323,13 +360,13 @@
             let inner = next.0;
             head = head.min(inner)
         }
 
-        let slice = std::ptr::slice_from_raw_parts_mut(head, capacity);
+        let layout = unsafe { Layout::from_size_align_unchecked(capacity, 64) };
         // SAFETY:
         // 1. All pointers are guaranteed to lie within the original backing store.
         // 2. The initial slice length has been accurately recalculated.
         // 3. The starting memory address is determined through wrap-around detection.
         // 4. This is a controlled reclamation of a previously leaked boxed slice.
-        let _ = unsafe { Box::from_raw(slice) };
+        unsafe { std::alloc::dealloc(head as *mut u8, layout) };
     }
 }
diff --git a/src/tests.rs b/src/tests.rs
index 2636168..02bd134 100644
--- a/src/tests.rs
+++ b/src/tests.rs
@@ -1,11 +1,17 @@
+#![allow(unused)]
+
 use std::{
-    collections::VecDeque, io::Write, mem::transmute, sync::mpsc::sync_channel, time::Instant,
+    collections::{HashSet, VecDeque},
+    io::Write,
+    mem::transmute,
+    sync::mpsc::sync_channel,
+    time::Instant,
 };
 
 use crate::{
     buffer::FixedBuf,
     header::{Guard, Header},
-    RingAl, MINALLOC, USIZELEN,
+    RingAl, MINALLOC, USIZEALIGN, USIZELEN,
 };
 
 const SIZE: usize = 4096;
@@ -58,7 +64,7 @@ fn test_init() {
 fn test_alloc() {
     let (mut ringal, _g) = setup!();
     let start = ringal.head;
-    let header = ringal.alloc(1024);
+    let header = ringal.alloc(1024, USIZEALIGN);
     assert!(
         header.is_some(),
         "should be able to allocate with new allocator"
@@ -75,7 +81,7 @@ fn test_multi_alloc() {
     let mut allocations = Vec::<Guard>::with_capacity(COUNT);
     for i in 0..COUNT {
         let size = SIZE / COUNT - USIZELEN * 2 - (i == COUNT - 1) as usize * USIZELEN;
-        let header = ringal.alloc(size);
+        let header = ringal.alloc(size, USIZEALIGN);
         assert!(
             header.is_some(),
             "should have enough capacity for all allocations"
@@ -84,10 +90,10 @@
         header.set();
         allocations.push(header.into());
     }
-    let header = ringal.alloc(SIZE / COUNT - USIZELEN);
+    let header = ringal.alloc(SIZE / COUNT - USIZELEN, USIZEALIGN);
     assert!(header.is_none(), "should have run out of capacity");
     allocations.clear();
-    let header = ringal.alloc(SIZE / COUNT - USIZELEN);
+    let header = ringal.alloc(SIZE / COUNT - USIZELEN, USIZEALIGN);
     assert!(
         header.is_some(),
         "should have all capacity after dropping allocations"
@@ -95,13 +101,13 @@
 }
 
 #[test]
-fn test_continious_alloc() {
+fn test_continuous_alloc() {
     let (mut ringal, _g) = setup!();
     const ITERS: u64 = 8192;
     let mut allocations = VecDeque::<Guard>::with_capacity(4);
     for i in 0..ITERS {
-        let size = (unsafe { transmute::(Instant::now()) }.0 * i) % 256;
-        let header = ringal.alloc(size as usize);
+        let size = (unsafe { transmute::(Instant::now()) }.0 * i) % 256;
+        let header = ringal.alloc(size as usize, USIZEALIGN);
         assert!(header.is_some(), "should always have capacity");
         let header = header.unwrap();
         header.set();
@@ -172,7 +178,7 @@ fn test_ext_buf_continuous_alloc() {
     let mut buffers = VecDeque::with_capacity(4);
     const ITERS: usize = 8192;
     for i in 0..ITERS {
-        let random = unsafe { transmute::(Instant::now()).0 };
+        let random = unsafe { transmute::(Instant::now()).0 };
         let mut size = (random * i) % 256;
         if size < MINALLOC * USIZELEN {
             size = 256 - size
@@ -262,7 +268,7 @@ fn test_fixed_buf_continuous_alloc() {
     let mut buffers = VecDeque::with_capacity(4);
     const ITERS: usize = 8192;
     for i in 0..ITERS {
-        let random = unsafe { transmute::(Instant::now()).0 };
+        let random = unsafe { transmute::(Instant::now()).0 };
         let mut size = (random * i) % 256;
         if size < MINALLOC * USIZELEN {
             size = 256 - size
@@ -306,7 +312,7 @@ fn test_fixed_buf_continuous_alloc_multi_thread() {
         threads.push(tx);
     }
     for i in 0..ITERS {
-        let random = unsafe { transmute::(Instant::now()).0 };
+        let random = unsafe { transmute::(Instant::now()).0 };
         let mut size = (random * i) % 256;
         if size < MINALLOC * USIZELEN {
             size = 256 - size
@@ -363,3 +369,89 @@ fn test_thread_local_allocator() {
     }
     some_fn();
 }
+
+macro_rules! test_generic_buf {
+    ($int: ty) => {
+        let (mut ringal, _g) = setup!();
+        const ITERS: $int = 3;
+        struct SomeType {
+            i: $int,
+            string: FixedBuf,
+        }
+        let mut string = ringal.fixed(64).unwrap();
+        let _ = string.write(b"hello world").unwrap();
+        let string = string.freeze();
+        let buffer = ringal.generic::<SomeType>(ITERS as usize);
+        assert!(
+            buffer.is_some(),
+            "should be able to allocate generic buf with new allocator"
+        );
+        let mut buffer = buffer.unwrap();
+        for i in 0..ITERS {
+            let instance = SomeType {
+                i,
+                string: string.clone(),
+            };
+            assert!(buffer.push(instance).is_none());
+        }
+        assert_eq!(buffer.len(), ITERS as usize);
+        for i in (0..ITERS).rev() {
+            let elem = buffer.pop();
+            assert!(elem.is_some(), "buffer should pop all pushed elements");
+            let elem = elem.unwrap();
+            assert_eq!(elem.i, i);
+            assert_eq!(elem.string.as_ref(), string.as_ref());
+            assert!(buffer.insert(elem, 0).is_none());
+        }
+        assert_eq!(buffer.len(), ITERS as usize);
+        for _ in 0..ITERS {
+            let elem = buffer.swap_remove(0);
+            assert!(
+                elem.is_some(),
+                "buffer should swap remove all pushed elements"
+            );
+            let elem = elem.unwrap();
+            buffer.push(elem);
+        }
+        assert_eq!(buffer.len(), ITERS as usize);
+        let mut indices: HashSet<$int> = (0..ITERS).into_iter().collect();
+        for _ in 0..ITERS {
+            let elem = buffer.remove(0);
+            assert!(
+                elem.is_some(),
+                "buffer should remove all pushed elements"
+            );
+            let removed = elem.unwrap().i;
+            println!("{} removing {removed}", stringify!($int));
+            assert!(indices.remove(&removed));
+        }
+        assert_eq!(buffer.len(), 0);
+        drop(string);
+        drop(buffer);
+    };
+}
+
+//#[cfg(feature = "generic")]
+#[test]
+fn test_generic_buf_word() {
+    test_generic_buf!(usize);
+}
+
+#[test]
+fn test_generic_buf_double_word_align() {
+    test_generic_buf!(u128);
+}
+#[test]
+fn test_generic_buf_half_word_align() {
+    test_generic_buf!(u32);
+}
+
+#[test]
+fn test_generic_buf_quarter_word_align() {
+    test_generic_buf!(u16);
+}
+
+#[test]
+fn test_generic_buf_eighth_word_align() {
+    test_generic_buf!(u8);
+}
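
Not part of the patch: a minimal usage sketch of the `GenericBufMut` API introduced above, assuming the crate is consumed as `ringal` with the `generic` feature enabled (this patch adds it to the default feature set). The element type, capacity, and values below are arbitrary; only the methods shown in the diff (`generic`, `push`, `insert`, `pop`, `remove`, `swap_remove`, `capacity`, and the `Deref`-provided `len`) are relied upon.

```rust
use ringal::RingAl;

fn main() {
    // 1024-byte backing store, as in the README examples
    let mut allocator = RingAl::new(1024);

    // fixed-capacity, Vec-like buffer holding up to 4 u64 elements
    let mut buffer = allocator.generic::<u64>(4).unwrap();
    assert_eq!(buffer.capacity(), 4);

    // push/insert return None on success and hand the value back once the buffer is full
    assert!(buffer.push(1).is_none());
    assert!(buffer.push(3).is_none());
    assert!(buffer.insert(2, 1).is_none()); // buffer is now [1, 2, 3]
    assert_eq!(buffer.len(), 3);

    assert_eq!(buffer.pop(), Some(3));          // [1, 2]
    assert_eq!(buffer.remove(0), Some(1));      // [2]
    assert_eq!(buffer.swap_remove(0), Some(2)); // []
    assert!(buffer.pop().is_none());
}
```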
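Also not part of the patch: a sketch of the alignment behaviour that the new `align` parameter to `alloc` is intended to guarantee for element types stricter than `usize`. The element type and count are arbitrary, and the check relies only on the `Deref<Target = [T]>` impl added in this diff.

```rust
use ringal::RingAl;
use std::mem::align_of;

fn main() {
    let mut allocator = RingAl::new(1024);

    // u128 usually has a stricter alignment than the usize-aligned backing store,
    // so the start of the element slice has to be offset accordingly
    let buffer = allocator.generic::<u128>(8).unwrap();
    assert_eq!(buffer.capacity(), 8);

    // Deref exposes the element slice; its start must be properly aligned for T
    let addr = buffer.as_ptr() as usize;
    assert_eq!(addr % align_of::<u128>(), 0);
}
```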