diff --git a/v2/Cargo.toml b/v2/Cargo.toml
index 59812f8..3837aaa 100644
--- a/v2/Cargo.toml
+++ b/v2/Cargo.toml
@@ -14,6 +14,7 @@ log = "0.4.22"
 lz4_flex = "0.11.3"
 nanoid = "0.4.0"
 path-absolutize = "3.1.1"
+quick_cache = "0.6.2"
 seahash = "4.1.0"
 tempfile = "3.10.1"
 test-log = "0.2.16"
diff --git a/v2/src/block_cache.rs b/v2/src/block_cache.rs
new file mode 100644
index 0000000..c379223
--- /dev/null
+++ b/v2/src/block_cache.rs
@@ -0,0 +1,119 @@
+use std::sync::Arc;
+
+use crate::block_cache::Either::Left;
+use crate::block_cache::Either::Right;
+use crate::{
+    either::Either,
+    sst::{
+        block_index::{block_handle::KeyedBlockHandle, IndexBlock},
+        id::GlobalSegmentId,
+        value_offset_block::ValueOffsetBlock,
+    },
+};
+use quick_cache::Weighter;
+use quick_cache::{sync::Cache, Equivalent};
+
+type Item = Either<Arc<ValueOffsetBlock>, Arc<IndexBlock>>;
+
+// Cache key: (GlobalSegmentId = (TreeId, SegmentId), block offset)
+#[derive(Eq, std::hash::Hash, PartialEq)]
+struct CacheKey(GlobalSegmentId, u64);
+
+impl Equivalent<CacheKey> for (GlobalSegmentId, u64) {
+    fn equivalent(&self, key: &CacheKey) -> bool {
+        self.0 == key.0 && self.1 == key.1
+    }
+}
+
+impl From<(GlobalSegmentId, u64)> for CacheKey {
+    fn from((gid, bid): (GlobalSegmentId, u64)) -> Self {
+        Self(gid, bid)
+    }
+}
+
+#[derive(Clone)]
+struct BlockWeighter;
+
+impl Weighter<CacheKey, Item> for BlockWeighter {
+    fn weight(&self, _: &CacheKey, block: &Item) -> u64 {
+        // NOTE: Truncation is fine: blocks are definitely below 4 GiB
+        #[allow(clippy::cast_possible_truncation)]
+        match block {
+            Either::Left(block) => block.size() as u64,
+            Either::Right(block) => block
+                .items
+                .iter()
+                .map(|x| x.end_key.len() + std::mem::size_of::<KeyedBlockHandle>())
+                .sum::<usize>() as u64,
+        }
+    }
+}
+
+pub struct BlockCache {
+    data: Cache<CacheKey, Item, BlockWeighter>,
+    capacity: u64,
+}
+
+impl BlockCache {
+    #[must_use]
+    pub fn with_capacity_bytes(bytes: u64) -> Self {
+        Self {
+            data: Cache::with_weighter(1_000_000, bytes, BlockWeighter),
+            capacity: bytes,
+        }
+    }
+
+    /// Returns the number of cached bytes.
+    #[must_use]
+    pub fn size(&self) -> u64 {
+        self.data.weight()
+    }
+
+    /// Returns the cache capacity in bytes.
+    #[must_use]
+    pub fn capacity(&self) -> u64 {
+        self.capacity
+    }
+
+    /// Returns the number of cached blocks.
+    #[must_use]
+    pub fn len(&self) -> usize {
+        self.data.len()
+    }
+
+    /// Returns `true` if there are no cached blocks.
+    #[must_use]
+    pub fn is_empty(&self) -> bool {
+        self.data.is_empty()
+    }
+
+    #[doc(hidden)]
+    pub fn insert_disk_block(&self, sstable_id: GlobalSegmentId, offset: u64, value: Arc<ValueOffsetBlock>) {
+        if self.capacity > 0 {
+            self.data.insert((sstable_id, offset).into(), Left(value));
+        }
+    }
+
+    #[doc(hidden)]
+    pub fn insert_index_block(&self, sstable_id: GlobalSegmentId, offset: u64, value: Arc<IndexBlock>) {
+        if self.capacity > 0 {
+            self.data.insert((sstable_id, offset).into(), Right(value));
+        }
+    }
+
+    #[doc(hidden)]
+    #[must_use]
+    pub fn get_disk_block(&self, sstable_id: GlobalSegmentId, offset: u64) -> Option<Arc<ValueOffsetBlock>> {
+        let key = (sstable_id, offset);
+        let item = self.data.get(&key)?;
+        Some(item.left())
+    }
+
+    #[doc(hidden)]
+    #[must_use]
+    pub fn get_index_block(&self, sstable_id: GlobalSegmentId, offset: u64) -> Option<Arc<IndexBlock>> {
+        let key = (sstable_id, offset);
+        let item = self.data.get(&key)?;
+        Some(item.right())
+    }
+}
diff --git a/v2/src/descriptor_table/lru.rs b/v2/src/descriptor_table/lru.rs
new file mode 100644
index 0000000..d55f564
--- /dev/null
+++ b/v2/src/descriptor_table/lru.rs
@@ -0,0 +1,31 @@
+use std::collections::VecDeque;
+
+#[derive(Default)]
+#[allow(clippy::module_name_repetitions)]
+pub struct LruList<T: Clone + PartialEq>(VecDeque<T>);
+
+impl<T: Clone + PartialEq> LruList<T> {
+    #[must_use]
+    pub fn with_capacity(n: usize) -> Self {
+        Self(VecDeque::with_capacity(n))
+    }
+
+    pub fn remove_by(&mut self, f: impl FnMut(&T) -> bool) {
+        self.0.retain(f);
+    }
+
+    pub fn remove(&mut self, item: &T) {
+        self.remove_by(|x| x != item);
+    }
+
+    pub fn refresh(&mut self, item: T) {
+        self.remove(&item);
+        self.0.push_back(item);
+    }
+
+    pub fn get_least_recently_used(&mut self) -> Option<T> {
+        let front = self.0.pop_front()?;
+        self.0.push_back(front.clone());
+        Some(front)
+    }
+}
diff --git a/v2/src/descriptor_table/mod.rs b/v2/src/descriptor_table/mod.rs
new file mode 100644
index 0000000..cacd30a
--- /dev/null
+++ b/v2/src/descriptor_table/mod.rs
@@ -0,0 +1,251 @@
+use crate::sst::id::GlobalSegmentId;
+use self::lru::LruList;
+use std::{
+    collections::HashMap,
+    fs::File,
+    io::BufReader,
+    path::PathBuf,
+    sync::{
+        atomic::{AtomicBool, AtomicUsize},
+        Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
+    },
+};
+
+mod lru;
+
+pub struct FileGuard(Arc<FileDescriptorWrapper>);
+
+impl std::ops::Deref for FileGuard {
+    type Target = Arc<FileDescriptorWrapper>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl Drop for FileGuard {
+    fn drop(&mut self) {
+        self.0
+            .is_used
+            .store(false, std::sync::atomic::Ordering::Release);
+    }
+}
+
+pub struct FileDescriptorWrapper {
+    pub file: Mutex<BufReader<File>>,
+    is_used: AtomicBool,
+}
+
+pub struct FileHandle {
+    descriptors: RwLock<Vec<Arc<FileDescriptorWrapper>>>,
+    path: PathBuf,
+}
+
+pub struct FileDescriptorInner {
+    table: HashMap<GlobalSegmentId, FileHandle>,
+    lru: Mutex<LruList<GlobalSegmentId>>,
+    size: AtomicUsize,
+}
+
+pub struct FileDescriptorTable {
+    inner: RwLock<FileDescriptorInner>,
+    concurrency: usize,
+    limit: usize,
+}
+
+impl FileDescriptorTable {
+    /// Closes all file descriptors
+    pub fn clear(&self) {
+        let mut lock = self.inner.write().expect("lock is poisoned");
+        lock.table.clear();
+    }
+
+    #[must_use]
+    pub fn new(limit: usize, concurrency: usize) -> Self {
+        Self {
+            inner: RwLock::new(FileDescriptorInner {
+                table: HashMap::with_capacity(100),
+                lru: Mutex::new(LruList::with_capacity(100)),
+                size: AtomicUsize::default(),
+            }),
+            concurrency,
+            limit,
+        }
+    }
+
+    /// Number of segments
+    pub fn len(&self) -> usize {
+        self.inner.read().expect("lock is poisoned").table.len()
+    }
+
+    pub fn size(&self) -> usize {
+        self.inner
+            .read()
+            .expect("lock is poisoned")
+            .size
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    pub fn access(&self, id: &GlobalSegmentId) -> crate::Result<Option<FileGuard>> {
+        let lock = self.inner.read().expect("lock is poisoned");
+
+        let Some(item) = lock.table.get(id) else {
+            return Ok(None);
+        };
+
+        let fd_vec = item.descriptors.read().expect("lock is poisoned");
+
+        if fd_vec.is_empty() {
+            drop(fd_vec);
+            drop(lock);
+
+            let lock = self.inner.write().expect("lock is poisoned");
+            let mut lru = lock.lru.lock().expect("lock is poisoned");
+            lru.refresh(*id);
+
+            let fd = {
+                let item = lock.table.get(id).expect("should exist");
+                let mut fd_lock = item.descriptors.write().expect("lock is poisoned");
+
+                for _ in 0..(self.concurrency - 1) {
+                    let fd = Arc::new(FileDescriptorWrapper {
+                        file: Mutex::new(BufReader::new(File::open(&item.path)?)),
+                        is_used: AtomicBool::default(),
+                    });
+                    fd_lock.push(fd.clone());
+                }
+
+                let fd = Arc::new(FileDescriptorWrapper {
+                    file: Mutex::new(BufReader::new(File::open(&item.path)?)),
+                    is_used: AtomicBool::default(),
+                });
+                fd_lock.push(fd.clone());
+
+                fd
+            };
+
+            let mut size_now = lock
+                .size
+                .fetch_add(self.concurrency, std::sync::atomic::Ordering::AcqRel)
+                + self.concurrency;
+
+            while size_now > self.limit {
+                if let Some(oldest) = lru.get_least_recently_used() {
+                    if &oldest != id {
+                        if let Some(item) = lock.table.get(&oldest) {
+                            let mut oldest_lock =
+                                item.descriptors.write().expect("lock is poisoned");
+
+                            lock.size
+                                .fetch_sub(oldest_lock.len(), std::sync::atomic::Ordering::Release);
+                            size_now -= oldest_lock.len();
+
+                            oldest_lock.clear();
+                        }
+                    }
+                } else {
+                    break;
+                }
+            }
+
+            Ok(Some(FileGuard(fd)))
+        } else {
+            loop {
+                for shard in &*fd_vec {
+                    if shard.is_used.compare_exchange(
+                        false,
+                        true,
+                        std::sync::atomic::Ordering::SeqCst,
+                        std::sync::atomic::Ordering::SeqCst,
+                    ) == Ok(false)
+                    {
+                        return Ok(Some(FileGuard(shard.clone())));
+                    }
+                }
+            }
+        }
+    }
+
+    fn inner_insert(
+        mut lock: RwLockWriteGuard<'_, FileDescriptorInner>,
+        path: PathBuf,
+        id: GlobalSegmentId,
+    ) {
+        lock.table.insert(
+            id,
+            FileHandle {
+                descriptors: RwLock::new(vec![]),
+                path,
+            },
+        );
+
+        lock.lru.lock().expect("lock is poisoned").refresh(id);
+    }
+
+    pub fn insert<P: Into<PathBuf>>(&self, path: P, id: GlobalSegmentId) {
+        let lock = self.inner.write().expect("lock is poisoned");
+        Self::inner_insert(lock, path.into(), id);
+    }
+
+    pub fn remove(&self, id: GlobalSegmentId) {
+        let mut lock = self.inner.write().expect("lock is poisoned");
+
+        if let Some(item) = lock.table.remove(&id) {
+            lock.size.fetch_sub(
+                item.descriptors.read().expect("lock is poisoned").len(),
+                std::sync::atomic::Ordering::Release,
+            );
+        }
+
+        lock.lru.lock().expect("lock is poisoned").remove(&id);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use test_log::test;
+
+    #[test]
+    fn descriptor_table_limit() -> crate::Result<()> {
+        let folder = tempfile::tempdir()?;
+        let path = folder.path();
+
+        File::create(path.join("1"))?;
+        File::create(path.join("2"))?;
+        File::create(path.join("3"))?;
+
+        let table = FileDescriptorTable::new(2, 1);
+
+        assert_eq!(0, table.size());
+
+        table.insert(path.join("1"), (0, 1).into());
+        assert_eq!(0, table.size());
+
+        {
+            let _ = table.access(&(0, 1).into());
+            assert_eq!(1, table.size());
+        }
+
+        table.insert(path.join("2"), (0, 2).into());
+
+        {
+            assert_eq!(1, table.size());
+            let _ = table.access(&(0, 1).into());
+        }
+
+        {
+            let _ = table.access(&(0, 2).into());
+            assert_eq!(2, table.size());
+        }
+
+        table.insert(path.join("3"), (0, 3).into());
+        assert_eq!(2, table.size());
+
+        {
+            let _ = table.access(&(0, 3).into());
+            assert_eq!(2, table.size());
+        }
+
+        table.remove((0, 3).into());
+        assert_eq!(1, table.size());
+
+        table.remove((0, 2).into());
+        assert_eq!(0, table.size());
+
+        let _ = table.access(&(0, 1).into());
+        assert_eq!(1, table.size());
+
+        Ok(())
+    }
+}
diff --git a/v2/src/lib.rs b/v2/src/lib.rs
index d0dd04b..3018bf1 100644
--- a/v2/src/lib.rs
+++ b/v2/src/lib.rs
@@ -11,9 +11,12 @@ mod file;
 mod either;
 mod sst;
 mod compression;
+mod tree;
+mod block_cache;
+mod descriptor_table;
 
 pub use {
     error::{Error, Result},
     serde::{DeserializeError, SerializeError},
-    lsm_entry::{SeqNo, UserKey, UserValue, LSMEntry, ValueType},
+    lsm_entry::{SeqNo, UserKey, UserValue, LSMEntry, ValueType}
 };
diff --git a/v2/src/sst/id.rs b/v2/src/sst/id.rs
new file mode 100644
index 0000000..8b671c3
--- /dev/null
+++ b/v2/src/sst/id.rs
@@ -0,0 +1,25 @@
+use crate::tree::inner::TreeId;
+
+use super::meta::SegmentId;
+
+#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
+#[allow(clippy::module_name_repetitions)]
+pub struct GlobalSegmentId((TreeId, SegmentId));
+
+impl GlobalSegmentId {
+    #[must_use]
+    pub fn tree_id(&self) -> TreeId {
+        self.0 .0
+    }
+
+    #[must_use]
+    pub fn segment_id(&self) -> SegmentId {
+        self.0 .1
+    }
+}
+
+impl From<(TreeId, SegmentId)> for GlobalSegmentId {
+    fn from(value: (TreeId, SegmentId)) -> Self {
+        Self(value)
+    }
+}
diff --git a/v2/src/sst/meta/mod.rs b/v2/src/sst/meta/mod.rs
new file mode 100644
index 0000000..2f8ae8d
--- /dev/null
+++ b/v2/src/sst/meta/mod.rs
@@ -0,0 +1 @@
+pub type SegmentId = u64;
\ No newline at end of file
diff --git a/v2/src/sst/mod.rs b/v2/src/sst/mod.rs
index eb8f22d..4f86a13 100644
--- a/v2/src/sst/mod.rs
+++ b/v2/src/sst/mod.rs
@@ -1,4 +1,6 @@
 pub mod block;
-mod value_offset_block;
-mod block_index;
+pub mod value_offset_block;
+pub mod block_index;
+mod meta;
+pub mod id;
 
diff --git a/v2/src/sst/value_offset_block.rs b/v2/src/sst/value_offset_block.rs
index 7ef9435..791eb6b 100644
--- a/v2/src/sst/value_offset_block.rs
+++ b/v2/src/sst/value_offset_block.rs
@@ -23,3 +23,12 @@ pub enum CachePolicy{
 #[allow(clippy::module_name_repetitions)]
 pub type ValueOffsetBlock = Block<LSMEntry>;
 
+impl ValueOffsetBlock {
+    #[must_use]
+    pub fn size(&self) -> usize {
+        std::mem::size_of::<Self>() + self.items.iter().map(LSMEntry::size).sum::<usize>()
+    }
+}
+
+
+
diff --git a/v2/src/tree/inner.rs b/v2/src/tree/inner.rs
new file mode 100644
index 0000000..f6db3be
--- /dev/null
+++ b/v2/src/tree/inner.rs
@@ -0,0 +1,4 @@
+/// Unique tree ID
+///
+/// Tree IDs are monotonically increasing integers.
+pub type TreeId = u64;
diff --git a/v2/src/tree/mod.rs b/v2/src/tree/mod.rs
new file mode 100644
index 0000000..788982c
--- /dev/null
+++ b/v2/src/tree/mod.rs
@@ -0,0 +1 @@
+pub mod inner;
\ No newline at end of file
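
Usage sketch (not part of the diff): how the new `BlockCache` and `FileDescriptorTable` might be wired together on a crate-internal segment read path. The `load_data_block` function and the injected `decode` closure are hypothetical illustrations; everything else uses the APIs introduced above, with cached bytes bounded by the byte-based weighter and file descriptors recycled through the internal LRU.

```rust
use std::{fs::File, io::BufReader, sync::Arc};

use crate::{
    block_cache::BlockCache,
    descriptor_table::FileDescriptorTable,
    sst::{id::GlobalSegmentId, value_offset_block::ValueOffsetBlock},
};

/// Hypothetical read path: try the block cache first, otherwise borrow a
/// pooled descriptor, decode the block, and populate the cache.
fn load_data_block<F>(
    descriptor_table: &FileDescriptorTable,
    block_cache: &BlockCache,
    id: GlobalSegmentId,
    offset: u64,
    decode: F,
) -> crate::Result<Option<Arc<ValueOffsetBlock>>>
where
    F: FnOnce(&mut BufReader<File>) -> crate::Result<ValueOffsetBlock>,
{
    // Cache hit: the weighter keeps total cached bytes under the configured capacity.
    if let Some(block) = block_cache.get_disk_block(id, offset) {
        return Ok(Some(block));
    }

    // Cache miss: check out a file descriptor. `access` lazily opens up to
    // `concurrency` descriptors per segment and evicts the least recently
    // used segment's descriptors once the global `limit` is exceeded.
    let Some(fd) = descriptor_table.access(&id)? else {
        return Ok(None);
    };

    // `decode` stands in for the actual block read at `offset`.
    let mut reader = fd.file.lock().expect("lock is poisoned");
    let block = Arc::new(decode(&mut *reader)?);
    drop(reader);
    // Dropping `fd` (a FileGuard) marks the descriptor as free again.

    block_cache.insert_disk_block(id, offset, block.clone());
    Ok(Some(block))
}
```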