From af757261ee0283fc9fef9feeac39691eaf47a378 Mon Sep 17 00:00:00 2001
From: Adewumi Sunkanmi
Date: Thu, 14 Mar 2024 21:52:28 +0100
Subject: [PATCH] Added biggest key index

---
 .vscode/settings.json                |   1 +
 src/block/block.rs                   |   6 +-
 src/bloom_filter/bf.rs               |  18 ++++-
 src/compaction/bucket_coordinator.rs |   1 +
 src/compaction/compactor.rs          |  42 ++++++++---
 src/consts/mod.rs                    |   2 +-
 src/err/mod.rs                       |  12 +++-
 src/key_offseter/key_offseter.rs     |  33 +++++++++
 src/key_offseter/mod.rs              |   2 +
 src/lib.rs                           |   3 +-
 src/memtable/inmemory.rs             |  33 ++++++---
 src/sstable/sst.rs                   |  12 +++-
 src/storage_engine/storage.rs        | 102 ++++++++++++++++++---------
 13 files changed, 208 insertions(+), 59 deletions(-)
 create mode 100644 src/key_offseter/key_offseter.rs
 create mode 100644 src/key_offseter/mod.rs

diff --git a/.vscode/settings.json b/.vscode/settings.json
index 12faa86..6a85b24 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -4,6 +4,7 @@
         "./Cargo.toml",
         "./Cargo.toml",
         "./Cargo.toml",
+        "./Cargo.toml",
         "./Cargo.toml"
     ]
 }
\ No newline at end of file
diff --git a/src/block/block.rs b/src/block/block.rs
index c88a7d0..dad9745 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -50,7 +50,7 @@
 use tokio::{
     fs::File,
-    io::{self, AsyncSeekExt, AsyncWriteExt},
+    io::{self, AsyncWriteExt},
 };

 use err::StorageEngineError::*;
@@ -59,7 +59,7 @@ use crate::{
     consts::{SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8},
     err::{self, StorageEngineError},
 };
-const BLOCK_SIZE: usize = 8 * 1024; // 4KB
+const BLOCK_SIZE: usize = 64 * 1024; // 64KB

 #[derive(Debug, Clone)]
 pub struct Block {
@@ -129,7 +129,7 @@ impl Block {
     }

     pub async fn write_to_file(&self, file: &mut File) -> Result<(), StorageEngineError> {
-        for entry in &self.data {
+        for entry in &self.data {
             let entry_len = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
             let mut entry_vec = Vec::with_capacity(entry_len);
diff --git a/src/bloom_filter/bf.rs b/src/bloom_filter/bf.rs
index 33b11f7..69682bd 100644
--- a/src/bloom_filter/bf.rs
+++ b/src/bloom_filter/bf.rs
@@ -2,6 +2,7 @@ use bit_vec::BitVec;
 use std::{
     collections::hash_map::DefaultHasher,
     hash::{Hash, Hasher},
+    path::{self, PathBuf},
     sync::{
         atomic::{AtomicU32, Ordering},
         Arc, Mutex,
@@ -67,8 +68,23 @@ impl BloomFilter {
         self.sstable_path = Some(sstable_path);
     }

+    pub fn filter_by_sstable_paths<'a>(
+        bloom_filters: &'a Vec<BloomFilter>,
+        paths: Vec<&'a PathBuf>,
+    ) -> Vec<&'a BloomFilter> {
+        let mut filtered_bfs = Vec::new();
+        paths.into_iter().for_each(|p| {
+            bloom_filters.iter().for_each(|b| {
+                if b.get_sstable_path().data_file_path.as_path() == p.as_path() {
+                    filtered_bfs.push(b)
+                }
+            })
+        });
+        filtered_bfs
+    }
+
     pub fn get_sstable_paths_that_contains_key<T: Hash>(
-        bloom_filters: &Vec<BloomFilter>,
+        bloom_filters: Vec<&BloomFilter>,
         key: &T,
     ) -> Option<Vec<SSTablePath>> {
         let mut sstables: Vec<SSTablePath> = Vec::new();
diff --git a/src/compaction/bucket_coordinator.rs b/src/compaction/bucket_coordinator.rs
index 7bf1fa6..193c83a 100644
--- a/src/compaction/bucket_coordinator.rs
+++ b/src/compaction/bucket_coordinator.rs
@@ -28,6 +28,7 @@ use StorageEngineError::*;
 pub trait IndexWithSizeInBytes {
     fn get_index(&self) -> Arc<SkipMap<Vec<u8>, (usize, u64, bool)>>; // usize for value offset, u64 to store entry creation date in milliseconds
     fn size(&self) -> usize;
+    fn find_biggest_key_from_table(&self) -> Result<Vec<u8>, StorageEngineError>;
 }

 impl Bucket {
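Note: the two bloom-filter helpers above are meant to be chained with the new biggest-key index on the read path (see the `get` changes in src/storage_engine/storage.rs further down). A minimal sketch of that composition, assuming the types introduced in this patch; `lookup_candidates` itself is a hypothetical helper added only for illustration, not part of the change:

    use crate::{bloom_filter::BloomFilter, key_offseter::TableBiggestKeys, sstable::SSTablePath};

    // Hypothetical illustration only: narrow candidate SSTables by biggest key first,
    // then consult only the bloom filters that back the surviving paths.
    fn lookup_candidates<'a>(
        biggest_key_index: &'a TableBiggestKeys,
        bloom_filters: &'a Vec<BloomFilter>,
        key: &Vec<u8>,
    ) -> Option<Vec<SSTablePath>> {
        // SSTables whose biggest key is smaller than the search key cannot contain it.
        let candidate_paths = biggest_key_index.filter_sstables_by_biggest_key(key);
        // Keep only the bloom filters that belong to those SSTable files.
        let candidate_bfs = BloomFilter::filter_by_sstable_paths(bloom_filters, candidate_paths);
        // Finally ask the remaining bloom filters which tables may contain the key.
        BloomFilter::get_sstable_paths_that_contains_key(candidate_bfs, key)
    }
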
diff --git a/src/compaction/compactor.rs b/src/compaction/compactor.rs
index fdbc115..d12700d 100644
--- a/src/compaction/compactor.rs
+++ b/src/compaction/compactor.rs
@@ -10,6 +10,7 @@ use crate::{
     bloom_filter::BloomFilter,
     consts::TOMB_STONE_TTL,
     err::StorageEngineError,
+    key_offseter::TableBiggestKeys,
     memtable::Entry,
     sstable::{SSTable, SSTablePath},
 };
@@ -53,6 +54,7 @@ impl Compactor {
         &mut self,
         buckets: &mut BucketMap,
         bloom_filters: &mut Vec<BloomFilter>,
+        biggest_key_index: &mut TableBiggestKeys,
     ) -> Result<bool, StorageEngineError> {
         let mut number_of_compactions = 0;
         // The compaction loop will keep running until there
@@ -92,10 +94,16 @@ impl Compactor {
                         sst_file_path.data_file_path, sst_file_path.index_file_path
                     );
                     // Step 4: Map this bloom filter to its sstable file path
+                    let sstable_data_file_path = sst_file_path.get_data_file_path();
                     m.bloom_filter.set_sstable_path(sst_file_path);
                     // Step 5: Store the bloom filter in the bloom filters vector
                     bloom_filters.push(m.bloom_filter);
+                    let biggest_key = m.sstable.find_biggest_key()?;
+                    if biggest_key.is_empty() {
+                        return Err(BiggestKeyIndexError);
+                    }
+                    biggest_key_index.set(sstable_data_file_path, biggest_key);
                     actual_number_of_sstables_written_to_disk += 1;
                 }
                 Err(err) => {
@@ -103,15 +111,26 @@ impl Compactor {
                     // Ensure that bloom filter is restored to the previous state by removing entries added so far in
                     // the compaction process and also remove merged sstables written to disk so far to prevent unstable state
-                    while let Some(bf) = bloom_filters.pop() {
-                        match fs::remove_dir_all(bf.get_sstable_path().dir.clone())
-                            .await
-                        {
-                            Ok(()) => info!("Stale SSTable File successfully deleted."),
-                            Err(e) => error!("Stale SSTable File not deleted. {}", e),
+                    while actual_number_of_sstables_written_to_disk > 0 {
+                        if let Some(bf) = bloom_filters.pop() {
+                            match fs::remove_dir_all(bf.get_sstable_path().dir.clone())
+                                .await
+                            {
+                                Ok(()) => {
+                                    biggest_key_index.remove(
+                                        bf.get_sstable_path().data_file_path.clone(),
+                                    );
+                                    info!("Stale SSTable File successfully deleted.")
+                                }
+                                Err(e) => {
+                                    error!("Stale SSTable File not deleted. {}", e)
{}", e) + } + } } + actual_number_of_sstables_written_to_disk -= 1; } + error!("merged SSTable was not written to disk {}", err); return Err(CompactionFailed(err.to_string())); } @@ -130,6 +149,7 @@ impl Compactor { buckets, &sstables_files_to_remove, bloom_filters, + biggest_key_index, ); match bloom_filter_updated_opt.await { Some(is_bloom_filter_updated) => { @@ -159,9 +179,15 @@ impl Compactor { buckets: &mut BucketMap, sstables_to_delete: &Vec<(Uuid, Vec)>, bloom_filters_with_both_old_and_new_sstables: &mut Vec, + biggest_key_index: &mut TableBiggestKeys, ) -> Option { + // Remove obsolete keys from biggest keys index + sstables_to_delete.iter().for_each(|(_, sstables)| { + sstables.iter().for_each(|s| { + biggest_key_index.remove(s.get_data_file_path()); + }) + }); let all_sstables_deleted = buckets.delete_sstables(sstables_to_delete).await; - // if all sstables were not deleted then don't remove the associated bloom filters // although this can lead to redundancy bloom filters are in-memory and its also less costly // since keys are represented in bits @@ -217,7 +243,7 @@ impl Compactor { ) .await .map_err(|err| CompactionFailed(err.to_string()))?; - + match sst_opt { Some(sst) => { merged_sstable = self diff --git a/src/consts/mod.rs b/src/consts/mod.rs index 5efbfca..1bd04a2 100644 --- a/src/consts/mod.rs +++ b/src/consts/mod.rs @@ -2,7 +2,7 @@ use crate::storage_engine::SizeUnit; pub const GC_THREAD_COUNT: u32 = 5; -pub const DEFAULT_MEMTABLE_CAPACITY: usize = SizeUnit::Kilobytes.to_bytes(30); +pub const DEFAULT_MEMTABLE_CAPACITY: usize = SizeUnit::Megabytes.to_bytes(1); pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 1e-200; diff --git a/src/err/mod.rs b/src/err/mod.rs index 71de370..f1347e9 100644 --- a/src/err/mod.rs +++ b/src/err/mod.rs @@ -149,11 +149,17 @@ pub enum StorageEngineError { #[error("Index file write error")] IndexFileWriteError(#[source] io::Error), - /// Error while reading from index file - #[error("Index file read error")] - IndexFileReadError(#[source] io::Error), + /// Error while reading from index file + #[error("Index file read error")] + IndexFileReadError(#[source] io::Error), /// Error while flushing write to disk for index file #[error("Index file flush error")] IndexFileFlushError(#[source] io::Error), + + #[error("Error finding biggest key in memtable (None was returned)")] + BiggestKeyIndexError, + + #[error("All bloom filters return false for all sstables")] + KeyNotFoundByAnyBloomFilterError, } diff --git a/src/key_offseter/key_offseter.rs b/src/key_offseter/key_offseter.rs new file mode 100644 index 0000000..268cc5e --- /dev/null +++ b/src/key_offseter/key_offseter.rs @@ -0,0 +1,33 @@ +use std::{cmp::Ordering, collections::HashMap, path::PathBuf}; + +#[derive(Clone, Debug)] +pub struct TableBiggestKeys { + pub sstables: HashMap>, +} +impl TableBiggestKeys { + pub fn new() -> Self { + Self { + sstables: HashMap::new(), + } + } + + pub fn set(&mut self, sst_path: PathBuf, biggest_key: Vec) -> bool { + self.sstables.insert(sst_path, biggest_key).is_some() + } + + pub fn remove(&mut self, sst_path: PathBuf) -> bool { + self.sstables.remove(&sst_path).is_some() + } + + // Returns SSTables whose last key is greater than the supplied key parameter + pub fn filter_sstables_by_biggest_key(&self, key: &Vec) -> Vec<&PathBuf> { + self.sstables + .iter() + .filter(|(_, key_prefix)| { + key_prefix.as_slice().cmp(key) == Ordering::Greater + || key_prefix.as_slice().cmp(key) == Ordering::Equal + }) + .map(|(path, _)| return path) + .collect() + } +} diff --git 
diff --git a/src/key_offseter/mod.rs b/src/key_offseter/mod.rs
new file mode 100644
index 0000000..7a79c9c
--- /dev/null
+++ b/src/key_offseter/mod.rs
@@ -0,0 +1,2 @@
+mod key_offseter;
+pub use key_offseter::TableBiggestKeys;
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index af5ef7b..416aa44 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,4 +10,5 @@ pub mod storage_engine;
 pub mod value_log;
 pub mod block;
 pub mod garbage_collector;
-pub mod sparse_index;
\ No newline at end of file
+pub mod sparse_index;
+pub mod key_offseter;
\ No newline at end of file
diff --git a/src/memtable/inmemory.rs b/src/memtable/inmemory.rs
index 3c5b08a..9b3d91f 100644
--- a/src/memtable/inmemory.rs
+++ b/src/memtable/inmemory.rs
@@ -1,13 +1,15 @@
 use crate::bloom_filter::BloomFilter;
 use crate::compaction::IndexWithSizeInBytes;
-use crate::consts::{DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MEMTABLE_CAPACITY};
+use crate::consts::{
+    DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MEMTABLE_CAPACITY, SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8,
+};
 use crate::err::StorageEngineError;
 //use crate::memtable::val_option::ValueOption;
 use crate::storage_engine::SizeUnit;
 use chrono::{DateTime, Utc};
 use crossbeam_skiplist::SkipMap;
-
 use std::cmp;
+use StorageEngineError::*;

 use std::{hash::Hash, sync::Arc};

@@ -36,6 +38,9 @@ impl IndexWithSizeInBytes for InMemoryTable<Vec<u8>> {
     fn size(&self) -> usize {
         self.size
     }
+    fn find_biggest_key_from_table(&self) -> Result<Vec<u8>, StorageEngineError> {
+        self.find_biggest_key()
+    }
 }

 impl Entry<Vec<u8>, usize> {
@@ -48,7 +53,6 @@ impl Entry<Vec<u8>, usize> {
         }
     }

-
     pub fn has_expired(&self, ttl: u64) -> bool {
         // Current time
         let current_time = Utc::now();
@@ -104,7 +108,7 @@ impl InMemoryTable<Vec<u8>> {
             // key length + value offset length + date created length
             // it takes 4 bytes to store a 32 bit integer ande 1 byte for tombstone checker
-            let entry_length_byte = entry.key.len() + 4 + 8 + 1;
+            let entry_length_byte = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
             self.size += entry_length_byte;
             return Ok(());
         }
@@ -115,7 +119,7 @@ impl InMemoryTable<Vec<u8>> {
         );
         // key length + value offset length + date created length
         // it takes 4 bytes to store a 32 bit integer since 8 bits makes 1 byte
-        let entry_length_byte = entry.key.len() + 4 + 8 + 1;
+        let entry_length_byte = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
         self.size += entry_length_byte;
         Ok(())
     }
@@ -131,7 +135,7 @@ impl InMemoryTable<Vec<u8>> {
     pub fn update(&mut self, entry: &Entry<Vec<u8>, usize>) -> Result<(), StorageEngineError> {
         if !self.bloom_filter.contains(&entry.key) {
-            return Err(StorageEngineError::KeyNotFoundInMemTable);
+            return Err(KeyNotFoundInMemTable);
         }
         // If the key already exist in the bloom filter then just insert into the entry alone
         self.index.insert(
@@ -147,16 +151,29 @@ impl InMemoryTable<Vec<u8>> {
     pub fn delete(&mut self, entry: &Entry<Vec<u8>, usize>) -> Result<(), StorageEngineError> {
         if !self.bloom_filter.contains(&entry.key) {
-            return Err(StorageEngineError::KeyNotFoundInMemTable);
+            return Err(KeyNotFoundInMemTable);
         }
         let created_at = Utc::now();
         // Insert thumb stone to indicate deletion
         self.index.insert(
             entry.key.to_vec(),
-            (entry.val_offset, created_at.timestamp_millis() as u64, entry.is_tombstone),
+            (
+                entry.val_offset,
+                created_at.timestamp_millis() as u64,
+                entry.is_tombstone,
+            ),
         );
         Ok(())
     }
+
+    // Find the biggest element in the skip list
+    pub fn find_biggest_key(&self) -> Result<Vec<u8>, StorageEngineError> {
+        let largest_entry = self.index.iter().next_back();
+        match largest_entry {
+            Some(e) => return Ok(e.key().to_vec()),
+            None => Err(BiggestKeyIndexError),
+        }
+    }
     pub fn false_positive_rate(&mut self) -> f64 {
         self.false_positive_rate
     }
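The new `find_biggest_key` leans on the fact that the skip map keeps its entries ordered by key, so the last entry is the biggest one. A tiny standalone illustration of that behaviour (toy keys and a simplified value type, using the same `crossbeam_skiplist::SkipMap` and the `iter().next_back()` call shown above):

    use crossbeam_skiplist::SkipMap;

    fn biggest_key_demo() {
        // Keys are kept sorted, regardless of insertion order.
        let index: SkipMap<Vec<u8>, usize> = SkipMap::new();
        index.insert(b"mango".to_vec(), 1);
        index.insert(b"apple".to_vec(), 2);
        index.insert(b"zebra".to_vec(), 3);

        // The last entry therefore holds the lexicographically biggest key.
        let biggest = index.iter().next_back().map(|e| e.key().to_vec());
        assert_eq!(biggest, Some(b"zebra".to_vec()));
    }
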
diff --git a/src/sstable/sst.rs b/src/sstable/sst.rs
index ec07dbe..f7748e7 100644
--- a/src/sstable/sst.rs
+++ b/src/sstable/sst.rs
@@ -44,6 +44,9 @@ impl IndexWithSizeInBytes for SSTable {
     fn size(&self) -> usize {
         self.size
     }
+    fn find_biggest_key_from_table(&self) -> Result<Vec<u8>, StorageEngineError> {
+        self.find_biggest_key()
+    }
 }

 #[derive(Debug, Clone)]
@@ -134,7 +137,14 @@ impl SSTable {
             created_at: created_at.timestamp_millis() as u64,
         }
     }
-
+    // Find the biggest element in the skip list
+    pub fn find_biggest_key(&self) -> Result<Vec<u8>, StorageEngineError> {
+        let largest_entry = self.index.iter().next_back();
+        match largest_entry {
+            Some(e) => return Ok(e.key().to_vec()),
+            None => Err(BiggestKeyIndexError),
+        }
+    }
     pub(crate) async fn write_to_file(&self) -> Result<(), StorageEngineError> {
         // Open the file in write mode with the append flag.
         let data_file_path = &self.data_file_path;
diff --git a/src/storage_engine/storage.rs b/src/storage_engine/storage.rs
index 590a091..646c986 100644
--- a/src/storage_engine/storage.rs
+++ b/src/storage_engine/storage.rs
@@ -7,6 +7,7 @@ use crate::{
         META_DIRECTORY_NAME, TAIL_ENTRY_KEY, TOMB_STONE_MARKER, VALUE_LOG_DIRECTORY_NAME,
     },
     err::StorageEngineError,
+    key_offseter::TableBiggestKeys,
     memtable::{Entry, InMemoryTable},
     meta::Meta,
     sparse_index::SparseIndex,
@@ -15,13 +16,12 @@ use crate::{
 };
 use chrono::Utc;
+use crate::err::StorageEngineError::*;
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 use std::{collections::HashMap, fs, mem, path::PathBuf};
 use std::{hash::Hash, result};

-use crate::err::StorageEngineError::*;
-
 #[derive(Clone, Debug)]
 pub struct StorageEngine {
     pub dir: DirPath,
     pub bloom_filters: Vec<BloomFilter>,
     pub val_log: ValueLog,
     pub buckets: BucketMap,
-    pub key_index: LevelsBiggestKeys,
+    pub biggest_key_index: TableBiggestKeys,
     pub compactor: Compactor,
     pub meta: Meta,
     pub config: Config,
 }

-#[derive(Clone, Debug)]
-pub struct LevelsBiggestKeys {
-    levels: Vec<Vec<u8>>,
-}
-impl LevelsBiggestKeys {
-    pub fn new(levels: Vec<Vec<u8>>) -> Self {
-        Self { levels }
-    }
-}
-
 #[derive(Clone, Debug)]
 pub struct DirPath {
     root: PathBuf,
@@ -161,9 +151,20 @@ impl StorageEngine<Vec<u8>> {
                 return Err(KeyFoundAsTombstoneInMemtableError);
             }
         } else {
-            // Step 2: If key does not exist in MemTable then we can load sstables that probaby contains this key fr8om bloom filter
+            let filtered_paths = self.biggest_key_index.filter_sstables_by_biggest_key(&key);
+            if filtered_paths.is_empty() {
+                return Err(KeyNotFoundInAnySSTableError);
+            }
+            println!("BF LENGTH BEFORE {}", self.bloom_filters.len());
+            let filtered_bloom_filters =
+                BloomFilter::filter_by_sstable_paths(&self.bloom_filters, filtered_paths);
+            if filtered_bloom_filters.is_empty() {
+                return Err(KeyNotFoundByAnyBloomFilterError);
+            }
+            println!("BF LENGTH AFTER {}", filtered_bloom_filters.len());
+            // Step 2: If key does not exist in MemTable then we can load sstables that probably contain this key from bloom filter
             let sstable_paths =
-                BloomFilter::get_sstable_paths_that_contains_key(&self.bloom_filters, &key);
+                BloomFilter::get_sstable_paths_that_contains_key(filtered_bloom_filters, &key);
             match sstable_paths {
                 Some(paths) => {
                     // Step 3: Get the most recent value offset from sstables
@@ -324,6 +325,7 @@ impl StorageEngine<Vec<u8>> {
         //write the memtable to the disk as SS Tables
         // insert to bloom filter
         let mut bf = self.memtable.get_bloom_filter();
+        let data_file_path = sstable_path.get_data_file_path().clone();
         bf.set_sstable_path(sstable_path);
         self.bloom_filters.push(bf);
@@ -333,6 +335,8 @@ impl StorageEngine<Vec<u8>> {
                 .get_hotness()
                 .cmp(&a.get_sstable_path().get_hotness())
         });
+        let biggest_key = self.memtable.find_biggest_key()?;
+        self.biggest_key_index.set(data_file_path, biggest_key);
         Ok(())
     }
@@ -354,11 +358,10 @@ impl StorageEngine<Vec<u8>> {
         let vlog_path = &dir.clone().val_log;
         let buckets_path = dir.buckets.clone();
         let vlog_exit = vlog_path.exists();
-        let vlog = ValueLog::new(vlog_path);
         let vlog_empty =
             !vlog_exit || fs::metadata(vlog_path).map_err(GetFileMetaDataError)?.len() == 0;

-        let key_index = LevelsBiggestKeys::new(Vec::new());
+        let biggest_key_index = TableBiggestKeys::new();
         let mut vlog = ValueLog::new(vlog_path).await?;
         let meta = Meta::new(&dir.meta);
         if vlog_empty {
@@ -394,7 +397,7 @@ impl StorageEngine<Vec<u8>> {
                 bloom_filters: Vec::new(),
                 buckets: BucketMap::new(buckets_path.clone()),
                 dir,
-                key_index,
+                biggest_key_index,
                 compactor: Compactor::new(config.enable_ttl, config.entry_ttl_millis),
                 config: config.clone(),
                 meta,
@@ -556,7 +559,7 @@ impl StorageEngine<Vec<u8>> {
             dir,
             buckets: buckets_map,
            bloom_filters,
-            key_index,
+            biggest_key_index,
             meta,
             compactor: Compactor::new(config.enable_ttl, config.entry_ttl_millis),
             config: config.clone(),
@@ -607,7 +610,7 @@ impl StorageEngine<Vec<u8>> {
     async fn run_compaction(&mut self) -> Result<bool, StorageEngineError> {
         self.compactor
-            .run_compaction(&mut self.buckets, &mut self.bloom_filters)
+            .run_compaction(
+                &mut self.buckets,
+                &mut self.bloom_filters,
+                &mut self.biggest_key_index,
+            )
             .await
     }
@@ -661,17 +668,15 @@ impl SizeUnit {

 #[cfg(test)]
 mod tests {
-    use std::rc::Rc;
-    use std::sync::Arc;
-    use crate::err;
+    use std::sync::Arc;

     use super::*;
     use log::info;
-    use rand::random;
+    use tokio::fs;
     use tokio::sync::RwLock;
-
+
     // Generate test to find keys after compaction
     #[tokio::test]
     async fn storage_engine_create_asynchronous() {
         let path = PathBuf::new().join("bump1");
         let s_engine = StorageEngine::new(path.clone()).await.unwrap();

         // Specify the number of random strings to generate
-        let num_strings = 6000;
+        let num_strings = 40000;

         // Specify the length of each random string
         let string_length = 10;
@@ -743,7 +748,7 @@ mod tests {
             }
         }

-        // random_strings.sort();
+        random_strings.sort();
         println!("About to start reading");
         let tasks = random_strings.iter().map(|k| {
             let s_engine = Arc::clone(&sg);
@@ -761,7 +766,8 @@ mod tests {
                     Ok((value, _)) => {
                         assert_eq!(value, b"boy");
                     }
-                    Err(_) => {
+                    Err(err) => {
+                        println!("ERROR {:?}", err);
                         assert!(false, "No err should be found");
                     }
                 }
@@ -769,7 +775,7 @@ mod tests {
             }
         }

-        let _ = fs::remove_dir_all(path.clone()).await;
+        // let _ = fs::remove_dir_all(path.clone()).await;
         // sort to make fetch random
     }
@@ -837,7 +843,7 @@ mod tests {
         let s_engine = StorageEngine::new(path.clone()).await.unwrap();

         // Specify the number of random strings to generate
-        let num_strings = 6000;
+        let num_strings = 100000;

         // Specify the length of each random string
         let string_length = 10;
@@ -884,8 +890,11 @@ mod tests {
         random_strings.sort();
         let key = &random_strings[0];

-        let get_res = sg.read().await.get(key).await;
-        match get_res {
+        let get_res1 = sg.read().await.get(key).await;
+        let get_res2 = sg.read().await.get(key).await;
+        let get_res3 = sg.read().await.get(key).await;
+        let get_res4 = sg.read().await.get(key).await;
+        match get_res1 {
             Ok(v) => {
                 assert_eq!(v.0, b"boyode");
             }
             Err(_) => {
                 assert!(false, "No error should be found");
             }
         }

+        match get_res2 {
+            Ok(v) => {
+                assert_eq!(v.0, b"boyode");
+            }
+            Err(_) => {
+                assert!(false, "No error should be found");
+            }
+        }
+
+        match get_res3 {
+            Ok(v) => {
+                assert_eq!(v.0, b"boyode");
+            }
+            Err(_) => {
+                assert!(false, "No error should be found");
+            }
+        }
+        match get_res4 {
+            Ok(v) => {
+                assert_eq!(v.0, b"boyode");
+            }
+            Err(_) => {
+                assert!(false, "No error should be found");
+            }
+        }
+
         let del_res = sg.write().await.delete(key).await;
         match del_res {
             Ok(v) => {
@@ -923,7 +959,7 @@ mod tests {
         // We expect tombstone to be flushed to an sstable at this point
         let get_res2 = sg.read().await.get(key).await;
         match get_res2 {
-            Ok(v) => {
+            Ok(_) => {
                 assert!(false, "Should not be found after compaction")
             }
             Err(err) => {