diff --git a/crates/commitlog/src/index/indexfile.rs b/crates/commitlog/src/index/indexfile.rs index a3c4ef0196c..ba1dd65d1fa 100644 --- a/crates/commitlog/src/index/indexfile.rs +++ b/crates/commitlog/src/index/indexfile.rs @@ -19,7 +19,7 @@ const ENTRY_SIZE: usize = KEY_SIZE + mem::size_of::(); /// key-value pairs /// Succesive key written should be sorted in ascending order, 0 is invalid-key value #[derive(Debug)] -pub struct IndexFileMut + From> { +pub struct IndexFileMut { // A mutable memory-mapped buffer that represents the file contents. inner: MmapMut, /// The number of entries currently stored in the index file. @@ -129,20 +129,7 @@ impl + From> IndexFileMut { return Err(IndexError::OutOfRange); } - let entry = &self.inner[start..start + ENTRY_SIZE]; - - let key = u64::from_le_bytes( - entry[..mem::size_of::()] - .try_into() - .map_err(|_| IndexError::InvalidFormat)?, - ); - let value = u64::from_le_bytes( - entry[mem::size_of::()..] - .try_into() - .map_err(|_| IndexError::InvalidFormat)?, - ); - - Ok((Key::from(key), value)) + entry(&self.inner, start) } /// Returns the last key in the index file. @@ -152,9 +139,7 @@ impl + From> IndexFileMut { return Ok(0); } let start = (self.num_entries - 1) * ENTRY_SIZE; - let key_bytes: &[u8] = &self.inner[start..start + KEY_SIZE]; - let key = u64::from_le_bytes(key_bytes.try_into().map_err(|_| IndexError::InvalidFormat)?); - Ok(key) + u64_from_le_bytes(&self.inner[start..start + KEY_SIZE]) } // Return (key, value) pair of key just smaller or equal to given key @@ -222,12 +207,36 @@ impl + From> IndexFileMut { Ok(()) } + + /// Obtain an iterator over the entries of the index. + pub fn entries(&self) -> Entries { + Entries { + mmap: &self.inner, + pos: 0, + max: self.num_entries * ENTRY_SIZE, + _key: PhantomData, + } + } +} + +impl<'a, K: Into + From> IntoIterator for &'a IndexFileMut { + type Item = Result<(K, u64), IndexError>; + type IntoIter = Entries<'a, K>; + + fn into_iter(self) -> Self::IntoIter { + self.entries() + } +} + +impl + From> From> for IndexFileMut { + fn from(IndexFile { inner }: IndexFile) -> Self { + inner + } } /// A wrapper over [`IndexFileMut`] to provide read-only access to the index file. -pub struct IndexFile + From> { +pub struct IndexFile { inner: IndexFileMut, - _marker: PhantomData, } impl + From> IndexFile { @@ -244,15 +253,79 @@ impl + From> IndexFile { .num_entries() .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - Ok(Self { - inner, - _marker: PhantomData, - }) + Ok(Self { inner }) } pub fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError> { self.inner.key_lookup(key) } + + /// Obtain an iterator over the entries of the index. + pub fn entries(&self) -> Entries { + self.inner.entries() + } +} + +impl AsMut> for IndexFile { + fn as_mut(&mut self) -> &mut IndexFileMut { + &mut self.inner + } +} + +impl<'a, Key: Into + From> IntoIterator for &'a IndexFile { + type Item = Result<(Key, u64), IndexError>; + type IntoIter = Entries<'a, Key>; + + fn into_iter(self) -> Self::IntoIter { + self.entries() + } +} + +impl + From> From> for IndexFile { + fn from(inner: IndexFileMut) -> Self { + Self { inner } + } +} + +/// Iterator over the entries of an [`IndexFileMut`] or [`IndexFile`]. +/// +/// Yields pairs of `(K, u64)` or an error if an entry could not be decoded. +pub struct Entries<'a, K> { + mmap: &'a [u8], + pos: usize, + max: usize, + _key: PhantomData, +} + +impl<'a, K: From> Iterator for Entries<'a, K> { + type Item = Result<(K, u64), IndexError>; + + fn next(&mut self) -> Option { + if self.pos >= self.max { + return None; + } + + let item = entry(self.mmap, self.pos); + if item.is_ok() { + self.pos += ENTRY_SIZE; + } + Some(item) + } +} + +fn entry>(mmap: &[u8], start: usize) -> Result<(K, u64), IndexError> { + let entry = &mmap[start..start + ENTRY_SIZE]; + let sz = mem::size_of::(); + let key = u64_from_le_bytes(&entry[..sz])?; + let val = u64_from_le_bytes(&entry[sz..])?; + + Ok((key.into(), val)) +} + +fn u64_from_le_bytes(x: &[u8]) -> Result { + x.try_into() + .map_err(|_| IndexError::InvalidFormat) + .map(u64::from_le_bytes) } #[cfg(test)] @@ -370,4 +443,29 @@ mod tests { Ok(()) } + + #[test] + fn test_iterator_iterates() -> Result<(), IndexError> { + let index = create_and_fill_index(100, 100)?; + + let expected = (1..100).map(|key| (key * 2, key * 2 * 100)).collect::>(); + let entries = index.entries().collect::, _>>()?; + assert_eq!(&entries, &expected); + + // `IndexFile` should yield the same result + let index: IndexFile = index.into(); + let entries = index.entries().collect::, _>>()?; + assert_eq!(&entries, &expected); + + Ok(()) + } + + #[test] + fn test_iterator_yields_nothing_for_empty_index() -> Result<(), IndexError> { + let index = create_and_fill_index(100, 0)?; + let entries = index.entries().collect::, _>>()?; + assert!(entries.is_empty()); + + Ok(()) + } } diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 72ca54d00eb..6cfc49c3232 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -86,6 +86,14 @@ impl Default for Options { } } +impl Options { + /// Compute the length in bytes of an offset index based on the settings in + /// `self`. + pub fn offset_index_len(&self) -> u64 { + self.max_segment_size / self.offset_index_interval_bytes + } +} + /// The canonical commitlog, backed by on-disk log files. /// /// Records in the log are of type `T`, which canonically is instantiated to diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index be045a632e8..8a188358cef 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -91,12 +91,8 @@ pub trait Repo: Clone { } } -fn offset_index_len(opts: Options) -> u64 { - opts.max_segment_size / opts.offset_index_interval_bytes -} - fn create_offset_index_writer(repo: &R, offset: u64, opts: Options) -> Option { - repo.create_offset_index(offset, offset_index_len(opts)) + repo.create_offset_index(offset, opts.offset_index_len()) .map(|index| OffsetIndexWriter::new(index, opts)) .map_err(|e| { warn!("failed to get offset index for segment {offset}: {e}"); diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 94c22334db1..979fcfeecb8 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -5,7 +5,7 @@ use std::{ ops::Range, }; -use log::debug; +use log::{debug, warn}; use crate::{ commit::{self, Commit, StoredCommit}, @@ -269,7 +269,7 @@ impl OffsetIndexWriter { } /// Either append to index or save offsets to append at future fsync - fn append_after_commit( + pub fn append_after_commit( &mut self, min_tx_offset: TxOffset, byte_offset: u64, @@ -312,8 +312,12 @@ impl FileLike for OffsetIndexWriter { /// Must be called via SegmentWriter::fsync fn fsync(&mut self) -> io::Result<()> { let _ = self.append_internal().map_err(|e| { - debug!("failed to append to offset index: {:?}", e); + warn!("failed to append to offset index: {e:?}"); }); + let _ = self + .head + .async_flush() + .map_err(|e| warn!("failed to flush offset index: {e:?}")); Ok(()) }