Merged · Changes from 2 commits
2 changes: 1 addition & 1 deletion runtime/fuzz/fuzz_targets/buffer.rs
@@ -322,7 +322,7 @@ fn fuzz(input: FuzzInput) {

FuzzOperation::AppendCloneBlob => {
if let Some(ref append) = append_buffer {
let _ = append.clone_blob();
let _ = append.clone_blob().await;
}
}

158 changes: 125 additions & 33 deletions runtime/src/utils/buffer/append.rs
@@ -7,10 +7,20 @@ use std::{num::NonZeroUsize, sync::Arc};

/// A [Blob] wrapper that supports appending new data that is both read and write cached, and
/// provides buffer-pool managed read caching of older data.
///
/// # Concurrent Access
///
/// This implementation allows readers to proceed while flush I/O is in progress, as long as they
/// are reading from the write buffer or the pool cache. Readers that need to access data from the
/// underlying blob (cache miss) will wait for any in-progress write to complete.
///
/// The implementation involves two locks: one for the write buffer (and blob size metadata), and
/// one for the underlying blob itself. To avoid deadlocks, the buffer lock is always acquired
/// before the blob lock.
#[derive(Clone)]
pub struct Append<B: Blob> {
/// The underlying blob being wrapped.
blob: B,
/// The underlying blob being wrapped, protected by a lock for I/O coordination.
blob: Arc<RwLock<B>>,

/// Unique id assigned by the buffer pool.
id: u64,
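As an illustration of the concurrent-access claim in the doc comment above, here is a standalone sketch with stand-in types (it uses `tokio::sync::RwLock` rather than the runtime's own lock, and a plain `Vec<u8>` in place of the write buffer): a buffered read completes while a simulated flush still holds the blob write lock.

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    // Stand-ins: the write buffer (plus size metadata) and the underlying blob.
    let buffer = Arc::new(RwLock::new(vec![1u8, 2, 3]));
    let blob = Arc::new(RwLock::new(()));

    // Simulated flush: holds the blob write lock across slow I/O.
    let blob_for_flush = blob.clone();
    let flush = tokio::spawn(async move {
        let _guard = blob_for_flush.write().await;
        tokio::time::sleep(Duration::from_millis(200)).await; // pretend write_at()
    });
    tokio::time::sleep(Duration::from_millis(10)).await; // let the flush start

    // A buffered read only needs the buffer lock, so it is not blocked by the flush.
    let started = std::time::Instant::now();
    let first = buffer.read().await[0];
    assert_eq!(first, 1);
    assert!(started.elapsed() < Duration::from_millis(100));

    flush.await.unwrap();
}
```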
@@ -58,7 +68,7 @@ impl<B: Blob> Append<B> {
}

Ok(Self {
blob,
blob: Arc::new(RwLock::new(blob)),
id: pool_ref.next_id().await,
pool_ref,
buffer: Arc::new(RwLock::new((buffer, size))),
@@ -71,7 +81,8 @@ impl<B: Blob> Append<B> {
let buf = buf.into();

// Acquire a write lock on the buffer and blob_size.
let (buffer, blob_size) = &mut *self.buffer.write().await;
let mut guard = self.buffer.write().await;
let (buffer, _) = &mut *guard;

// Ensure the write doesn't overflow.
buffer
@@ -81,7 +92,7 @@ impl<B: Blob> Append<B> {

if buffer.append(buf.as_ref()) {
// Buffer is over capacity, flush it to the underlying blob.
return self.flush(buffer, blob_size).await;
return self.flush_internal(guard).await;
}

Ok(())
@@ -97,16 +108,54 @@ impl<B: Blob> Append<B> {

/// Flush the append buffer to the underlying blob, caching each page worth of written data in
/// the buffer pool.
async fn flush(&self, buffer: &mut Buffer, blob_size: &mut u64) -> Result<(), Error> {
// Take the buffered data, if any.
let Some((mut buf, offset)) = buffer.take() else {
///
/// This method acquires the blob write lock before releasing the buffer lock, ensuring readers
/// that need blob access will wait for the write to complete.
async fn flush_internal(
&self,
mut guard: crate::RwLockWriteGuard<'_, (Buffer, u64)>,
) -> Result<(), Error> {
let (buffer, blob_size) = &mut *guard;

// Prepare the data to be written.
let Some((buf, write_offset)) = self.prepare_flush_data(buffer, *blob_size).await else {
return Ok(());
};

        // Insert the flushed data into the buffer pool. This step isn't just to ensure recently
        // written data remains cached for future reads, but is in fact required to purge
        // potentially stale cache data which might result from the edge case of rewinding a
        // blob across a page boundary.
// Update blob_size *before* releasing the lock. We do this optimistically; if the write
// fails below, the program will return an error and likely abort/panic, so maintaining
// exact consistency on error isn't strictly required.
*blob_size = write_offset + buf.len() as u64;

// Acquire blob write lock BEFORE releasing buffer lock. This ensures no reader can access
// the blob until the write completes.
let blob_guard = self.blob.write().await;

// Release buffer lock, allowing concurrent buffered reads while the write is in progress.
// Any attempts to read from the blob will block until the write completes.
drop(guard);

        // Perform the write while holding only the blob lock.
blob_guard.write_at(buf, write_offset).await?;

Ok(())
}

/// Prepares data from the buffer to be flushed to the blob.
///
/// This method:
/// 1. Takes the data from the write buffer.
/// 2. Caches it in the buffer pool.
/// 3. Returns the data to be written and the offset to write it at (if any).
async fn prepare_flush_data(
&self,
buffer: &mut Buffer,
blob_size: u64,
) -> Option<(Vec<u8>, u64)> {
// Take the buffered data, if any.
let (mut buf, offset) = buffer.take()?;

// Insert the flushed data into the buffer pool.
let remaining = self.pool_ref.cache(self.id, &buf, offset).await;

// If there's any data left over that doesn't constitute an entire page, re-buffer it into
@@ -121,22 +170,20 @@ impl<B: Blob> Append<B> {

// Early exit if there's no new data to write.
if new_data_start >= buf.len() {
return Ok(());
return None;
}

if new_data_start > 0 {
buf.drain(0..new_data_start);
}
let new_data_len = buf.len() as u64;
self.blob.write_at(buf, *blob_size).await?;
*blob_size += new_data_len;

Ok(())
        // Return the data to write and the offset at which to write it within the blob.
Some((buf, blob_size))
}

/// Clones and returns the underlying blob.
pub fn clone_blob(&self) -> B {
self.blob.clone()
pub async fn clone_blob(&self) -> B {
self.blob.read().await.clone()
}
}
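The lock handoff in `flush_internal` can be sketched in isolation. The types below are simplified stand-ins (a `Vec<u8>` buffer, a hypothetical `FakeBlob::write_at`, and `tokio::sync::RwLock`), not the crate's API; the point is that the blob lock is acquired before the buffer lock is dropped, so a cache-missing reader cannot reach the blob in between.

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct FakeBlob;
impl FakeBlob {
    async fn write_at(&self, _buf: Vec<u8>, _offset: u64) { /* pretend slow I/O */ }
}

async fn flush(buffer: &Arc<RwLock<(Vec<u8>, u64)>>, blob: &Arc<RwLock<FakeBlob>>) {
    // 1. Buffer lock: take the pending bytes and advance the size optimistically.
    let mut buf_guard = buffer.write().await;
    let pending = std::mem::take(&mut buf_guard.0);
    let write_offset = buf_guard.1;
    buf_guard.1 = write_offset + pending.len() as u64;

    // 2. Acquire the blob write lock while still holding the buffer lock...
    let blob_guard = blob.write().await;
    // 3. ...then release the buffer lock so buffered and cached reads can proceed.
    drop(buf_guard);

    // 4. Perform the slow write holding only the blob lock.
    blob_guard.write_at(pending, write_offset).await;
}

#[tokio::main]
async fn main() {
    let buffer = Arc::new(RwLock::new((b"pending bytes".to_vec(), 0u64)));
    let blob = Arc::new(RwLock::new(FakeBlob));
    flush(&buffer, &blob).await;
    assert_eq!(buffer.read().await.1, 13); // size advanced past the flushed bytes
}
```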

@@ -155,7 +202,8 @@ impl<B: Blob> Blob for Append<B> {
.ok_or(Error::OffsetOverflow)?;

// Acquire a read lock on the buffer.
let (buffer, _) = &*self.buffer.read().await;
let guard = self.buffer.read().await;
let (buffer, _) = &*guard;

// If the data required is beyond the size of the blob, return an error.
if end_offset > buffer.size() {
@@ -164,13 +212,40 @@ impl<B: Blob> Blob for Append<B> {

// Extract any bytes from the buffer that overlap with the requested range.
let remaining = buffer.extract(buf.as_mut(), offset);

// Release buffer lock before potential I/O.
drop(guard);

if remaining == 0 {
return Ok(buf);
}

// If there are bytes remaining to be read, use the buffer pool to get them.
        // Fast path: try to read *only* from the pool cache, without acquiring the blob lock.
        // This allows concurrent reads even while a flush is in progress.
let cached = self
.pool_ref
.read_cached(self.id, &mut buf.as_mut()[..remaining], offset)
.await;

if cached == remaining {
// All bytes found in cache.
return Ok(buf);
}

// Slow path: cache miss (partial or full), acquire blob read lock to ensure any in-flight
// write completes before we read from the blob.
let blob_guard = self.blob.read().await;

// Read remaining bytes that were not already obtained from the earlier cache read.
let uncached_offset = offset + cached as u64;
let uncached_len = remaining - cached;
self.pool_ref
.read(&self.blob, self.id, &mut buf.as_mut()[..remaining], offset)
.read(
&*blob_guard,
self.id,
&mut buf.as_mut()[cached..cached + uncached_len],
uncached_offset,
)
.await?;

Ok(buf)
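The fast-path/slow-path split in `read_at` can be sketched with stand-ins for the buffer pool and blob (`read_cached` and `FakeBlob` below are hypothetical, and `tokio::sync::RwLock` stands in for the runtime lock): serve the cached prefix without touching the blob lock, then take the blob read lock only for the remainder.

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct FakeBlob;
impl FakeBlob {
    async fn read_at(&self, buf: &mut [u8], offset: u64) {
        // Pretend I/O: each byte is the low byte of its absolute offset.
        for (i, b) in buf.iter_mut().enumerate() {
            *b = (offset as usize + i) as u8;
        }
    }
}

/// Hypothetical cache lookup: fills a prefix of `buf` and returns how many
/// bytes it served (here, pretend the first half of the range is cached).
fn read_cached(buf: &mut [u8], offset: u64) -> usize {
    let served = buf.len() / 2;
    for (i, b) in buf[..served].iter_mut().enumerate() {
        *b = (offset as usize + i) as u8;
    }
    served
}

async fn read_at(blob: &Arc<RwLock<FakeBlob>>, offset: u64, len: usize) -> Vec<u8> {
    let mut buf = vec![0u8; len];

    // Fast path: cache only, no blob lock taken.
    let cached = read_cached(&mut buf, offset);
    if cached == len {
        return buf;
    }

    // Slow path: the blob read lock waits for any in-progress flush's write.
    let blob_guard = blob.read().await;
    blob_guard
        .read_at(&mut buf[cached..], offset + cached as u64)
        .await;
    buf
}

#[tokio::main]
async fn main() {
    let blob = Arc::new(RwLock::new(FakeBlob));
    let out = read_at(&blob, 256, 8).await;
    assert_eq!(out, vec![0, 1, 2, 3, 4, 5, 6, 7]); // bytes at offsets 256..264
}
```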
@@ -184,11 +259,15 @@ impl<B: Blob> Blob for Append<B> {
}

async fn sync(&self) -> Result<(), Error> {
// Flush any buffered data. When flush_internal returns, the write_at has completed
// and data is in the OS buffer.
{
let (buffer, blob_size) = &mut *self.buffer.write().await;
self.flush(buffer, blob_size).await?;
let guard = self.buffer.write().await;
self.flush_internal(guard).await?;
}
self.blob.sync().await
// Sync the OS buffer to disk. We need the blob read lock here since sync() requires
// access to the blob, but only a read lock since we're not modifying blob state.
self.blob.read().await.sync().await
}

/// Resize the blob to the provided `size`.
@@ -199,17 +278,30 @@ impl<B: Blob> Blob for Append<B> {
// always updated should the blob grow back to the point where we have new data for the same
// page, if any old data hasn't expired naturally by then.

// Acquire a write lock on the buffer.
let (buffer, blob_size) = &mut *self.buffer.write().await;
// Acquire buffer lock first.
// NOTE: We MUST acquire the buffer lock before the blob lock to avoid deadlocks with
// `append`, which acquires buffer then blob (via `flush_internal`).
let mut guard = self.buffer.write().await;
Collaborator (review comment), suggested change:
-        let mut guard = self.buffer.write().await;
+        let mut buf_guard = self.buffer.write().await;

Author: done
let (buffer, blob_size) = &mut *guard;

// Flush any buffered bytes to the underlying blob. (Note that a fancier implementation
// might avoid flushing those bytes that are backed up over by the next step, if any.)
self.flush(buffer, blob_size).await?;
// Acquire blob write lock to prevent concurrent reads throughout the resize.
let blob_guard = self.blob.write().await;

// Flush any buffered bytes first, using the helper.
// We hold both locks here, so no concurrent operations can happen.
if let Some((buf, write_offset)) = self.prepare_flush_data(buffer, *blob_size).await {
// Write the data to the blob.
let len = buf.len() as u64;
blob_guard.write_at(buf, write_offset).await?;

// Update blob_size to reflect the flush.
*blob_size = write_offset + len;
}

// Resize the underlying blob.
self.blob.resize(size).await?;
blob_guard.resize(size).await?;

// Update the physical blob size.
// Update the blob size.
*blob_size = size;

// Reset the append buffer to the new size, ensuring its page alignment.
@@ -218,7 +310,7 @@ impl<B: Blob> Blob for Append<B> {
buffer.data.clear();
if leftover_size != 0 {
let page_buf = vec![0; leftover_size as usize];
let buf = self.blob.read_at(page_buf, buffer.offset).await?;
let buf = blob_guard.read_at(page_buf, buffer.offset).await?;
assert!(!buffer.append(buf.as_ref()));
}

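The ordering rule in the NOTE above generalizes to every path that takes both locks. A minimal sketch, again with hypothetical stand-in types and `tokio`, shows an `append`-like and a `resize`-like path agreeing on buffer-then-blob order so they can run concurrently without deadlock.

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct FakeBlob;

// Both lock-taking paths agree on the order: buffer first, blob second.
// If either path reversed the order, two tasks could each hold one lock
// while waiting forever for the other.
async fn append_path(buffer: &Arc<RwLock<Vec<u8>>>, blob: &Arc<RwLock<FakeBlob>>) {
    let _buf = buffer.write().await; // 1st: buffer
    let _blob = blob.write().await; // 2nd: blob
    // ... flush ...
}

async fn resize_path(buffer: &Arc<RwLock<Vec<u8>>>, blob: &Arc<RwLock<FakeBlob>>) {
    let _buf = buffer.write().await; // same order: buffer first
    let _blob = blob.write().await; // then blob
    // ... flush, resize, re-buffer ...
}

#[tokio::main]
async fn main() {
    let buffer = Arc::new(RwLock::new(Vec::new()));
    let blob = Arc::new(RwLock::new(FakeBlob));
    // Safe to run concurrently because the lock order is consistent.
    tokio::join!(append_path(&buffer, &blob), resize_path(&buffer, &blob));
}
```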
29 changes: 29 additions & 0 deletions runtime/src/utils/buffer/pool.rs
@@ -102,6 +102,35 @@ impl PoolRef {
Pool::offset_to_page(self.page_size, offset)
}

/// Try to read the specified bytes from the buffer pool cache only. Returns the number of
/// bytes successfully read from cache and copied to `buf` before a page fault, if any.
///
/// This method never reads from the underlying blob - it only checks the cache.
///
/// # Warning
///
    /// Attempts to read any of the last (blob_size % page_size) "trailing bytes" of the blob will
    /// always return 0 since the buffer pool only deals with page-sized chunks.
pub(super) async fn read_cached(
&self,
blob_id: u64,
mut buf: &mut [u8],
mut offset: u64,
) -> usize {
let original_len = buf.len();
let buffer_pool = self.pool.read().await;
while !buf.is_empty() {
let count = buffer_pool.read_at(self.page_size, blob_id, buf, offset);
if count == 0 {
// Cache miss - return how many bytes we successfully read
break;
}
offset += count as u64;
buf = &mut buf[count..];
}
original_len - buf.len()
}

/// Read the specified bytes, preferentially from the buffer pool cache. Bytes not found in the
/// buffer pool will be read from the provided `blob` and cached for future reads.
///
10 changes: 5 additions & 5 deletions storage/src/journal/contiguous/fixed.rs
@@ -498,15 +498,15 @@ impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
assert!(start_blob <= self.tail_index);
let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
let mut blob_plus = blobs
.into_iter()
.map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
.collect::<Vec<_>>();
let mut blob_plus = Vec::with_capacity(blobs.len() + 1);
for (blob_index, blob) in blobs {
blob_plus.push((*blob_index, blob.clone_blob().await, full_size));
}

// Include the tail blob.
self.tail.sync().await?; // make sure no data is buffered
let tail_size = self.tail.size().await;
blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
blob_plus.push((self.tail_index, self.tail.clone_blob().await, tail_size));
let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;

// Replay all blobs in order and stream items as they are read (to avoid occupying too much
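Because `clone_blob` is now async, the iterator `map` above had to become an explicit loop that awaits each clone. A hedged alternative, assuming the `futures` crate were acceptable here and that cloning the blobs concurrently is fine (each clone only takes a read lock), would be `futures::future::join_all`; the sketch below uses hypothetical stand-in types rather than the journal's real ones.

```rust
use futures::future::join_all;

#[derive(Clone)]
struct Blob(u64);

// Stand-in for Append::clone_blob, which now awaits a read lock on the blob.
async fn clone_blob(blob: &Blob) -> Blob {
    blob.clone()
}

#[tokio::main]
async fn main() {
    let blobs = vec![(0u64, Blob(10)), (1u64, Blob(11))];
    let full_size = 4096u64;

    // Sequential, as in the diff: one await per blob.
    let mut blob_plus = Vec::with_capacity(blobs.len() + 1);
    for (index, blob) in &blobs {
        blob_plus.push((*index, clone_blob(blob).await, full_size));
    }

    // Same result with join_all, awaiting all the clones together.
    let joined = join_all(
        blobs
            .iter()
            .map(|(index, blob)| async move { (*index, clone_blob(blob).await, full_size) }),
    )
    .await;

    assert_eq!(blob_plus.len(), joined.len());
    assert_eq!((joined[1].1).0, 11);
}
```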