Skip to content

Commit 9548f23

Browse files
committed
commitlog: Make offset index usable externally
* make `OffsetIndexWriter::append_after_commit` pub * expose offset index len calculation as method on `Options` * add conversions between mutable and read-only index (`as_mut`, `From` impls) * add an iterator, which may be used to check if the index is valid
1 parent 3798f46 commit 9548f23

File tree

4 files changed

+137
-31
lines changed

4 files changed

+137
-31
lines changed

crates/commitlog/src/index/indexfile.rs

+122-24
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const ENTRY_SIZE: usize = KEY_SIZE + mem::size_of::<u64>();
1919
/// key-value pairs
2020
/// Succesive key written should be sorted in ascending order, 0 is invalid-key value
2121
#[derive(Debug)]
22-
pub struct IndexFileMut<Key: Into<u64> + From<u64>> {
22+
pub struct IndexFileMut<Key> {
2323
// A mutable memory-mapped buffer that represents the file contents.
2424
inner: MmapMut,
2525
/// The number of entries currently stored in the index file.
@@ -129,20 +129,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
129129
return Err(IndexError::OutOfRange);
130130
}
131131

132-
let entry = &self.inner[start..start + ENTRY_SIZE];
133-
134-
let key = u64::from_le_bytes(
135-
entry[..mem::size_of::<u64>()]
136-
.try_into()
137-
.map_err(|_| IndexError::InvalidFormat)?,
138-
);
139-
let value = u64::from_le_bytes(
140-
entry[mem::size_of::<u64>()..]
141-
.try_into()
142-
.map_err(|_| IndexError::InvalidFormat)?,
143-
);
144-
145-
Ok((Key::from(key), value))
132+
entry(&self.inner, start)
146133
}
147134

148135
/// Returns the last key in the index file.
@@ -152,9 +139,7 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
152139
return Ok(0);
153140
}
154141
let start = (self.num_entries - 1) * ENTRY_SIZE;
155-
let key_bytes: &[u8] = &self.inner[start..start + KEY_SIZE];
156-
let key = u64::from_le_bytes(key_bytes.try_into().map_err(|_| IndexError::InvalidFormat)?);
157-
Ok(key)
142+
u64_from_le_bytes(&self.inner[start..start + KEY_SIZE])
158143
}
159144

160145
// Return (key, value) pair of key just smaller or equal to given key
@@ -222,12 +207,36 @@ impl<Key: Into<u64> + From<u64>> IndexFileMut<Key> {
222207

223208
Ok(())
224209
}
210+
211+
/// Obtain an iterator over the entries of the index.
212+
pub fn entries(&self) -> Entries<Key> {
213+
Entries {
214+
mmap: &self.inner,
215+
pos: 0,
216+
max: self.num_entries * ENTRY_SIZE,
217+
_key: PhantomData,
218+
}
219+
}
220+
}
221+
222+
impl<'a, K: Into<u64> + From<u64>> IntoIterator for &'a IndexFileMut<K> {
223+
type Item = Result<(K, u64), IndexError>;
224+
type IntoIter = Entries<'a, K>;
225+
226+
fn into_iter(self) -> Self::IntoIter {
227+
self.entries()
228+
}
229+
}
230+
231+
impl<Key: Into<u64> + From<u64>> From<IndexFile<Key>> for IndexFileMut<Key> {
232+
fn from(IndexFile { inner }: IndexFile<Key>) -> Self {
233+
inner
234+
}
225235
}
226236

227237
/// A wrapper over [`IndexFileMut`] to provide read-only access to the index file.
228-
pub struct IndexFile<Key: Into<u64> + From<u64>> {
238+
pub struct IndexFile<Key> {
229239
inner: IndexFileMut<Key>,
230-
_marker: PhantomData<Key>,
231240
}
232241

233242
impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
@@ -244,15 +253,79 @@ impl<Key: Into<u64> + From<u64>> IndexFile<Key> {
244253
.num_entries()
245254
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
246255

247-
Ok(Self {
248-
inner,
249-
_marker: PhantomData,
250-
})
256+
Ok(Self { inner })
251257
}
252258

253259
pub fn key_lookup(&self, key: Key) -> Result<(Key, u64), IndexError> {
254260
self.inner.key_lookup(key)
255261
}
262+
263+
/// Obtain an iterator over the entries of the index.
264+
pub fn entries(&self) -> Entries<Key> {
265+
self.inner.entries()
266+
}
267+
}
268+
269+
impl<K> AsMut<IndexFileMut<K>> for IndexFile<K> {
270+
fn as_mut(&mut self) -> &mut IndexFileMut<K> {
271+
&mut self.inner
272+
}
273+
}
274+
275+
impl<'a, Key: Into<u64> + From<u64>> IntoIterator for &'a IndexFile<Key> {
276+
type Item = Result<(Key, u64), IndexError>;
277+
type IntoIter = Entries<'a, Key>;
278+
279+
fn into_iter(self) -> Self::IntoIter {
280+
self.entries()
281+
}
282+
}
283+
284+
impl<Key: Into<u64> + From<u64>> From<IndexFileMut<Key>> for IndexFile<Key> {
285+
fn from(inner: IndexFileMut<Key>) -> Self {
286+
Self { inner }
287+
}
288+
}
289+
290+
/// Iterator over the entries of an [`IndexFileMut`] or [`IndexFile`].
291+
///
292+
/// Yields pairs of `(K, u64)` or an error if an entry could not be decoded.
293+
pub struct Entries<'a, K> {
294+
mmap: &'a [u8],
295+
pos: usize,
296+
max: usize,
297+
_key: PhantomData<K>,
298+
}
299+
300+
impl<'a, K: From<u64>> Iterator for Entries<'a, K> {
301+
type Item = Result<(K, u64), IndexError>;
302+
303+
fn next(&mut self) -> Option<Self::Item> {
304+
if self.pos >= self.max {
305+
return None;
306+
}
307+
308+
let item = entry(self.mmap, self.pos);
309+
if item.is_ok() {
310+
self.pos += ENTRY_SIZE;
311+
}
312+
Some(item)
313+
}
314+
}
315+
316+
fn entry<K: From<u64>>(mmap: &[u8], start: usize) -> Result<(K, u64), IndexError> {
317+
let entry = &mmap[start..start + ENTRY_SIZE];
318+
let sz = mem::size_of::<u64>();
319+
let key = u64_from_le_bytes(&entry[..sz])?;
320+
let val = u64_from_le_bytes(&entry[sz..])?;
321+
322+
Ok((key.into(), val))
323+
}
324+
325+
fn u64_from_le_bytes(x: &[u8]) -> Result<u64, IndexError> {
326+
x.try_into()
327+
.map_err(|_| IndexError::InvalidFormat)
328+
.map(u64::from_le_bytes)
256329
}
257330

258331
#[cfg(test)]
@@ -370,4 +443,29 @@ mod tests {
370443

371444
Ok(())
372445
}
446+
447+
#[test]
448+
fn test_iterator_iterates() -> Result<(), IndexError> {
449+
let index = create_and_fill_index(100, 100)?;
450+
451+
let expected = (1..100).map(|key| (key * 2, key * 2 * 100)).collect::<Vec<_>>();
452+
let entries = index.entries().collect::<Result<Vec<_>, _>>()?;
453+
assert_eq!(&entries, &expected);
454+
455+
// `IndexFile` should yield the same result
456+
let index: IndexFile<u64> = index.into();
457+
let entries = index.entries().collect::<Result<Vec<_>, _>>()?;
458+
assert_eq!(&entries, &expected);
459+
460+
Ok(())
461+
}
462+
463+
#[test]
464+
fn test_iterator_yields_nothing_for_empty_index() -> Result<(), IndexError> {
465+
let index = create_and_fill_index(100, 0)?;
466+
let entries = index.entries().collect::<Result<Vec<_>, _>>()?;
467+
assert!(entries.is_empty());
468+
469+
Ok(())
470+
}
373471
}

crates/commitlog/src/lib.rs

+8
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ impl Default for Options {
8686
}
8787
}
8888

89+
impl Options {
90+
/// Compute the length in bytes of an offset index based on the settings in
91+
/// `self`.
92+
pub fn offset_index_len(&self) -> u64 {
93+
self.max_segment_size / self.offset_index_interval_bytes
94+
}
95+
}
96+
8997
/// The canonical commitlog, backed by on-disk log files.
9098
///
9199
/// Records in the log are of type `T`, which canonically is instantiated to

crates/commitlog/src/repo/mod.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,8 @@ pub trait Repo: Clone {
9191
}
9292
}
9393

94-
fn offset_index_len(opts: Options) -> u64 {
95-
opts.max_segment_size / opts.offset_index_interval_bytes
96-
}
97-
9894
fn create_offset_index_writer<R: Repo>(repo: &R, offset: u64, opts: Options) -> Option<OffsetIndexWriter> {
99-
repo.create_offset_index(offset, offset_index_len(opts))
95+
repo.create_offset_index(offset, opts.offset_index_len())
10096
.map(|index| OffsetIndexWriter::new(index, opts))
10197
.map_err(|e| {
10298
warn!("failed to get offset index for segment {offset}: {e}");

crates/commitlog/src/segment.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ impl OffsetIndexWriter {
269269
}
270270

271271
/// Either append to index or save offsets to append at future fsync
272-
fn append_after_commit(
272+
pub fn append_after_commit(
273273
&mut self,
274274
min_tx_offset: TxOffset,
275275
byte_offset: u64,
@@ -312,8 +312,12 @@ impl FileLike for OffsetIndexWriter {
312312
/// Must be called via SegmentWriter::fsync
313313
fn fsync(&mut self) -> io::Result<()> {
314314
let _ = self.append_internal().map_err(|e| {
315-
debug!("failed to append to offset index: {:?}", e);
315+
warn!("failed to append to offset index: {e:?}");
316316
});
317+
let _ = self
318+
.head
319+
.async_flush()
320+
.map_err(|e| warn!("failed to flush offset index: {e:?}"));
317321
Ok(())
318322
}
319323

0 commit comments

Comments
 (0)