diff --git a/runtime/fuzz/fuzz_targets/buffer.rs b/runtime/fuzz/fuzz_targets/buffer.rs
index d4a6198000..de596ae15b 100644
--- a/runtime/fuzz/fuzz_targets/buffer.rs
+++ b/runtime/fuzz/fuzz_targets/buffer.rs
@@ -322,7 +322,7 @@ fn fuzz(input: FuzzInput) {
             FuzzOperation::AppendCloneBlob => {
                 if let Some(ref append) = append_buffer {
-                    let _ = append.clone_blob();
+                    let _ = append.clone_blob().await;
                 }
             }
diff --git a/runtime/src/utils/buffer/append.rs b/runtime/src/utils/buffer/append.rs
index 3b555eee16..27b35d7e17 100644
--- a/runtime/src/utils/buffer/append.rs
+++ b/runtime/src/utils/buffer/append.rs
@@ -7,10 +7,20 @@ use std::{num::NonZeroUsize, sync::Arc};
 
 /// A [Blob] wrapper that supports appending new data that is both read and write cached, and
 /// provides buffer-pool managed read caching of older data.
+///
+/// # Concurrent Access
+///
+/// This implementation allows readers to proceed while flush I/O is in progress, as long as they
+/// are reading from the write buffer or the pool cache. Readers that need to access data from the
+/// underlying blob (cache miss) will wait for any in-progress write to complete.
+///
+/// The implementation involves two locks: one for the write buffer (and blob size metadata), and
+/// one for the underlying blob itself. To avoid deadlocks, the buffer lock is always acquired
+/// before the blob lock.
 #[derive(Clone)]
 pub struct Append<B: Blob> {
-    /// The underlying blob being wrapped.
-    blob: B,
+    /// The underlying blob being wrapped, protected by a lock for I/O coordination.
+    blob: Arc<RwLock<B>>,
 
     /// Unique id assigned by the buffer pool.
     id: u64,
@@ -58,7 +68,7 @@ impl<B: Blob> Append<B> {
         }
 
         Ok(Self {
-            blob,
+            blob: Arc::new(RwLock::new(blob)),
             id: pool_ref.next_id().await,
             pool_ref,
             buffer: Arc::new(RwLock::new((buffer, size))),
@@ -71,7 +81,8 @@
         let buf = buf.into();
 
         // Acquire a write lock on the buffer and blob_size.
-        let (buffer, blob_size) = &mut *self.buffer.write().await;
+        let mut guard = self.buffer.write().await;
+        let (buffer, _) = &mut *guard;
 
         // Ensure the write doesn't overflow.
         buffer
@@ -81,7 +92,7 @@
 
         if buffer.append(buf.as_ref()) {
             // Buffer is over capacity, flush it to the underlying blob.
-            return self.flush(buffer, blob_size).await;
+            return self.flush_internal(guard).await;
         }
 
         Ok(())
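The lock handoff that `flush_internal` performs in the next hunk can be shown in isolation. Below is a minimal, self-contained sketch of the pattern, assuming tokio's `RwLock` in place of the runtime's own lock types; `Handoff` and its fields are illustrative stand-ins, not the crate's API:

    // Sketch of the "acquire second lock before releasing the first" handoff,
    // using tokio's RwLock as an assumed stand-in for the runtime's locks.
    use std::sync::Arc;
    use tokio::sync::RwLock;

    struct Handoff {
        buffer: Arc<RwLock<Vec<u8>>>, // stands in for the write buffer + size metadata
        blob: Arc<RwLock<Vec<u8>>>,   // stands in for the underlying blob
    }

    impl Handoff {
        async fn flush(&self) {
            // Lock ordering: buffer first, then blob.
            let mut buf = self.buffer.write().await;
            let data = std::mem::take(&mut *buf);
            let mut blob = self.blob.write().await; // acquired BEFORE the buffer lock is released
            drop(buf); // readers of the buffer may now proceed...
            blob.extend_from_slice(&data); // ...while the "I/O" happens under the blob lock
        }
    }

    #[tokio::main]
    async fn main() {
        let h = Handoff {
            buffer: Arc::new(RwLock::new(b"pending".to_vec())),
            blob: Arc::new(RwLock::new(Vec::new())),
        };
        h.flush().await;
        assert!(h.buffer.read().await.is_empty());
        assert_eq!(h.blob.read().await.as_slice(), b"pending".as_slice());
    }

Because the blob lock is acquired before the buffer lock is released, a reader that misses the cache can never slip between the two acquisitions and observe the blob without the flushed bytes.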
@@ -97,16 +108,49 @@
 
     /// Flush the append buffer to the underlying blob, caching each page worth of written data in
     /// the buffer pool.
-    async fn flush(&self, buffer: &mut Buffer, blob_size: &mut u64) -> Result<(), Error> {
-        // Take the buffered data, if any.
-        let Some((mut buf, offset)) = buffer.take() else {
+    ///
+    /// This method acquires the blob write lock before releasing the buffer lock, ensuring readers
+    /// that need blob access will wait for the write to complete.
+    async fn flush_internal(
+        &self,
+        mut buf_guard: crate::RwLockWriteGuard<'_, (Buffer, u64)>,
+    ) -> Result<(), Error> {
+        let (buffer, blob_size) = &mut *buf_guard;
+
+        // Prepare the data to be written.
+        let Some(buf) = self.prepare_flush_data(buffer, *blob_size).await else {
             return Ok(());
         };
 
-        // Insert the flushed data into the buffer pool. This step isn't just to ensure recently
-        // written data remains cached for future reads, but is in fact required to purge
-        // potentially stale cache data which might result from the edge the case of rewinding a
-        // blob across a page boundary.
+        // Update blob_size *before* releasing the lock. We do this optimistically; if the write
+        // fails below, the caller will receive an error and likely abort, so maintaining exact
+        // consistency on error isn't strictly required.
+        let write_offset = *blob_size;
+        *blob_size += buf.len() as u64;
+
+        // Acquire the blob write lock BEFORE releasing the buffer lock. This ensures no reader can
+        // access the blob until the write completes.
+        let blob_guard = self.blob.write().await;
+
+        // Release the buffer lock, allowing concurrent buffered reads while the write is in
+        // progress. Any attempt to read from the blob will block until the write completes.
+        drop(buf_guard);
+
+        // Perform the write while holding only the blob lock.
+        blob_guard.write_at(buf, write_offset).await
+    }
+
+    /// Prepares data from the buffer to be flushed to the blob.
+    ///
+    /// This method:
+    /// 1. Takes the data from the write buffer.
+    /// 2. Caches it in the buffer pool.
+    /// 3. Returns the data to be written to the blob (if any).
+    async fn prepare_flush_data(&self, buffer: &mut Buffer, blob_size: u64) -> Option<Vec<u8>> {
+        // Take the buffered data, if any.
+        let (mut buf, offset) = buffer.take()?;
+
+        // Insert the flushed data into the buffer pool.
         let remaining = self.pool_ref.cache(self.id, &buf, offset).await;
 
         // If there's any data left over that doesn't constitute an entire page, re-buffer it into
@@ -121,22 +165,20 @@
 
         // Early exit if there's no new data to write.
         if new_data_start >= buf.len() {
-            return Ok(());
+            return None;
         }
         if new_data_start > 0 {
             buf.drain(0..new_data_start);
         }
 
-        let new_data_len = buf.len() as u64;
-        self.blob.write_at(buf, *blob_size).await?;
-        *blob_size += new_data_len;
-        Ok(())
+        // Return the data to be written; the caller derives the write offset from blob_size.
+        Some(buf)
     }
 
     /// Clones and returns the underlying blob.
-    pub fn clone_blob(&self) -> B {
-        self.blob.clone()
+    pub async fn clone_blob(&self) -> B {
+        self.blob.read().await.clone()
     }
 }
@@ -155,7 +197,8 @@ impl<B: Blob> Blob for Append<B> {
             .ok_or(Error::OffsetOverflow)?;
 
         // Acquire a read lock on the buffer.
-        let (buffer, _) = &*self.buffer.read().await;
+        let guard = self.buffer.read().await;
+        let (buffer, _) = &*guard;
 
         // If the data required is beyond the size of the blob, return an error.
         if end_offset > buffer.size() {
@@ -164,13 +207,40 @@
 
         // Extract any bytes from the buffer that overlap with the requested range.
         let remaining = buffer.extract(buf.as_mut(), offset);
+
+        // Release the buffer lock before any potential I/O.
+        drop(guard);
+
         if remaining == 0 {
             return Ok(buf);
         }
 
-        // If there are bytes remaining to be read, use the buffer pool to get them.
+        // Fast path: try to read *only* from the pool cache, without acquiring the blob lock. This
+        // allows concurrent reads even while a flush is in progress.
+        let cached = self
+            .pool_ref
+            .read_cached(self.id, &mut buf.as_mut()[..remaining], offset)
+            .await;
+
+        if cached == remaining {
+            // All bytes were found in the cache.
+            return Ok(buf);
+        }
+
+        // Slow path: on a cache miss (partial or full), acquire the blob read lock to ensure any
+        // in-flight write completes before we read from the blob.
+        let blob_guard = self.blob.read().await;
+
+        // Read the remaining bytes that were not already obtained from the earlier cache read.
+        let uncached_offset = offset + cached as u64;
+        let uncached_len = remaining - cached;
         self.pool_ref
-            .read(&self.blob, self.id, &mut buf.as_mut()[..remaining], offset)
+            .read(
+                &*blob_guard,
+                self.id,
+                &mut buf.as_mut()[cached..cached + uncached_len],
+                uncached_offset,
+            )
             .await?;
 
         Ok(buf)
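The slow path above splits the outstanding prefix into a cached part and a blob read at an advanced offset. A sketch of just that index arithmetic, with `uncached_range` as a hypothetical helper name, not the crate's API:

    // After `cached` of the `remaining` leading bytes were served by the pool, the
    // blob read must start `cached` bytes further into the blob and fill the rest
    // of the destination's outstanding prefix.
    fn uncached_range(offset: u64, remaining: usize, cached: usize) -> (u64, std::ops::Range<usize>) {
        (offset + cached as u64, cached..remaining)
    }

    fn main() {
        let (offset, range) = uncached_range(100, 12, 8);
        assert_eq!(offset, 108); // the blob read starts after the cached prefix
        assert_eq!(range, 8..12); // and fills the last 4 of the 12 outstanding bytes
    }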
@@ -184,14 +254,22 @@
     }
 
     async fn sync(&self) -> Result<(), Error> {
+        // Flush any buffered data. When flush_internal returns, the write_at has completed and the
+        // data is in the OS buffer.
         {
-            let (buffer, blob_size) = &mut *self.buffer.write().await;
-            self.flush(buffer, blob_size).await?;
+            let guard = self.buffer.write().await;
+            self.flush_internal(guard).await?;
         }
 
-        self.blob.sync().await
+        // Sync the OS buffer to disk. We need the blob lock here since sync() requires access to
+        // the blob, but only a read lock since we're not modifying blob state.
+        self.blob.read().await.sync().await
     }
 
     /// Resize the blob to the provided `size`.
+    ///
+    /// # Warning
+    ///
+    /// Concurrent readers that try to read past the new size while the resize is in progress may
+    /// observe an error.
     async fn resize(&self, size: u64) -> Result<(), Error> {
         // Implementation note: rewinding the blob across a page boundary potentially results in
         // stale data remaining in the buffer pool's cache. We don't proactively purge the data
@@ -199,17 +277,32 @@
         // always updated should the blob grow back to the point where we have new data for the same
         // page, if any old data hasn't expired naturally by then.
 
-        // Acquire a write lock on the buffer.
-        let (buffer, blob_size) = &mut *self.buffer.write().await;
+        // Acquire the buffer lock first.
+        // NOTE: We MUST acquire the buffer lock before the blob lock to avoid deadlocks with
+        // `append`, which acquires buffer then blob (via `flush_internal`).
+        let mut buf_guard = self.buffer.write().await;
+        let (buffer, blob_size) = &mut *buf_guard;
+
+        let flush_data = self.prepare_flush_data(buffer, *blob_size).await;
 
-        // Flush any buffered bytes to the underlying blob. (Note that a fancier implementation
-        // might avoid flushing those bytes that are backed up over by the next step, if any.)
-        self.flush(buffer, blob_size).await?;
+        // Acquire the blob write lock to prevent concurrent blob reads throughout the resize.
+        let blob_guard = self.blob.write().await;
+
+        // Flush any buffered bytes first. We hold both locks here, so no concurrent operation can
+        // observe the intermediate state.
+        if let Some(buf) = flush_data {
+            // Write the data to the blob.
+            let len = buf.len() as u64;
+            blob_guard.write_at(buf, *blob_size).await?;
+
+            // Update blob_size to reflect the flush.
+            *blob_size += len;
+        }
 
         // Resize the underlying blob.
-        self.blob.resize(size).await?;
+        blob_guard.resize(size).await?;
 
-        // Update the physical blob size.
+        // Update the blob size.
         *blob_size = size;
 
         // Reset the append buffer to the new size, ensuring its page alignment.
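The next hunk re-buffers any partial trailing page so the write buffer stays page aligned after the resize. The underlying split is plain modular arithmetic; a minimal sketch, with `buffer_reset` as a hypothetical name and a 4096-byte page size assumed:

    // Split the new size into a page-aligned buffer offset plus the partial-page
    // remainder that must be re-read from the blob into the write buffer.
    fn buffer_reset(size: u64, page_size: u64) -> (u64, u64) {
        let leftover = size % page_size;
        (size - leftover, leftover)
    }

    fn main() {
        assert_eq!(buffer_reset(10_000, 4096), (8192, 1808)); // 1808 trailing bytes re-read
        assert_eq!(buffer_reset(8192, 4096), (8192, 0)); // aligned: nothing to re-read
    }

In the real implementation the leftover bytes are re-read from the blob via `read_at`, as the hunk below shows.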
@@ -218,7 +311,7 @@ impl<B: Blob> Blob for Append<B> {
         buffer.data.clear();
         if leftover_size != 0 {
             let page_buf = vec![0; leftover_size as usize];
-            let buf = self.blob.read_at(page_buf, buffer.offset).await?;
+            let buf = blob_guard.read_at(page_buf, buffer.offset).await?;
             assert!(!buffer.append(buf.as_ref()));
         }
diff --git a/runtime/src/utils/buffer/pool.rs b/runtime/src/utils/buffer/pool.rs
index debaabc557..7ab0381b24 100644
--- a/runtime/src/utils/buffer/pool.rs
+++ b/runtime/src/utils/buffer/pool.rs
@@ -102,6 +102,35 @@ impl PoolRef {
         Pool::offset_to_page(self.page_size, offset)
     }
 
+    /// Try to read the specified bytes from the buffer pool cache only. Returns the number of
+    /// bytes successfully read from the cache and copied into `buf` before the first cache miss,
+    /// if any.
+    ///
+    /// This method never reads from the underlying blob; it only checks the cache.
+    ///
+    /// # Warning
+    ///
+    /// Attempts to read any of the last (blob_size % page_size) "trailing bytes" of the blob will
+    /// always return 0, since the buffer pool only deals with page-sized chunks.
+    pub(super) async fn read_cached(
+        &self,
+        blob_id: u64,
+        mut buf: &mut [u8],
+        mut offset: u64,
+    ) -> usize {
+        let original_len = buf.len();
+        let buffer_pool = self.pool.read().await;
+        while !buf.is_empty() {
+            let count = buffer_pool.read_at(self.page_size, blob_id, buf, offset);
+            if count == 0 {
+                // Cache miss: report how many bytes were successfully read.
+                break;
+            }
+            offset += count as u64;
+            buf = &mut buf[count..];
+        }
+        original_len - buf.len()
+    }
+
     /// Read the specified bytes, preferentially from the buffer pool cache. Bytes not found in the
     /// buffer pool will be read from the provided `blob` and cached for future reads.
     ///
diff --git a/storage/src/journal/contiguous/fixed.rs b/storage/src/journal/contiguous/fixed.rs
index f256756b7d..d2b70df875 100644
--- a/storage/src/journal/contiguous/fixed.rs
+++ b/storage/src/journal/contiguous/fixed.rs
@@ -498,15 +498,15 @@ impl<E: Storage + Metrics, A: CodecFixed<Cfg = ()>> Journal<E, A> {
         assert!(start_blob <= self.tail_index);
         let blobs = self.blobs.range(start_blob..).collect::<Vec<_>>();
         let full_size = items_per_blob * Self::CHUNK_SIZE_U64;
-        let mut blob_plus = blobs
-            .into_iter()
-            .map(|(blob_index, blob)| (*blob_index, blob.clone_blob(), full_size))
-            .collect::<Vec<_>>();
+        let mut blob_plus = Vec::with_capacity(blobs.len() + 1);
+        for (blob_index, blob) in blobs {
+            blob_plus.push((*blob_index, blob.clone_blob().await, full_size));
+        }
 
         // Include the tail blob.
         self.tail.sync().await?; // make sure no data is buffered
         let tail_size = self.tail.size().await;
-        blob_plus.push((self.tail_index, self.tail.clone_blob(), tail_size));
+        blob_plus.push((self.tail_index, self.tail.clone_blob().await, tail_size));
 
         let start_offset = (start_pos % items_per_blob) * Self::CHUNK_SIZE_U64;
 
         // Replay all blobs in order and stream items as they are read (to avoid occupying too much
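The contract of `read_cached` in pool.rs above (serve the longest contiguous run of cached pages, stop at the first miss) can be modeled without the pool itself. A self-contained sketch, with a `HashMap` standing in for the page cache; none of these names are the crate's API:

    use std::collections::HashMap;

    // Copy from cached pages into `buf` until the first miss; report the bytes served.
    fn read_cached(
        pages: &HashMap<u64, Vec<u8>>,
        page_size: usize,
        buf: &mut [u8],
        offset: usize,
    ) -> usize {
        let mut done = 0;
        while done < buf.len() {
            let pos = offset + done;
            let (page, off) = ((pos / page_size) as u64, pos % page_size);
            let Some(data) = pages.get(&page) else { break }; // first miss ends the run
            let n = (buf.len() - done).min(page_size - off);
            buf[done..done + n].copy_from_slice(&data[off..off + n]);
            done += n;
        }
        done
    }

    fn main() {
        let mut pages = HashMap::new();
        pages.insert(0u64, vec![1u8; 4]); // only page 0 is cached (page_size = 4)
        let mut buf = [0u8; 6];
        assert_eq!(read_cached(&pages, 4, &mut buf, 0), 4); // stops at the miss on page 1
        assert_eq!(buf, [1, 1, 1, 1, 0, 0]);
    }

A trailing partial page is never inserted into such a cache as a full page, which is why the warning above notes that reads of the trailing bytes always return 0.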