Skip to content

Commit

Permalink
Merge pull request #70 from Gifted-s/feat/block_cache
Browse files Browse the repository at this point in the history
[Feat] File Descriptor and Unit tests for it
  • Loading branch information
Gifted-s authored Aug 5, 2024
2 parents e844728 + 98bcdaa commit b7c309f
Show file tree
Hide file tree
Showing 12 changed files with 451 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ tempfile = "3.10.1"
thiserror = "1.0.57"
tokio = { version = "1.38.0", features = ["full"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
use = "0.0.1-pre.0"

[target.'cfg(target_os = "linux")']
1 change: 1 addition & 0 deletions v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ log = "0.4.22"
lz4_flex = "0.11.3"
nanoid = "0.4.0"
path-absolutize = "3.1.1"
quick_cache = "0.6.2"
seahash = "4.1.0"
tempfile = "3.10.1"
test-log = "0.2.16"
Expand Down
119 changes: 119 additions & 0 deletions v2/src/block_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use std::sync::Arc;

use crate::block_cache::Either::Left;
use crate::block_cache::Either::Right;
use crate::{
either::Either,
sst::{
block_index::{block_handle::KeyedBlockHandle, IndexBlock},
id::GlobalSegmentId,
value_offset_block::ValueOffsetBlock,
},
};
use quick_cache::Weighter;
use quick_cache::{sync::Cache, Equivalent};

/// Value stored in the block cache: `Left` holds a shared [`ValueOffsetBlock`]
/// (inserted via `insert_disk_block`), `Right` holds a shared [`IndexBlock`]
/// (inserted via `insert_index_block`).
type Item = Either<Arc<ValueOffsetBlock>, Arc<IndexBlock>>;

/// Key under which a block is cached.
///
/// Layout: `(global segment id (tree id + segment id), block offset)`.
/// Lookups are performed with a plain `(GlobalSegmentId, u64)` tuple through
/// the `Equivalent` impl below, so the derived `Hash`/`Eq` must stay
/// field-by-field.
#[derive(Eq, std::hash::Hash, PartialEq)]
struct CacheKey(GlobalSegmentId, u64);

impl Equivalent<CacheKey> for (GlobalSegmentId, u64) {
fn equivalent(&self, key: &CacheKey) -> bool {
self.0 == key.0 && self.1 == key.1
}
}

/// Builds a [`CacheKey`] from its two components, in order:
/// the global segment id and the block offset.
impl From<(GlobalSegmentId, u64)> for CacheKey {
    fn from(parts: (GlobalSegmentId, u64)) -> Self {
        let (segment_id, offset) = parts;
        Self(segment_id, offset)
    }
}

/// Stateless weighter handed to the cache; its `Weighter` impl reports how
/// much each cached block counts against the cache's weight capacity.
#[derive(Clone)]
struct BlockWeighter;

impl Weighter<CacheKey, Item> for BlockWeighter {
    /// Returns the weight charged against the cache capacity for `block`.
    ///
    /// * `Left` (value-offset block): whatever `block.size()` reports.
    /// * `Right` (index block): for every entry in `items`, the length of its
    ///   `end_key` plus `size_of::<KeyedBlockHandle>()`, summed up.
    ///
    /// The cache key does not influence the weight.
    fn weight(&self, _: &CacheKey, block: &Item) -> u64 {
        // NOTE: The weight is a `u64`. The index-block sum is a `usize`, and
        // widening `usize` to `u64` is lossless, so there is no truncation to
        // excuse here and no lint suppression is needed.
        // NOTE(review): `block.size()` is presumably a `usize` as well — confirm.
        match block {
            Either::Left(block) => block.size() as u64,
            Either::Right(block) => block
                .items
                .iter()
                .map(|x| x.end_key.len() + std::mem::size_of::<KeyedBlockHandle>())
                .sum::<usize>() as u64,
        }
    }
}

/// Cache for value-offset blocks and index blocks, keyed by segment id and
/// block offset and bounded by a total weight measured by `BlockWeighter`.
pub struct BlockCache {
// Underlying cache holding either kind of block behind an `Arc`.
data: Cache<CacheKey, Item, BlockWeighter>,
// Capacity in bytes given at construction; inserts are skipped when this is 0.
capacity: u64,
}

impl BlockCache {
    /// Creates a cache whose weight capacity is `bytes`.
    ///
    /// The first argument to `Cache::with_weighter` is fixed at `1_000_000`
    /// (presumably the estimated number of items; confirm against the
    /// `quick_cache` documentation).
    #[must_use]
    pub fn with_capacity_bytes(bytes: u64) -> Self {
        let data = Cache::with_weighter(1_000_000, bytes, BlockWeighter);

        Self {
            data,
            capacity: bytes,
        }
    }

    /// Returns the total weight of everything currently cached, as reported
    /// by the underlying cache.
    #[must_use]
    pub fn size(&self) -> u64 {
        self.data.weight()
    }

    /// Returns the capacity (in bytes) this cache was created with.
    #[must_use]
    pub fn capacity(&self) -> u64 {
        self.capacity
    }

    /// Returns how many blocks are currently cached.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` when the cache holds no blocks at all.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Caches a value-offset block under `(sstable_id, offset)`.
    ///
    /// Does nothing when the cache was created with a capacity of zero.
    #[doc(hidden)]
    pub fn insert_disk_block(
        &self,
        sstable_id: GlobalSegmentId,
        offset: u64,
        value: Arc<ValueOffsetBlock>,
    ) {
        // A zero-capacity cache is treated as "caching disabled".
        if self.capacity == 0 {
            return;
        }

        let key = (sstable_id, offset).into();
        self.data.insert(key, Left(value));
    }

    /// Caches an index block under `(sstable_id, offset)`.
    ///
    /// Does nothing when the cache was created with a capacity of zero.
    #[doc(hidden)]
    pub fn insert_index_block(
        &self,
        sstable_id: GlobalSegmentId,
        offset: u64,
        value: Arc<IndexBlock>,
    ) {
        // A zero-capacity cache is treated as "caching disabled".
        if self.capacity == 0 {
            return;
        }

        let key = (sstable_id, offset).into();
        self.data.insert(key, Right(value));
    }

    /// Looks up the value-offset block cached under `(sstable_id, offset)`.
    ///
    /// Returns `None` on a cache miss. On a hit the entry is unwrapped with
    /// `Either::left`.
    /// NOTE(review): what `left()` does when the entry is actually an index
    /// block is defined in `crate::either` — confirm.
    #[doc(hidden)]
    #[must_use]
    pub fn get_disk_block(
        &self,
        sstable_id: GlobalSegmentId,
        offset: u64,
    ) -> Option<Arc<ValueOffsetBlock>> {
        let cached = self.data.get(&(sstable_id, offset))?;
        Some(cached.left())
    }

    /// Looks up the index block cached under `(sstable_id, offset)`.
    ///
    /// Returns `None` on a cache miss. On a hit the entry is unwrapped with
    /// `Either::right`.
    /// NOTE(review): what `right()` does when the entry is actually a
    /// value-offset block is defined in `crate::either` — confirm.
    #[doc(hidden)]
    #[must_use]
    pub fn get_index_block(
        &self,
        sstable_id: GlobalSegmentId,
        offset: u64,
    ) -> Option<Arc<IndexBlock>> {
        let cached = self.data.get(&(sstable_id, offset))?;
        Some(cached.right())
    }
}
31 changes: 31 additions & 0 deletions v2/src/descriptor_table/lru.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use std::collections::VecDeque;

/// Recency-ordered list backed by a `VecDeque`.
///
/// The front of the queue is the least recently used item, the back is the
/// most recently used one.
#[derive(Default)]
#[allow(clippy::module_name_repetitions)]
pub struct LruList<T: Clone + Eq + PartialEq>(VecDeque<T>);

impl<T: Clone + Eq + PartialEq> LruList<T> {
    /// Creates an empty list with room for at least `n` items.
    #[must_use]
    pub fn with_capacity(n: usize) -> Self {
        let queue = VecDeque::with_capacity(n);
        Self(queue)
    }

    /// Filters the list in place using `f`.
    ///
    /// Note the polarity: items for which `f` returns `true` are **kept**;
    /// items for which it returns `false` are removed. Relative order of the
    /// surviving items is unchanged.
    pub fn remove_by(&mut self, f: impl FnMut(&T) -> bool) {
        let Self(queue) = self;
        queue.retain(f);
    }

    /// Removes every occurrence of `item` from the list.
    pub fn remove(&mut self, item: &T) {
        self.0.retain(|candidate| candidate != item);
    }

    /// Marks `item` as most recently used: any existing occurrences are
    /// dropped and the item is appended at the back.
    pub fn refresh(&mut self, item: T) {
        self.0.retain(|candidate| *candidate != item);
        self.0.push_back(item);
    }

    /// Returns a copy of the least recently used item, or `None` if the list
    /// is empty.
    ///
    /// The item is not removed: it is moved from the front to the back, so it
    /// becomes the most recently used entry.
    pub fn get_least_recently_used(&mut self) -> Option<T> {
        self.0.pop_front().map(|oldest| {
            self.0.push_back(oldest.clone());
            oldest
        })
    }
}
Loading

0 comments on commit b7c309f

Please sign in to comment.