Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commitlog: Make offset index usable externally #2108

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 122 additions & 24 deletions crates/commitlog/src/index/indexfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const ENTRY_SIZE: usize = KEY_SIZE + mem::size_of::<u64>();
/// key-value pairs
/// Succesive key written should be sorted in ascending order, 0 is invalid-key value
#[derive(Debug)]
pub struct IndexFileMut<Key: Into<u64> + From<u64>> {
pub struct IndexFileMut<Key> {
// A mutable memory-mapped buffer that represents the file contents.
inner: MmapMut,
/// The number of entries currently stored in the index file.
Expand Down Expand Up @@ -129,20 +129,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
return Err(IndexError::OutOfRange);
}

let entry = &self.inner[start..start + ENTRY_SIZE];

let key = u64::from_le_bytes(
entry[..mem::size_of::<u64>()]
.try_into()
.map_err(|_| IndexError::InvalidFormat)?,
);
let value = u64::from_le_bytes(
entry[mem::size_of::<u64>()..]
.try_into()
.map_err(|_| IndexError::InvalidFormat)?,
);

Ok((Key::from(key), value))
entry(&self.inner, start)
}

/// Returns the last key in the index file.
Expand All @@ -152,9 +139,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
return Ok(0);
}
let start = (self.num_entries - 1) * ENTRY_SIZE;
let key_bytes: &[u8] = &self.inner[start..start + KEY_SIZE];
let key = u64::from_le_bytes(key_bytes.try_into().map_err(|_| IndexError::InvalidFormat)?);
Ok(key)
u64_from_le_bytes(&self.inner[start..start + KEY_SIZE])
}

// Return (key, value) pair of key just smaller or equal to given key
Expand Down Expand Up @@ -222,12 +207,36 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {

Ok(())
}

/// Obtain an iterator over the entries of the index.
pub fn entries(&self) -> Entries<Key> {
Entries {
mmap: &self.inner,
pos: 0,
max: self.num_entries * ENTRY_SIZE,
_key: PhantomData,
}
}
}

impl<'a, K: Into<u64> + From<u64>> IntoIterator for &'a IndexFileMut<K> {
type Item = Result<(K, u64), IndexError>;
type IntoIter = Entries<'a, K>;

fn into_iter(self) -> Self::IntoIter {
self.entries()
}
}

impl<Key: Into<u64> + From<u64>> From<IndexFile<Key>> for IndexFileMut<Key> {
fn from(IndexFile { inner }: IndexFile<Key>) -> Self {
inner
}
}

/// A wrapper over [`IndexFileMut`] to provide read-only access to the index file.
pub struct IndexFile<Key: Into<u64> + From<u64>> {
pub struct IndexFile<Key> {
inner: IndexFileMut<Key>,
_marker: PhantomData<Key>,
}

impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
Expand All @@ -244,15 +253,79 @@ impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
.num_entries()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Ok(Self {
inner,
_marker: PhantomData,
})
Ok(Self { inner })
}

pub fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError> {
self.inner.key_lookup(key)
}

/// Obtain an iterator over the entries of the index.
pub fn entries(&self) -> Entries<Key> {
self.inner.entries()
}
}

impl<K> AsMut<IndexFileMut<K>> for IndexFile<K> {
fn as_mut(&mut self) -> &mut IndexFileMut<K> {
&mut self.inner
}
}

impl<'a, Key: Into<u64> + From<u64>> IntoIterator for &'a IndexFile<Key> {
type Item = Result<(Key, u64), IndexError>;
type IntoIter = Entries<'a, Key>;

fn into_iter(self) -> Self::IntoIter {
self.entries()
}
}

impl<Key: Into<u64> + From<u64>> From<IndexFileMut<Key>> for IndexFile<Key> {
fn from(inner: IndexFileMut<Key>) -> Self {
Self { inner }
}
}

/// Iterator over the entries of an [`IndexFileMut`] or [`IndexFile`].
///
/// Yields pairs of `(K, u64)` or an error if an entry could not be decoded.
pub struct Entries<'a, K> {
mmap: &'a [u8],
pos: usize,
max: usize,
_key: PhantomData<K>,
}

impl<'a, K: From<u64>> Iterator for Entries<'a, K> {
type Item = Result<(K, u64), IndexError>;

fn next(&mut self) -> Option<Self::Item> {
if self.pos >= self.max {
return None;
}

let item = entry(self.mmap, self.pos);
if item.is_ok() {
self.pos += ENTRY_SIZE;
}
Some(item)
}
}

fn entry<K: From<u64>>(mmap: &[u8], start: usize) -> Result<(K, u64), IndexError> {
let entry = &mmap[start..start + ENTRY_SIZE];
let sz = mem::size_of::<u64>();
let key = u64_from_le_bytes(&entry[..sz])?;
let val = u64_from_le_bytes(&entry[sz..])?;

Ok((key.into(), val))
}

fn u64_from_le_bytes(x: &[u8]) -> Result<u64, IndexError> {
x.try_into()
.map_err(|_| IndexError::InvalidFormat)
.map(u64::from_le_bytes)
}

#[cfg(test)]
Expand Down Expand Up @@ -370,4 +443,29 @@ mod tests {

Ok(())
}

#[test]
fn test_iterator_iterates() -> Result<(), IndexError> {
let index = create_and_fill_index(100, 100)?;

let expected = (1..100).map(|key| (key * 2, key * 2 * 100)).collect::<Vec<_>>();
let entries = index.entries().collect::<Result<Vec<_>, _>>()?;
assert_eq!(&entries, &expected);

// `IndexFile` should yield the same result
let index: IndexFile<u64> = index.into();
let entries = index.entries().collect::<Result<Vec<_>, _>>()?;
assert_eq!(&entries, &expected);

Ok(())
}

#[test]
fn test_iterator_yields_nothing_for_empty_index() -> Result<(), IndexError> {
let index = create_and_fill_index(100, 0)?;
let entries = index.entries().collect::<Result<Vec<_>, _>>()?;
assert!(entries.is_empty());

Ok(())
}
}
8 changes: 8 additions & 0 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ impl Default for Options {
}
}

impl Options {
/// Compute the length in bytes of an offset index based on the settings in
/// `self`.
pub fn offset_index_len(&self) -> u64 {
self.max_segment_size / self.offset_index_interval_bytes
}
}

/// The canonical commitlog, backed by on-disk log files.
///
/// Records in the log are of type `T`, which canonically is instantiated to
Expand Down
6 changes: 1 addition & 5 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,8 @@ pub trait Repo: Clone {
}
}

fn offset_index_len(opts: Options) -> u64 {
opts.max_segment_size / opts.offset_index_interval_bytes
}

fn create_offset_index_writer<R: Repo>(repo: &R, offset: u64, opts: Options) -> Option<OffsetIndexWriter> {
repo.create_offset_index(offset, offset_index_len(opts))
repo.create_offset_index(offset, opts.offset_index_len())
.map(|index| OffsetIndexWriter::new(index, opts))
.map_err(|e| {
warn!("failed to get offset index for segment {offset}: {e}");
Expand Down
10 changes: 7 additions & 3 deletions crates/commitlog/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
ops::Range,
};

use log::debug;
use log::{debug, warn};

use crate::{
commit::{self, Commit, StoredCommit},
Expand Down Expand Up @@ -269,7 +269,7 @@ impl OffsetIndexWriter {
}

/// Either append to index or save offsets to append at future fsync
fn append_after_commit(
pub fn append_after_commit(
&mut self,
min_tx_offset: TxOffset,
byte_offset: u64,
Expand Down Expand Up @@ -312,8 +312,12 @@ impl FileLike for OffsetIndexWriter {
/// Must be called via SegmentWriter::fsync
fn fsync(&mut self) -> io::Result<()> {
let _ = self.append_internal().map_err(|e| {
debug!("failed to append to offset index: {:?}", e);
warn!("failed to append to offset index: {e:?}");
});
let _ = self
.head
.async_flush()
.map_err(|e| warn!("failed to flush offset index: {e:?}"));
Ok(())
}

Expand Down
Loading