Merged
2 changes: 1 addition & 1 deletion runtime/fuzz/fuzz_targets/buffer.rs
@@ -322,7 +322,7 @@ fn fuzz(input: FuzzInput) {

FuzzOperation::AppendCloneBlob => {
if let Some(ref append) = append_buffer {
let _ = append.clone_blob();
let _ = append.clone_blob().await;
}
}

159 changes: 126 additions & 33 deletions runtime/src/utils/buffer/append.rs
@@ -7,10 +7,20 @@ use std::{num::NonZeroUsize, sync::Arc};

/// A [Blob] wrapper that supports appending new data that is both read and write cached, and
/// provides buffer-pool managed read caching of older data.
///
/// # Concurrent Access
///
/// This implementation allows readers to proceed while flush I/O is in progress, as long as they
/// are reading from the write buffer or the pool cache. Readers that need to access data from the
/// underlying blob (cache miss) will wait for any in-progress write to complete.
///
/// The implementation involves two locks: one for the write buffer (and blob size metadata), and
/// one for the underlying blob itself. To avoid deadlocks, the buffer lock is always acquired
/// before the blob lock.
#[derive(Clone)]
pub struct Append<B: Blob> {
/// The underlying blob being wrapped.
blob: B,
/// The underlying blob being wrapped, protected by a lock for I/O coordination.
blob: Arc<RwLock<B>>,

/// Unique id assigned by the buffer pool.
id: u64,
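
The lock-ordering rule in the doc comment above (buffer lock before blob lock) is the key deadlock-avoidance invariant. Below is a minimal, standalone sketch of that discipline using tokio's RwLock rather than the crate's own runtime types; the SketchBuffer/SketchBlob names and method bodies are illustrative assumptions, not the real API.

use std::sync::Arc;
use tokio::sync::RwLock;

// Illustrative stand-ins only; the real Append<B> wraps the crate's Buffer and a Blob.
struct SketchBuffer(Vec<u8>);
struct SketchBlob(Vec<u8>);

#[derive(Clone)]
struct AppendSketch {
    buffer: Arc<RwLock<(SketchBuffer, u64)>>, // write buffer + logical blob size
    blob: Arc<RwLock<SketchBlob>>,            // underlying blob
}

impl AppendSketch {
    // Every mutating path takes buffer first, then blob. If any path ever took the
    // blob lock first, two tasks could each hold one lock and wait on the other.
    async fn flush_like(&self) {
        let _buf = self.buffer.write().await; // 1st: buffer lock
        let _blob = self.blob.write().await;  // 2nd: blob lock
        // ... move buffered bytes into the blob ...
    }

    async fn resize_like(&self) {
        let _buf = self.buffer.write().await; // same order as flush_like
        let _blob = self.blob.write().await;
        // ... truncate the blob, then realign the buffer ...
    }
}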
@@ -58,7 +68,7 @@ impl<B: Blob> Append<B> {
}

Ok(Self {
blob,
blob: Arc::new(RwLock::new(blob)),
id: pool_ref.next_id().await,
pool_ref,
buffer: Arc::new(RwLock::new((buffer, size))),
@@ -71,7 +81,8 @@ impl<B: Blob> Append<B> {
let buf = buf.into();

// Acquire a write lock on the buffer and blob_size.
let (buffer, blob_size) = &mut *self.buffer.write().await;
let mut guard = self.buffer.write().await;
let (buffer, _) = &mut *guard;

// Ensure the write doesn't overflow.
buffer
@@ -81,7 +92,7 @@ impl<B: Blob> Append<B> {

if buffer.append(buf.as_ref()) {
// Buffer is over capacity, flush it to the underlying blob.
return self.flush(buffer, blob_size).await;
return self.flush_internal(guard).await;
}

Ok(())
@@ -97,16 +108,49 @@ impl<B: Blob> Append<B> {

/// Flush the append buffer to the underlying blob, caching each page worth of written data in
/// the buffer pool.
async fn flush(&self, buffer: &mut Buffer, blob_size: &mut u64) -> Result<(), Error> {
// Take the buffered data, if any.
let Some((mut buf, offset)) = buffer.take() else {
///
/// This method acquires the blob write lock before releasing the buffer lock, ensuring readers
/// that need blob access will wait for the write to complete.
async fn flush_internal(
&self,
mut buf_guard: crate::RwLockWriteGuard<'_, (Buffer, u64)>,
) -> Result<(), Error> {
let (buffer, blob_size) = &mut *buf_guard;

// Prepare the data to be written.
let Some(buf) = self.prepare_flush_data(buffer, *blob_size).await else {
return Ok(());
};

// Insert the flushed data into the buffer pool. This step isn't just to ensure recently
// written data remains cached for future reads, but is in fact required to purge
// potentially stale cache data which might result from the edge case of rewinding a
// blob across a page boundary.
// Update blob_size *before* releasing the lock. We do this optimistically; if the write
// fails below, the program will return an error and likely abort/panic, so maintaining
// exact consistency on error isn't strictly required.
let write_offset = *blob_size;
*blob_size += buf.len() as u64;

// Acquire blob write lock BEFORE releasing buffer lock. This ensures no reader can access
// the blob until the write completes.
let blob_guard = self.blob.write().await;

// Release buffer lock, allowing concurrent buffered reads while the write is in progress.
// Any attempts to read from the blob will block until the write completes.
drop(buf_guard);

// Perform the write while holding only blob lock.
blob_guard.write_at(buf, write_offset).await
}
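
The critical detail in flush_internal is the handoff: the blob write lock is acquired before the buffer guard is dropped, so a reader that misses the cache can never observe the blob between the size update and the write. A minimal sketch of that pattern, assuming tokio locks and an in-memory Vec standing in for the blob (not the crate's types):

use std::sync::Arc;
use tokio::sync::RwLock;

async fn flush_handoff(
    buffer: Arc<RwLock<(Vec<u8>, u64)>>, // (pending bytes, logical blob size)
    blob: Arc<RwLock<Vec<u8>>>,          // in-memory stand-in for the underlying blob
) {
    let mut buf_guard = buffer.write().await;
    let pending = std::mem::take(&mut buf_guard.0);
    buf_guard.1 += pending.len() as u64; // logical size advances before the write lands

    let mut blob_guard = blob.write().await; // take the blob lock first...
    drop(buf_guard);                         // ...then release the buffer lock

    // Buffered/cached reads proceed from here; reads that need the blob queue behind
    // this write until blob_guard is dropped.
    blob_guard.extend_from_slice(&pending);
}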

/// Prepares data from the buffer to be flushed to the blob.
///
/// This method:
/// 1. Takes the data from the write buffer.
/// 2. Caches it in the buffer pool.
/// 3. Returns the data to be written, if any; the caller derives the write offset from the
///    current blob size.
async fn prepare_flush_data(&self, buffer: &mut Buffer, blob_size: u64) -> Option<Vec<u8>> {
// Take the buffered data, if any.
let (mut buf, offset) = buffer.take()?;

// Insert the flushed data into the buffer pool.
let remaining = self.pool_ref.cache(self.id, &buf, offset).await;

// If there's any data left over that doesn't constitute an entire page, re-buffer it into
@@ -121,22 +165,20 @@ impl<B: Blob> Append<B> {

// Early exit if there's no new data to write.
if new_data_start >= buf.len() {
return Ok(());
return None;
}

if new_data_start > 0 {
buf.drain(0..new_data_start);
}
let new_data_len = buf.len() as u64;
self.blob.write_at(buf, *blob_size).await?;
*blob_size += new_data_len;

Ok(())
// Return the data to write; the caller derives the write offset from `blob_size`.
Some(buf)
}

/// Clones and returns the underlying blob.
pub fn clone_blob(&self) -> B {
self.blob.clone()
pub async fn clone_blob(&self) -> B {
self.blob.read().await.clone()
}
}

@@ -155,7 +197,8 @@ impl<B: Blob> Blob for Append<B> {
.ok_or(Error::OffsetOverflow)?;

// Acquire a read lock on the buffer.
let (buffer, _) = &*self.buffer.read().await;
let guard = self.buffer.read().await;
let (buffer, _) = &*guard;

// If the data required is beyond the size of the blob, return an error.
if end_offset > buffer.size() {
@@ -164,13 +207,40 @@ impl<B: Blob> Blob for Append<B> {

// Extract any bytes from the buffer that overlap with the requested range.
let remaining = buffer.extract(buf.as_mut(), offset);

// Release buffer lock before potential I/O.
drop(guard);

if remaining == 0 {
return Ok(buf);
}

// If there are bytes remaining to be read, use the buffer pool to get them.
// Fast path: try to read *only* from pool cache without acquiring blob lock. This allows
// concurrent reads even while a flush is in progress.
let cached = self
.pool_ref
.read_cached(self.id, &mut buf.as_mut()[..remaining], offset)
.await;

if cached == remaining {
// All bytes found in cache.
return Ok(buf);
}

// Slow path: cache miss (partial or full), acquire blob read lock to ensure any in-flight
// write completes before we read from the blob.
let blob_guard = self.blob.read().await;

// Read remaining bytes that were not already obtained from the earlier cache read.
let uncached_offset = offset + cached as u64;
let uncached_len = remaining - cached;
self.pool_ref
.read(&self.blob, self.id, &mut buf.as_mut()[..remaining], offset)
.read(
&*blob_guard,
self.id,
&mut buf.as_mut()[cached..cached + uncached_len],
uncached_offset,
)
.await?;

Ok(buf)
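
The read path above layers three sources: the write buffer, the pool cache (no blob lock held), and finally the blob under a read lock so the read orders after any in-flight flush. A rough, self-contained sketch of the cache-then-blob split; the closure-based cache and Vec-backed blob are assumptions for illustration, not the crate's API.

use tokio::sync::RwLock;

async fn layered_read(
    cache_read: impl Fn(&mut [u8], u64) -> usize, // bytes served from the cached prefix
    blob: &RwLock<Vec<u8>>,
    buf: &mut [u8],
    offset: u64,
) {
    // Fast path: serve as much as possible without touching the blob lock.
    let cached = cache_read(&mut buf[..], offset);
    if cached == buf.len() {
        return;
    }

    // Slow path: the blob read lock waits for any in-progress write to complete.
    let blob_guard = blob.read().await;
    let blob_data: &Vec<u8> = &blob_guard; // deref the guard to the underlying bytes
    let start = offset as usize + cached;
    let rest = &mut buf[cached..];
    rest.copy_from_slice(&blob_data[start..start + rest.len()]);
}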
@@ -184,32 +254,55 @@ impl<B: Blob> Blob for Append<B> {
}

async fn sync(&self) -> Result<(), Error> {
// Flush any buffered data. When flush_internal returns, the write_at has completed
// and data is in the OS buffer.
{
let (buffer, blob_size) = &mut *self.buffer.write().await;
self.flush(buffer, blob_size).await?;
let guard = self.buffer.write().await;
self.flush_internal(guard).await?;
}
self.blob.sync().await
// Sync the OS buffer to disk. We need the blob read lock here since sync() requires
// access to the blob, but only a read lock since we're not modifying blob state.
self.blob.read().await.sync().await
}

/// Resize the blob to the provided `size`.
///
/// # Warning
///
/// Concurrent readers which try to read past the new size during the resize may error.
async fn resize(&self, size: u64) -> Result<(), Error> {
// Implementation note: rewinding the blob across a page boundary potentially results in
// stale data remaining in the buffer pool's cache. We don't proactively purge the data
// within this function since it would be inaccessible anyway. Instead we ensure it is
// always updated should the blob grow back to the point where we have new data for the same
// page, if any old data hasn't expired naturally by then.

// Acquire a write lock on the buffer.
let (buffer, blob_size) = &mut *self.buffer.write().await;
// Acquire buffer lock first.
// NOTE: We MUST acquire the buffer lock before the blob lock to avoid deadlocks with
// `append`, which acquires buffer then blob (via `flush_internal`).
let mut buf_guard = self.buffer.write().await;
let (buffer, blob_size) = &mut *buf_guard;

let flush_data = self.prepare_flush_data(buffer, *blob_size).await;

// Flush any buffered bytes to the underlying blob. (Note that a fancier implementation
// might avoid flushing those bytes that are backed up over by the next step, if any.)
self.flush(buffer, blob_size).await?;
// Acquire blob write lock to prevent concurrent reads throughout the resize.
let blob_guard = self.blob.write().await;

// Flush any buffered bytes first, using the helper.
// We hold both locks here, so no concurrent operations can happen.
if let Some(buf) = flush_data {
// Write the data to the blob.
let len = buf.len() as u64;
blob_guard.write_at(buf, *blob_size).await?;

// Update blob_size to reflect the flush.
*blob_size += len;
}

// Resize the underlying blob.
self.blob.resize(size).await?;
blob_guard.resize(size).await?;

// Update the physical blob size.
// Update the blob size.
*blob_size = size;

// Reset the append buffer to the new size, ensuring its page alignment.
@@ -218,7 +311,7 @@ impl<B: Blob> Blob for Append<B> {
buffer.data.clear();
if leftover_size != 0 {
let page_buf = vec![0; leftover_size as usize];
let buf = self.blob.read_at(page_buf, buffer.offset).await?;
let buf = blob_guard.read_at(page_buf, buffer.offset).await?;
assert!(!buffer.append(buf.as_ref()));
}

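After the resize, the append buffer is realigned to a page boundary and any trailing partial page is re-read from the blob so future appends extend it correctly. A small sketch of that alignment arithmetic; the function and variable names are assumed for illustration, not the crate's actual fields.

// Illustrative only: where the append buffer restarts after a resize to `new_size`,
// and how many trailing bytes must be re-read from the blob.
fn buffer_realignment(new_size: u64, page_size: u64) -> (u64, u64) {
    let leftover = new_size % page_size;     // bytes in the trailing partial page
    let buffer_offset = new_size - leftover; // the buffer restarts on a page boundary
    (buffer_offset, leftover)
}

// Example: resizing to 10_250 bytes with 4_096-byte pages anchors the buffer at offset
// 8_192 and re-reads the 2_058 trailing bytes from the blob.
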
29 changes: 29 additions & 0 deletions runtime/src/utils/buffer/pool.rs
@@ -102,6 +102,35 @@ impl PoolRef {
Pool::offset_to_page(self.page_size, offset)
}

/// Try to read the specified bytes from the buffer pool cache only. Returns the number of
/// bytes successfully read from the cache and copied to `buf` before the first cache miss,
/// if any.
///
/// This method never reads from the underlying blob; it only checks the cache.
///
/// # Warning
///
/// Attempts to read any of the last (blob_size % page_size) "trailing bytes" of the blob will
/// always return 0 since the buffer pool only deals with page-sized chunks.
pub(super) async fn read_cached(
&self,
blob_id: u64,
mut buf: &mut [u8],
mut offset: u64,
) -> usize {
let original_len = buf.len();
let buffer_pool = self.pool.read().await;
while !buf.is_empty() {
let count = buffer_pool.read_at(self.page_size, blob_id, buf, offset);
if count == 0 {
// Cache miss - return how many bytes we successfully read
break;
}
offset += count as u64;
buf = &mut buf[count..];
}
original_len - buf.len()
}
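
The return value is the length of a contiguous cached prefix: the loop stops at the first missing page rather than skipping over it. A self-contained toy model of that semantics is sketched below; the HashMap cache and 4-byte pages are assumptions for illustration, not the real Pool, which tracks its own page size and eviction.

use std::collections::HashMap;

const PAGE: usize = 4; // toy page size for illustration

// Toy model of the prefix semantics: copy whole cached pages until the first miss
// and report how many bytes were served.
fn cached_prefix(cache: &HashMap<u64, [u8; PAGE]>, buf: &mut [u8], offset: u64) -> usize {
    let mut done = 0;
    while done < buf.len() {
        let pos = offset + done as u64;
        let (page, within) = (pos / PAGE as u64, (pos % PAGE as u64) as usize);
        let Some(data) = cache.get(&page) else { break }; // first miss ends the prefix
        let n = (PAGE - within).min(buf.len() - done);
        buf[done..done + n].copy_from_slice(&data[within..within + n]);
        done += n;
    }
    done
}

For example, if page 0 is absent from the toy cache, any read starting inside page 0 returns 0 even when later pages are cached, mirroring the warning above about trailing bytes.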

/// Read the specified bytes, preferentially from the buffer pool cache. Bytes not found in the
/// buffer pool will be read from the provided `blob` and cached for future reads.
///
10 changes: 5 additions & 5 deletions storage/src/journal/contiguous/fixed.rs
@@ -498,15 +498,15 @@ impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
assert!(start_blob <= self.tail_index);
let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
let mut blob_plus = blobs
.into_iter()
.map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
.collect::<Vec<_>>();
let mut blob_plus = Vec::with_capacity(blobs.len() + 1);
for (blob_index, blob) in blobs {
blob_plus.push((*blob_index, blob.clone_blob().await, full_size));
}

// Include the tail blob.
self.tail.sync().await?; // make sure no data is buffered
let tail_size = self.tail.size().await;
blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
blob_plus.push((self.tail_index, self.tail.clone_blob().await, tail_size));
let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;

// Replay all blobs in order and stream items as they are read (to avoid occupying too much