diff --git a/examples/get.rs b/examples/get.rs index b061a3c..3c83581 100644 --- a/examples/get.rs +++ b/examples/get.rs @@ -30,10 +30,19 @@ async fn main() { let entry7 = store.get("***not_found_key**").await.unwrap(); assert_eq!(std::str::from_utf8(&entry1.unwrap().val).unwrap(), "tim cook"); - assert_eq!(std::str::from_utf8(&entry2.unwrap().val).unwrap(), "sundar pichai"); + assert_eq!( + std::str::from_utf8(&entry2.unwrap().val).unwrap(), + "sundar pichai" + ); assert_eq!(std::str::from_utf8(&entry3.unwrap().val).unwrap(), "jensen huang"); - assert_eq!(std::str::from_utf8(&entry4.unwrap().val).unwrap(), "satya nadella"); - assert_eq!(std::str::from_utf8(&entry5.unwrap().val).unwrap(), "mark zuckerberg"); + assert_eq!( + std::str::from_utf8(&entry4.unwrap().val).unwrap(), + "satya nadella" + ); + assert_eq!( + std::str::from_utf8(&entry5.unwrap().val).unwrap(), + "mark zuckerberg" + ); assert_eq!(std::str::from_utf8(&entry6.unwrap().val).unwrap(), "sam altman"); assert!(entry7.is_none()) } diff --git a/learnings.txt b/learnings.txt deleted file mode 100644 index 865f700..0000000 --- a/learnings.txt +++ /dev/null @@ -1,8 +0,0 @@ -- Researched Tokio Uring, 12 Aug 2024 -- Research on Tokio Uring continues 13 Aug 2024 -- Researched maths on relations 14 Aug 2024 -- Researched latest trends in Rust 15 Aug 2024 -- Experimenting with nom parser rust library 16 Aug 2024 -- Solving maths 19 Aug 2024 -- Researched Bigdata 20 Aug 2024 -- Researched Previous Projects on LSM \ No newline at end of file diff --git a/src/bucket/bucket_manager.rs b/src/bucket/bucket_manager.rs index 2fee107..28ad24a 100644 --- a/src/bucket/bucket_manager.rs +++ b/src/bucket/bucket_manager.rs @@ -1,4 +1,6 @@ -use crate::consts::{BUCKET_DIRECTORY_PREFIX, BUCKET_HIGH, BUCKET_LOW, MAX_TRESHOLD, MIN_SSTABLE_SIZE, MIN_TRESHOLD}; +use crate::consts::{ + BUCKET_DIRECTORY_PREFIX, BUCKET_HIGH, BUCKET_LOW, MAX_TRESHOLD, MIN_SSTABLE_SIZE, MIN_TRESHOLD, +}; use crate::err::Error; use crate::filter::BloomFilter; use crate::fs::{FileAsync, FileNode}; @@ -115,7 +117,10 @@ impl Bucket { .iter() .map(|s| tokio::spawn(fs::metadata(s.data_file.path.clone()))); for meta_task in fetch_files_meta { - let meta_data = meta_task.await.map_err(|err| GetFileMetaData(err.into()))?.unwrap(); + let meta_data = meta_task + .await + .map_err(|err| GetFileMetaData(err.into()))? + .unwrap(); size += meta_data.len() as usize; } Ok(size / ssts.len() as u64 as usize) diff --git a/src/cfg/config.rs b/src/cfg/config.rs index 352088c..87ad0cb 100644 --- a/src/cfg/config.rs +++ b/src/cfg/config.rs @@ -2,9 +2,9 @@ use crate::{ compactors, consts::{ DEFAULT_ALLOW_PREFETCH, DEFAULT_COMPACTION_FLUSH_LISTNER_INTERVAL, DEFAULT_COMPACTION_INTERVAL, - DEFAULT_ENABLE_TTL, DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MAX_WRITE_BUFFER_NUMBER, DEFAULT_ONLINE_GC_INTERVAL, - DEFAULT_PREFETCH_SIZE, DEFAULT_TOMBSTONE_COMPACTION_INTERVAL, DEFAULT_TOMBSTONE_TTL, ENTRY_TTL, GC_CHUNK_SIZE, - WRITE_BUFFER_SIZE, + DEFAULT_ENABLE_TTL, DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MAX_WRITE_BUFFER_NUMBER, + DEFAULT_ONLINE_GC_INTERVAL, DEFAULT_PREFETCH_SIZE, DEFAULT_TOMBSTONE_COMPACTION_INTERVAL, + DEFAULT_TOMBSTONE_TTL, ENTRY_TTL, GC_CHUNK_SIZE, WRITE_BUFFER_SIZE, }, }; use crate::{ @@ -74,7 +74,6 @@ fn get_open_file_limit() -> usize { return 150; } - impl Default for Config { fn default() -> Self { Config { @@ -126,7 +125,10 @@ impl DataStore<'static, Key> { /// Sets the write buffer size in kilobytes. /// The size must be at least 50 kilobytes. pub fn with_write_buffer_size(mut self, size: usize) -> Self { - assert!(size >= 50, "write_buffer_size should not be less than 50 Kilobytes"); + assert!( + size >= 50, + "write_buffer_size should not be less than 50 Kilobytes" + ); self.config.write_buffer_size = SizeUnit::Kilobytes.as_bytes(size); self } @@ -358,7 +360,9 @@ mod tests { #[tokio::test] async fn test_with_tombstone_ttl() { let ds = create_datastore(); - let ds = ds.await.with_tombstone_ttl(Duration::from_secs(15 * 24 * 60 * 60)); // 15 days + let ds = ds + .await + .with_tombstone_ttl(Duration::from_secs(15 * 24 * 60 * 60)); // 15 days assert_eq!(ds.config.tombstone_ttl, Duration::from_secs(15 * 24 * 60 * 60)); } @@ -375,11 +379,16 @@ mod tests { async fn test_with_compactor_flush_listener_interval() { let ds = create_datastore().await; let ds = ds.with_compactor_flush_listener_interval(Duration::from_secs(3 * 60)); // 3 minutes - assert_eq!(ds.config.compactor_flush_listener_interval, Duration::from_secs(3 * 60)); + assert_eq!( + ds.config.compactor_flush_listener_interval, + Duration::from_secs(3 * 60) + ); } #[tokio::test] - #[should_panic(expected = "background_compaction_interval should not be less than 5 minutes to prevent overloads")] + #[should_panic( + expected = "background_compaction_interval should not be less than 5 minutes to prevent overloads" + )] async fn test_with_background_compaction_interval_invalid() { let ds = create_datastore().await; ds.with_background_compaction_interval(Duration::from_secs(4 * 60)); // 4 minutes @@ -389,7 +398,10 @@ mod tests { async fn test_with_background_compaction_interval() { let ds = create_datastore().await; let ds = ds.with_background_compaction_interval(Duration::from_secs(6 * 60)); // 6 minutes - assert_eq!(ds.config.background_compaction_interval, Duration::from_secs(6 * 60)); + assert_eq!( + ds.config.background_compaction_interval, + Duration::from_secs(6 * 60) + ); } #[tokio::test] diff --git a/src/compactors/compact.rs b/src/compactors/compact.rs index 85cdf71..d9ae5a0 100644 --- a/src/compactors/compact.rs +++ b/src/compactors/compact.rs @@ -144,7 +144,10 @@ pub struct MergedSSTable { impl Clone for MergedSSTable { fn clone(&self) -> Self { Self { - sstable: Box::new(super::TableInsertor::from(self.sstable.get_entries(), &self.filter)), + sstable: Box::new(super::TableInsertor::from( + self.sstable.get_entries(), + &self.filter, + )), hotness: self.hotness, filter: self.filter.clone(), } @@ -249,7 +252,8 @@ impl Compactor { *state = CompState::Active; drop(state); if let Err(err) = - Compactor::handle_compaction(Arc::clone(&bucket_map), Arc::clone(&key_range), &cfg).await + Compactor::handle_compaction(Arc::clone(&bucket_map), Arc::clone(&key_range), &cfg) + .await { log::info!("{}", Error::CompactionFailed(Box::new(err))); continue; @@ -292,7 +296,8 @@ impl Compactor { ) -> Result<(), Error> { match cfg.strategy { Strategy::STCS => { - let mut runner = super::sized::SizedTierRunner::new(Arc::clone(&buckets), Arc::clone(&key_range), cfg); + let mut runner = + super::sized::SizedTierRunner::new(Arc::clone(&buckets), Arc::clone(&key_range), cfg); runner.run_compaction().await } // LCS, UCS and TWS will be added later } @@ -336,7 +341,10 @@ mod tests { assert_eq!(compactor.config.use_ttl, use_ttl); assert_eq!(compactor.config.entry_ttl, ttl.entry_ttl); assert_eq!(compactor.config.tombstone_ttl, ttl.tombstone_ttl); - assert_eq!(compactor.config.background_interval, intervals.background_interval); + assert_eq!( + compactor.config.background_interval, + intervals.background_interval + ); assert_eq!( compactor.config.flush_listener_interval, intervals.flush_listener_interval diff --git a/src/compactors/mod.rs b/src/compactors/mod.rs index c7cb515..16a46d4 100644 --- a/src/compactors/mod.rs +++ b/src/compactors/mod.rs @@ -5,10 +5,10 @@ mod sized; pub use compact::CompState; pub use compact::CompactionReason; pub use compact::Compactor; +pub use compact::Config; +pub use compact::IntervalParams; pub use compact::MergedSSTable; pub use compact::Strategy; -pub use compact::IntervalParams; pub use compact::TtlParams; -pub use sized::SizedTierRunner; -pub use compact::Config; pub use insertor::TableInsertor; +pub use sized::SizedTierRunner; diff --git a/src/compactors/sized.rs b/src/compactors/sized.rs index 0d92952..f83fbf1 100644 --- a/src/compactors/sized.rs +++ b/src/compactors/sized.rs @@ -38,7 +38,11 @@ pub struct SizedTierRunner<'a> { impl<'a> SizedTierRunner<'a> { /// creates new instance of `SizedTierRunner` - pub fn new(bucket_map: BucketMapHandle, key_range: KeyRangeHandle, config: &'a Config) -> SizedTierRunner<'a> { + pub fn new( + bucket_map: BucketMapHandle, + key_range: KeyRangeHandle, + config: &'a Config, + ) -> SizedTierRunner<'a> { Self { tombstones: HashMap::new(), bucket_map, @@ -111,7 +115,9 @@ impl<'a> SizedTierRunner<'a> { .await; match clean_up_successful { Ok(None) => { - return Err(Error::CompactionPartiallyFailed(Box::new(CompactionCleanupPartial))); + return Err(Error::CompactionPartiallyFailed(Box::new( + CompactionCleanupPartial, + ))); } Err(err) => { return Err(Error::CompactionCleanup(Box::new(err))); @@ -281,7 +287,11 @@ impl<'a> SizedTierRunner<'a> { /// and prevented from being inserted /// /// Returns true if entry should be inserted or false otherwise - pub(crate) fn tombstone_check(&mut self, entry: &Entry, merged_entries: &mut Vec>) { + pub(crate) fn tombstone_check( + &mut self, + entry: &Entry, + merged_entries: &mut Vec>, + ) { let mut should_insert = false; if self.tombstones.contains_key(&entry.key) { let tomb_insert_time = *self.tombstones.get(&entry.key).unwrap(); diff --git a/src/consts/mod.rs b/src/consts/mod.rs index 8e20e6b..6dba6b2 100644 --- a/src/consts/mod.rs +++ b/src/consts/mod.rs @@ -86,7 +86,7 @@ pub const DEFAULT_PREFETCH_SIZE: usize = 10; pub const EOF: &str = "EOF"; pub const HEAD_ENTRY_KEY: &[u8; 4] = b"head"; - + pub const HEAD_KEY_SIZE: usize = 4; // "head" pub const TAIL_ENTRY_KEY: &[u8; 4] = b"tail"; diff --git a/src/db/recovery.rs b/src/db/recovery.rs index 052e6d7..7bbb9d2 100644 --- a/src/db/recovery.rs +++ b/src/db/recovery.rs @@ -7,8 +7,8 @@ use crate::bucket::{Bucket, BucketID, BucketMap}; use crate::cfg::Config; use crate::compactors::{self, Compactor, IntervalParams, TtlParams}; use crate::consts::{ - DEFAULT_DB_NAME, DEFAULT_FLUSH_SIGNAL_CHANNEL_SIZE, HEAD_ENTRY_KEY, HEAD_ENTRY_VALUE, SIZE_OF_U32, SIZE_OF_U64, - SIZE_OF_U8, TAIL_ENTRY_KEY, TAIL_ENTRY_VALUE, + DEFAULT_DB_NAME, DEFAULT_FLUSH_SIGNAL_CHANNEL_SIZE, HEAD_ENTRY_KEY, HEAD_ENTRY_VALUE, SIZE_OF_U32, + SIZE_OF_U64, SIZE_OF_U8, TAIL_ENTRY_KEY, TAIL_ENTRY_VALUE, }; use crate::err::Error; use crate::err::Error::*; @@ -31,6 +31,17 @@ use std::sync::Arc; use tokio::fs::read_dir; use tokio::sync::RwLock; +/// Parameters to create an empty ['DataStore'] or recover exisiting one from ['ValueLog'] +pub struct CreateOrRecoverStoreParams<'a, P> { + pub dir: DirPath, + pub buckets_path: P, + pub vlog: ValueLog, + pub key_range: KeyRange, + pub config: &'a Config, + pub size_unit: SizeUnit, + pub meta: Meta, +} + impl DataStore<'static, Key> { /// Recovers [`DataStore`] state after crash /// @@ -38,14 +49,18 @@ impl DataStore<'static, Key> { /// /// Returns error incase there is an IO error pub async fn recover + Send + Sync + Clone>( - dir: DirPath, - buckets_path: P, - mut vlog: ValueLog, - key_range: KeyRange, - config: &Config, - size_unit: SizeUnit, - mut meta: Meta, + params: CreateOrRecoverStoreParams<'_, P>, ) -> Result, Error> { + let (buckets_path, dir, mut vlog, key_range, config, size_unit, mut meta) = ( + params.buckets_path, + params.dir, + params.vlog, + params.key_range, + params.config, + params.size_unit, + params.meta, + ); + let mut recovered_buckets: IndexMap = IndexMap::new(); // Get bucket diretories streams let mut buckets_stream = open_dir_stream!(buckets_path.as_ref().to_path_buf()); @@ -105,12 +120,18 @@ impl DataStore<'static, Key> { if let Some(b) = recovered_buckets.get(&bucket_uuid) { let temp_sstables = b.sstables.clone(); temp_sstables.write().await.push(table.clone()); - let updated_bucket = - Bucket::from(bucket_dir.path(), bucket_uuid, temp_sstables.read().await.clone(), 0).await?; + let updated_bucket = Bucket::from( + bucket_dir.path(), + bucket_uuid, + temp_sstables.read().await.clone(), + 0, + ) + .await?; recovered_buckets.insert(bucket_uuid, updated_bucket); } else { // Create new bucket - let updated_bucket = Bucket::from(bucket_dir.path(), bucket_uuid, vec![table.clone()], 0).await?; + let updated_bucket = + Bucket::from(bucket_dir.path(), bucket_uuid, vec![table.clone()], 0).await?; recovered_buckets.insert(bucket_uuid, updated_bucket); } @@ -230,7 +251,8 @@ impl DataStore<'static, Key> { head_offset: usize, ) -> Result<(MemTable, ImmutableMemTablesLockFree), Error> { let read_only_memtables: ImmutableMemTablesLockFree = SkipMap::new(); - let mut active_memtable = MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate); + let mut active_memtable = + MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate); let mut vlog = ValueLog::new(vlog_path.as_ref()).await?; let mut most_recent_offset = head_offset; let entries = vlog.recover(head_offset).await?; @@ -244,7 +266,10 @@ impl DataStore<'static, Key> { if active_memtable.is_full(e.key.len()) { // Make memtable read only active_memtable.read_only = true; - read_only_memtables.insert(MemTable::generate_table_id(), Arc::new(active_memtable.to_owned())); + read_only_memtables.insert( + MemTable::generate_table_id(), + Arc::new(active_memtable.to_owned()), + ); active_memtable = MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate); } @@ -264,24 +289,41 @@ impl DataStore<'static, Key> { /// Creates new [`DataStore`] /// Used in case there is no recovery needed pub async fn handle_empty_vlog + Send + Sync>( - dir: DirPath, - buckets_path: P, - mut vlog: ValueLog, - key_range: KeyRange, - config: &Config, - size_unit: SizeUnit, - meta: Meta, + params: CreateOrRecoverStoreParams<'_, P>, ) -> Result, Error> { - let mut active_memtable = - MemTable::with_specified_capacity_and_rate(size_unit, config.write_buffer_size, config.false_positive_rate); + let (buckets_path, dir, mut vlog, key_range, config, size_unit, meta) = ( + params.buckets_path, + params.dir, + params.vlog, + params.key_range, + params.config, + params.size_unit, + params.meta, + ); + + let mut active_memtable = MemTable::with_specified_capacity_and_rate( + size_unit, + config.write_buffer_size, + config.false_positive_rate, + ); // if ValueLog is empty then we want to insert both tail and head let created_at = Utc::now(); let tail_offset = vlog - .append(&TAIL_ENTRY_KEY.to_vec(), &TAIL_ENTRY_VALUE.to_vec(), created_at, false) + .append( + &TAIL_ENTRY_KEY.to_vec(), + &TAIL_ENTRY_VALUE.to_vec(), + created_at, + false, + ) .await?; let tail_entry = Entry::new(TAIL_ENTRY_KEY.to_vec(), tail_offset, created_at, false); let head_offset = vlog - .append(&HEAD_ENTRY_KEY.to_vec(), &HEAD_ENTRY_VALUE.to_vec(), created_at, false) + .append( + &HEAD_ENTRY_KEY.to_vec(), + &HEAD_ENTRY_VALUE.to_vec(), + created_at, + false, + ) .await?; let head_entry = Entry::new(HEAD_ENTRY_KEY.to_vec(), head_offset, created_at, false); vlog.set_head(head_offset); diff --git a/src/db/store.rs b/src/db/store.rs index 02b4cd0..48e75ed 100644 --- a/src/db/store.rs +++ b/src/db/store.rs @@ -1,8 +1,8 @@ use crate::cfg::Config; use crate::compactors::{CompactionReason, Compactor}; use crate::consts::{ - BUCKETS_DIRECTORY_NAME, HEAD_ENTRY_KEY, KB, MAX_KEY_SIZE, MAX_VALUE_SIZE, META_DIRECTORY_NAME, TOMB_STONE_MARKER, - VALUE_LOG_DIRECTORY_NAME, VLOG_START_OFFSET,HEAD_KEY_SIZE + BUCKETS_DIRECTORY_NAME, HEAD_ENTRY_KEY, HEAD_KEY_SIZE, KB, MAX_KEY_SIZE, MAX_VALUE_SIZE, + META_DIRECTORY_NAME, TOMB_STONE_MARKER, VALUE_LOG_DIRECTORY_NAME, VLOG_START_OFFSET, }; use crate::db::keyspace::is_valid_keyspace_name; use crate::flush::Flusher; @@ -25,6 +25,8 @@ use std::sync::Arc; use tokio::fs::{self}; use tokio::sync::{Mutex, RwLock}; +use super::recovery::CreateOrRecoverStoreParams; + /// DataStore struct is the main struct for the library crate /// i.e user-facing struct pub struct DataStore<'a, Key> @@ -138,7 +140,8 @@ impl DataStore<'static, Key> { dir: P, ) -> Result, crate::err::Error> { assert!(is_valid_keyspace_name(keyspace)); - let mut store = Self::create_or_recover(DirPath::build(dir), SizeUnit::Bytes, Config::default()).await?; + let mut store = + Self::create_or_recover(DirPath::build(dir), SizeUnit::Bytes, Config::default()).await?; store.keyspace = keyspace; store.start_background_tasks(); Ok(store) @@ -156,7 +159,8 @@ impl DataStore<'static, Key> { ) -> Result, crate::err::Error> { assert!(is_valid_keyspace_name(keyspace)); log::info!("Opening keyspace at {:?}", dir.as_ref()); - let mut store = Self::create_or_recover(DirPath::build(dir), SizeUnit::Bytes, Config::default()).await?; + let mut store = + Self::create_or_recover(DirPath::build(dir), SizeUnit::Bytes, Config::default()).await?; store.keyspace = keyspace; Ok(store) } @@ -209,7 +213,11 @@ impl DataStore<'static, Key> { /// } /// /// ``` - pub async fn put, V: AsRef<[u8]>>(&mut self, key: K, val: V) -> Result { + pub async fn put, V: AsRef<[u8]>>( + &mut self, + key: K, + val: V, + ) -> Result { self.validate_size(key.as_ref(), Some(val.as_ref()))?; if !self.gc_updated_entries.read().await.is_empty() { @@ -260,8 +268,10 @@ impl DataStore<'static, Key> { if self.read_only_memtables.is_empty() { self.flush_stream.clear(); } - self.read_only_memtables - .insert(MemTable::generate_table_id(), Arc::new(self.active_memtable.to_owned())); + self.read_only_memtables.insert( + MemTable::generate_table_id(), + Arc::new(self.active_memtable.to_owned()), + ); if self.read_only_memtables.len() >= self.config.max_buffer_write_number { self.flush_read_only_memtables(); @@ -374,7 +384,8 @@ impl DataStore<'static, Key> { let capacity = self.active_memtable.capacity(); let size_unit = self.active_memtable.size_unit(); let false_positive_rate = self.active_memtable.false_positive_rate(); - self.active_memtable = MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate); + self.active_memtable = + MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate); self.gc_table = Arc::new(RwLock::new(MemTable::with_specified_capacity_and_rate( size_unit, capacity, @@ -642,8 +653,10 @@ impl DataStore<'static, Key> { self.active_memtable.mark_readonly(); - self.read_only_memtables - .insert(MemTable::generate_table_id(), Arc::new(self.active_memtable.to_owned())); + self.read_only_memtables.insert( + MemTable::generate_table_id(), + Arc::new(self.active_memtable.to_owned()), + ); let immutable_tables = self.read_only_memtables.to_owned(); let mut flusher = Flusher::new( Arc::clone(&self.read_only_memtables), @@ -684,10 +697,21 @@ impl DataStore<'static, Key> { let key_range = KeyRange::default(); let vlog = ValueLog::new(vlog_path).await?; let meta = Meta::new(&dir.meta).await?; + + let params = CreateOrRecoverStoreParams { + dir, + buckets_path, + vlog, + key_range, + config: &config, + size_unit, + meta, + }; + if vlog_empty { - return DataStore::handle_empty_vlog(dir, buckets_path, vlog, key_range, &config, size_unit, meta).await; + return DataStore::handle_empty_vlog(params).await; } - DataStore::recover(dir, buckets_path, vlog, key_range, &config, size_unit, meta).await + DataStore::recover(params).await } /// Trigger compaction mannually diff --git a/src/err/mod.rs b/src/err/mod.rs index 84b0852..a348849 100644 --- a/src/err/mod.rs +++ b/src/err/mod.rs @@ -68,7 +68,10 @@ pub enum Error { MemTableRecovery(#[source] Box), #[error("Invalid string provided to be parsed to UUID `{input_string}`: {error}")] - InvaidUUIDParseString { input_string: String, error: uuid::Error }, + InvaidUUIDParseString { + input_string: String, + error: uuid::Error, + }, #[error("Invalid sstable directory error: `{input_string}`")] InvalidSSTableDirectory { input_string: String }, @@ -169,7 +172,9 @@ pub enum Error { #[error("Compaction cleanup failed but sstable merge was successful : {0} ")] CompactionCleanup(Box), - #[error("Cannot remove obsolete sstables from disk because not every merged sstable was written to disk")] + #[error( + "Cannot remove obsolete sstables from disk because not every merged sstable was written to disk" + )] CannotRemoveObsoleteSST, #[error("Error, merged sstables has empty entries")] diff --git a/src/filter/bf.rs b/src/filter/bf.rs index f8970d5..b246dc3 100644 --- a/src/filter/bf.rs +++ b/src/filter/bf.rs @@ -136,7 +136,8 @@ impl BloomFilter { if self.file_path.is_none() { return Err(FilterFilePathNotProvided); }; - let (false_pos, no_hash_func, no_elements) = FilterFileNode::recover(self.file_path.as_ref().unwrap()).await?; + let (false_pos, no_hash_func, no_elements) = + FilterFileNode::recover(self.file_path.as_ref().unwrap()).await?; self.false_positive_rate = false_pos; self.no_of_hash_func = no_hash_func as usize; self.no_of_elements = AtomicU32::new(no_elements); @@ -162,7 +163,8 @@ impl BloomFilter { serialized_data.extend_from_slice(&(self.no_of_hash_func as u32).to_le_bytes()); - serialized_data.extend_from_slice(&AtomicU32::load(&self.no_of_elements, Ordering::Relaxed).to_le_bytes()); + serialized_data + .extend_from_slice(&AtomicU32::load(&self.no_of_elements, Ordering::Relaxed).to_le_bytes()); serialized_data.extend_from_slice(&util::float_to_le_bytes(self.false_positive_rate)); diff --git a/src/flush/flusher.rs b/src/flush/flusher.rs index a4588c8..3f85125 100644 --- a/src/flush/flusher.rs +++ b/src/flush/flusher.rs @@ -38,7 +38,9 @@ impl Flusher { let flush_data = self; let table_reader = table; if table_reader.entries.is_empty() { - return Err(Error::FailedToInsertToBucket("Cannot flush an empty table".to_string())); + return Err(Error::FailedToInsertToBucket( + "Cannot flush an empty table".to_string(), + )); } let mut bucket_lock = flush_data.bucket_map.write().await; let sst = bucket_lock diff --git a/src/fs/mod.rs b/src/fs/mod.rs index d1ca738..42cba99 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -7,7 +7,8 @@ use crate::{ load_buffer, memtable::{Entry, SkipMapValue}, types::{ - CreatedAt, IsTombStone, Key, LastModified, NoBytesRead, SkipMapEntries, VLogHead, VLogTail, ValOffset, Value, + CreatedAt, IsTombStone, Key, LastModified, NoBytesRead, SkipMapEntries, VLogHead, VLogTail, + ValOffset, Value, }, util, vlog::ValueLogEntry, @@ -88,7 +89,10 @@ pub trait DataFs: Send + Sync + Debug + Clone { searched_key: &[u8], ) -> Result, Error>; - async fn load_entries_within_range(&self, range_offset: RangeOffset) -> Result>, Error>; + async fn load_entries_within_range( + &self, + range_offset: RangeOffset, + ) -> Result>, Error>; } #[async_trait] @@ -110,8 +114,9 @@ pub trait VLogFs: Send + Sync + Debug + Clone { pub trait FilterFs: Send + Sync + Debug + Clone { async fn new + Send + Sync>(path: P, file_type: FileType) -> Result; - async fn recover + Send + Sync>(path: P) - -> Result<(FalsePositive, NoHashFunc, NoOfElements), Error>; + async fn recover + Send + Sync>( + path: P, + ) -> Result<(FalsePositive, NoHashFunc, NoOfElements), Error>; } #[async_trait] @@ -223,7 +228,10 @@ impl FileAsync for FileNode { async fn sync_all(&self) -> Result<(), Error> { let file = self.w_lock().await; - Ok(file.sync_all().await.map_err(|err| Error::FileSync { error: err })?) + Ok(file + .sync_all() + .await + .map_err(|err| Error::FileSync { error: err })?) } async fn flush(&self) -> Result<(), Error> { @@ -376,7 +384,10 @@ impl DataFs for DataFileNode { } } - async fn load_entries_within_range(&self, range_offset: RangeOffset) -> Result>, Error> { + async fn load_entries_within_range( + &self, + range_offset: RangeOffset, + ) -> Result>, Error> { let mut entries = Vec::new(); let mut total_bytes_read = 0; let path = &self.node.file_path; @@ -567,7 +578,9 @@ impl VLogFs for VLogFileNode { let path = &self.node.file_path; let mut entries = Vec::new(); let mut file = self.node.file.write().await; - file.seek(std::io::SeekFrom::Start(offset)).await.map_err(FileSeek)?; + file.seek(std::io::SeekFrom::Start(offset)) + .await + .map_err(FileSeek)?; let mut total_bytes_read: usize = 0; loop { let mut key_len_bytes = [0; SIZE_OF_U32]; @@ -646,7 +659,9 @@ impl IndexFs for IndexFileNode { let path = &self.node.file_path; let block_offset: i32 = -1; let mut file = self.node.file.write().await; - file.seek(std::io::SeekFrom::Start(0_u64)).await.map_err(FileSeek)?; + file.seek(std::io::SeekFrom::Start(0_u64)) + .await + .map_err(FileSeek)?; loop { let mut key_len_bytes = [0; SIZE_OF_U32]; @@ -689,7 +704,9 @@ impl IndexFs for IndexFileNode { let path = &self.node.file_path; let mut range_offset = RangeOffset::new(0, 0); let mut file = self.node.file.write().await; - file.seek(std::io::SeekFrom::Start(0_u64)).await.map_err(FileSeek)?; + file.seek(std::io::SeekFrom::Start(0_u64)) + .await + .map_err(FileSeek)?; loop { let mut key_len_bytes = [0; SIZE_OF_U32]; @@ -732,7 +749,10 @@ pub struct FilterFileNode { #[async_trait] impl FilterFs for FilterFileNode { - async fn new + Send + Sync>(path: P, file_type: FileType) -> Result { + async fn new + Send + Sync>( + path: P, + file_type: FileType, + ) -> Result { let node = FileNode::new(path, file_type).await?; Ok(FilterFileNode { node }) } @@ -830,7 +850,10 @@ pub struct SummaryFileNode { #[async_trait] impl SummaryFs for SummaryFileNode { - async fn new + Send + Sync>(path: P, file_type: FileType) -> Result { + async fn new + Send + Sync>( + path: P, + file_type: FileType, + ) -> Result { let node = FileNode::new(path, file_type).await?; Ok(SummaryFileNode { node }) } diff --git a/src/gc/garbage_collector.rs b/src/gc/garbage_collector.rs index b60dd47..41f6c06 100644 --- a/src/gc/garbage_collector.rs +++ b/src/gc/garbage_collector.rs @@ -200,7 +200,9 @@ impl GC { .await; match most_recent_value { Ok((value, creation_time)) => { - if entry.created_at < creation_time || value == TOMB_STONE_MARKER.as_bytes().to_vec() { + if entry.created_at < creation_time + || value == TOMB_STONE_MARKER.as_bytes().to_vec() + { invalid_entries_ref.write().await.push(entry); } else { valid_entries_ref.write().await.push((entry.key, value)); @@ -233,7 +235,8 @@ impl GC { v_offset, )); - GC::write_valid_entries_to_vlog(valid_entries, synced_entries.to_owned(), Arc::clone(&vlog)).await?; + GC::write_valid_entries_to_vlog(valid_entries, synced_entries.to_owned(), Arc::clone(&vlog)) + .await?; // call fsync on vlog to guarantee persistence to disk vlog.write().await.sync_to_disk().await?; diff --git a/src/key_range/range.rs b/src/key_range/range.rs index b8d59a7..a29f544 100644 --- a/src/key_range/range.rs +++ b/src/key_range/range.rs @@ -99,7 +99,13 @@ impl KeyRange { } let mut restored_range_map: HashMap = HashMap::new(); for (_, range) in self.key_ranges.read().await.iter() { - if has_restored_ranges && self.restored_ranges.read().await.contains_key(range.sst.dir.as_path()) { + if has_restored_ranges + && self + .restored_ranges + .read() + .await + .contains_key(range.sst.dir.as_path()) + { continue; } @@ -176,7 +182,10 @@ impl KeyRange { let restored_ranges = self.restored_ranges.read().await; if !restored_ranges.is_empty() { for (path, range) in restored_ranges.iter() { - self.key_ranges.write().await.insert(path.to_owned(), range.to_owned()); + self.key_ranges + .write() + .await + .insert(path.to_owned(), range.to_owned()); } drop(restored_ranges); self.restored_ranges.write().await.clear(); diff --git a/src/lib.rs b/src/lib.rs index 93686a5..c607eb0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,25 +15,25 @@ //! This is an ongoing project (**not production ready**) designed to optimize data movement during load times, random access, and compaction. Inspired by the WiscKey paper, [WiscKey: Separating Keys from Values in SSD-conscious Storage](https://usenix.org/system/files/conference/fast16/fast16-papers-lu.pdf), velarixdb aims to significantly enhance performance over traditional key-value stores. //! //! ## Problem -//! -//! During compaction in LevelDB or RocksDB, in the worst case, up to 10 SSTable files needs to be read, sorted and re-written since keys are not allowed to overlapp across all the sstables from Level 1 downwards. Suppose after merging SSTables in one level, the next level exceeds its threshold, compaction can cascade from Level 0 all the way to Level 6 meaning the overall write amplification can be up to 50 (ignoring the first compaction level).[ Reference -> [Official LevelDB Compaction Process Docs](https://github.com/facebook/rocksdb/wiki/Leveled-Compaction) ]. +//! +//! During compaction in LevelDB or RocksDB, in the worst case, up to 10 SSTable files needs to be read, sorted and re-written since keys are not allowed to overlapp across all the sstables from Level 1 downwards. Suppose after merging SSTables in one level, the next level exceeds its threshold, compaction can cascade from Level 0 all the way to Level 6 meaning the overall write amplification can be up to 50 (ignoring the first compaction level).[ Reference -> [Official LevelDB Compaction Process Docs](https://github.com/facebook/rocksdb/wiki/Leveled-Compaction) ]. //! This repetitive data movement can cause significant wear on SSDs, reducing their lifespan due to the high number of write cycles. The goal is to minimize the amount of data moved during compaction, thereby reducing the amount of data re-written and extending the device's lifetime. //! //! ## Solution -//! +//! //! To address this, we focus on whether a key has been deleted or updated. Including values in the compaction process (which are often larger than keys) unnecessarily amplifies the amount of data read and written. Therefore, we store keys and values separately. Specifically, we map value offsets to the keys, represented as 32-bit integers. //! This approach reduces the amount of data read, written, and moved during compaction, leading to improved performance and less wear on storage devices, particularly SSDs. By minimizing the data movement, we not only enhance the efficiency of the database but also significantly extend the lifespan of the underlying storage hardware. -//! +//! //! ## Performance Benefits //! //! According to the benchmarks presented in the WiscKey paper, implementations can outperform LevelDB and RocksDB by: //! - **2.5x to 111x** for database loading //! - **1.6x to 14x** for random lookups -//! +//! //! ## Addressing major concerns //! - **Range Query**: Since keys are separate from values, won't that affect range queries performance. Well, we now how have internal parallelism in SSDs, as we fetch the keys from the LSM tree we can fetch the values in parallel from the vlog file. This [benchmark](https://github.com/Gifted-s/velarixdb/blob/main/bench.png) from the Wisckey Paper shows how for request size ≥ 64KB, the aggregate throughput of random reads with 32 threads matches the sequential read throughput. //! - **More Disk I/O for Reads**: Since keys are now seperate from values, we have to make extra disk IO to fetch values? Yes, but since the key density now increases for each level (since we are only storing keys and value offsets in the sstable), we will most likely search fewer levels compared to LevelDB or RocksDB for thesame query. A significant portion of the LSM tree can also be cached in memory. -//! +//! //! ## Designed for asynchronous runtime (unstable) //! //! Based on the introduction and efficiency of async IO at the OS kernel level e.g **io_uring** for the Linux kernel, the experimental version of Velarixdb is designed for asynchronous runtime. In this case Tokio runtime. @@ -41,11 +41,11 @@ //! This means that even though the file system operations themselves are blocking at the OS level, Tokio can handle them without blocking the main async task executor. //! Tokio might adopt [io_uring](https://docs.rs/tokio/latest/tokio/fs/index.html#:~:text=Currently%2C%20Tokio%20will%20always%20use%20spawn_blocking%20on%20all%20platforms%2C%20but%20it%20may%20be%20changed%20to%20use%20asynchronous%20file%20system%20APIs%20such%20as%20io_uring%20in%20the%20future.) in the future, //! (We haven't benchmarked the async version therefore this is unstable and might be removed in future versions) -//! +//! //! ## Disclaimer //! //! Please note that velarixdb is still under development and is not yet production-ready. -//! +//! //! ### Features //! - [x] Atomic `Put()`, `Get()`, `Delete()`, and `Update()` operations //! - [x] 100% safe & stable Rust @@ -62,7 +62,7 @@ //! - [ ] Block Cache //! - [ ] Batched Writes //! - [ ] Range Query -//! - [ ] Snappy Compression +//! - [ ] Snappy Compression //! - [ ] Value Buffer to keep values in memory and only flush in batches to reduce IO (under investigation) //! - [ ] Checksum to detect data corruption //! - [ ] Leveled Compaction (LCS), Time-Window Compaction (TCS), and Unified Compaction (UCS) @@ -164,8 +164,12 @@ //! } //! ``` -#![doc(html_logo_url = "https://firebasestorage.googleapis.com/v0/b/generalsapi.appspot.com/o/Screenshot%202024-07-23%20at%2023.41.43.png?alt=media&token=109ab2a9-25d1-4a36-9d7e-7f8cfeb6ce6b")] -#![doc(html_favicon_url = "https://firebasestorage.googleapis.com/v0/b/generalsapi.appspot.com/o/Screenshot%202024-07-23%20at%2023.41.43.png?alt=media&token=109ab2a9-25d1-4a36-9d7e-7f8cfeb6ce6b")] +#![doc( + html_logo_url = "https://firebasestorage.googleapis.com/v0/b/generalsapi.appspot.com/o/Screenshot%202024-07-23%20at%2023.41.43.png?alt=media&token=109ab2a9-25d1-4a36-9d7e-7f8cfeb6ce6b" +)] +#![doc( + html_favicon_url = "https://firebasestorage.googleapis.com/v0/b/generalsapi.appspot.com/o/Screenshot%202024-07-23%20at%2023.41.43.png?alt=media&token=109ab2a9-25d1-4a36-9d7e-7f8cfeb6ce6b" +)] mod block; mod bucket; diff --git a/src/memtable/mem.rs b/src/memtable/mem.rs index 3429c86..4fff5bb 100644 --- a/src/memtable/mem.rs +++ b/src/memtable/mem.rs @@ -173,7 +173,11 @@ impl MemTable { Self::with_specified_capacity_and_rate(SizeUnit::Bytes, capacity, false_positive_rate) } - pub fn with_specified_capacity_and_rate(size_unit: SizeUnit, capacity: usize, false_positive_rate: f64) -> Self { + pub fn with_specified_capacity_and_rate( + size_unit: SizeUnit, + capacity: usize, + false_positive_rate: f64, + ) -> Self { assert!( false_positive_rate >= 0.0, "False positive rate can not be les than or equal to zero" @@ -347,7 +351,8 @@ mod tests { let buffer_size = 51200; let false_pos_rate = 1e-300; - let memtable = MemTable::with_specified_capacity_and_rate(SizeUnit::Bytes, buffer_size, false_pos_rate); + let memtable = + MemTable::with_specified_capacity_and_rate(SizeUnit::Bytes, buffer_size, false_pos_rate); assert_eq!(memtable.entries.len(), 0); assert_eq!(memtable.bloom_filter.num_elements(), 0); assert_eq!(memtable.size, 0); @@ -394,7 +399,7 @@ mod tests { let expected_len = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8; memtable.insert(&entry); - + assert_eq!(memtable.size, expected_len); memtable.insert(&entry); @@ -601,18 +606,23 @@ mod tests { map.insert(keys[3].to_owned(), (val_offset, created_at, is_tombstone)); map.insert(keys[4].to_owned(), (val_offset, created_at, is_tombstone)); - let within_range = - MemTable::is_entry_within_range(&map.get(&keys[0]).unwrap(), keys[0].to_owned(), keys[3].to_owned()); + let within_range = MemTable::is_entry_within_range( + &map.get(&keys[0]).unwrap(), + keys[0].to_owned(), + keys[3].to_owned(), + ); assert!(within_range); let start_invalid = vec![10, 20, 30, 40]; let end_invalid = vec![0, 0, 0, 0]; - let within_range = MemTable::is_entry_within_range(&map.get(&keys[0]).unwrap(), start_invalid, end_invalid); + let within_range = + MemTable::is_entry_within_range(&map.get(&keys[0]).unwrap(), start_invalid, end_invalid); assert!(!within_range); let start_valid = &keys[0]; let end_invalid = vec![0, 0, 0, 0]; - let within_range = MemTable::is_entry_within_range(&map.get(&keys[0]).unwrap(), start_valid, &end_invalid); + let within_range = + MemTable::is_entry_within_range(&map.get(&keys[0]).unwrap(), start_valid, &end_invalid); assert!(within_range); } diff --git a/src/meta/meta_manager.rs b/src/meta/meta_manager.rs index 4b9ae3c..ca03fb4 100644 --- a/src/meta/meta_manager.rs +++ b/src/meta/meta_manager.rs @@ -80,7 +80,8 @@ impl Meta { /// /// Returns IO error in case it occurs pub async fn recover(&mut self) -> Result<(), Error> { - let (head, tail, created_at, last_modified) = MetaFileNode::recover(self.file_handle.path.to_owned()).await?; + let (head, tail, created_at, last_modified) = + MetaFileNode::recover(self.file_handle.path.to_owned()).await?; self.v_log_head = head; self.v_log_tail = tail; self.created_at = created_at; diff --git a/src/range/range_iterator.rs b/src/range/range_iterator.rs index 6e78600..d23582d 100644 --- a/src/range/range_iterator.rs +++ b/src/range/range_iterator.rs @@ -1,6 +1,5 @@ - -use crate::err::Error; use crate::db::DataStore; +use crate::err::Error; use crate::memtable::Entry; use crate::types::{Key, ValOffset, Value}; use crate::vlog::ValueLog; @@ -48,7 +47,6 @@ impl<'a> RangeIterator<'a> { impl<'a> DataStore<'a, Key> { // TODO: range query, add next and previous method pub async fn seek(&self, _: &'a [u8], _: &'a [u8]) -> Result { - let range_iterator = RangeIterator::<'a>::new( &[1], &[2], diff --git a/src/sst/table.rs b/src/sst/table.rs index b0c32d4..0fb536a 100644 --- a/src/sst/table.rs +++ b/src/sst/table.rs @@ -37,7 +37,10 @@ use crate::{ block::Block, bucket::InsertableToBucket, - consts::{DATA_FILE_NAME, INDEX_FILE_NAME, SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8, SIZE_OF_USIZE, SUMMARY_FILE_NAME}, + consts::{ + DATA_FILE_NAME, INDEX_FILE_NAME, SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8, SIZE_OF_USIZE, + SUMMARY_FILE_NAME, + }, err::Error, filter::BloomFilter, fs::{DataFileNode, DataFs, FileAsync, FileNode, IndexFileNode, IndexFs, SummaryFileNode, SummaryFs}, @@ -230,7 +233,15 @@ impl Table { summary: None, }; table.size = table.data_file.file.node.size().await; - let modified_time = table.data_file.file.node.metadata().await.unwrap().modified().unwrap(); + let modified_time = table + .data_file + .file + .node + .metadata() + .await + .unwrap() + .modified() + .unwrap(); let epoch = SystemTime::UNIX_EPOCH; let elapsed_nanos = modified_time.duration_since(epoch).unwrap().as_nanos() as u64; table.created_at = util::milliseconds_to_datetime(elapsed_nanos / 1_000_000); @@ -256,10 +267,10 @@ impl Table { let mut blocks: Vec = Vec::new(); let mut index = Index::new(self.index_file.path.clone(), index_file.file.clone()); let mut summary = Summary::new(self.dir.to_owned()); - + let smallest_entry = self.entries.front(); let biggest_entry = self.entries.back(); - + summary.smallest_key = smallest_entry.unwrap().key().to_vec(); summary.biggest_key = biggest_entry.unwrap().key().to_vec(); @@ -269,7 +280,10 @@ impl Table { // write filter to disk self.filter.as_mut().unwrap().write(self.dir.to_owned()).await?; - self.filter.as_mut().unwrap().set_sstable_path(&self.data_file.path); + self.filter + .as_mut() + .unwrap() + .set_sstable_path(&self.data_file.path); // write data blocks let mut current_block = Block::new(); if self.size > 0 { diff --git a/src/tests/bucket_test.rs b/src/tests/bucket_test.rs index 67e0eba..4329311 100644 --- a/src/tests/bucket_test.rs +++ b/src/tests/bucket_test.rs @@ -148,17 +148,20 @@ mod tests { } let mut sst_within_size_range = SSTContructor::generate_ssts(1).await[0].to_owned(); new_bucket.avarage_size = sst_within_size_range.size(); - let fits_into_bucket = new_bucket.fits_into_bucket(Arc::new(Box::new(sst_within_size_range.to_owned()))); + let fits_into_bucket = + new_bucket.fits_into_bucket(Arc::new(Box::new(sst_within_size_range.to_owned()))); // size of sstable is not less than bucket low assert!(fits_into_bucket); // increase sstable size to be greater than bucket high range sst_within_size_range.size = ((new_bucket.avarage_size as f64 * BUCKET_HIGH) * 2.0) as usize; - let fits_into_bucket = new_bucket.fits_into_bucket(Arc::new(Box::new(sst_within_size_range.to_owned()))); + let fits_into_bucket = + new_bucket.fits_into_bucket(Arc::new(Box::new(sst_within_size_range.to_owned()))); // sstable size is greater than bucket high range assert!(!fits_into_bucket); // increase bucket average new_bucket.avarage_size = ((new_bucket.avarage_size as f64 * BUCKET_HIGH) * 2.0) as usize; - let fits_into_bucket = new_bucket.fits_into_bucket(Arc::new(Box::new(sst_within_size_range.to_owned()))); + let fits_into_bucket = + new_bucket.fits_into_bucket(Arc::new(Box::new(sst_within_size_range.to_owned()))); // sstable size is within bucket range assert!(fits_into_bucket); } @@ -223,7 +226,10 @@ mod tests { for (_id, ssts) in ssts_to_remove { expected_ssts_to_remove_from_file += ssts.len(); } - assert_eq!(expected_ssts_to_remove_from_file, expected_ssts_to_remove_in_buckets); + assert_eq!( + expected_ssts_to_remove_from_file, + expected_ssts_to_remove_in_buckets + ); // test empty map bucket_map.buckets.clear(); @@ -340,7 +346,7 @@ mod tests { let sst4 = tempdir().unwrap().path().to_owned(); let sst5 = tempdir().unwrap().path().to_owned(); let sst6 = tempdir().unwrap().path().to_owned(); - let ssts = [sst1, sst2, sst3, sst4, sst5,sst6]; + let ssts = [sst1, sst2, sst3, sst4, sst5, sst6]; for (idx, mut s) in sst_samples.iter().cloned().enumerate() { s.dir = ssts[idx].to_owned().to_path_buf(); diff --git a/src/tests/fixtures/data/buckets/bucket1201eb6b-8903-4557-a5bd-d87cb725f1d8/sstable_1720785462309/summary.db b/src/tests/fixtures/data/buckets/bucket1201eb6b-8903-4557-a5bd-d87cb725f1d8/sstable_1720785462309/summary.db index cc923c3..da361f1 100644 Binary files a/src/tests/fixtures/data/buckets/bucket1201eb6b-8903-4557-a5bd-d87cb725f1d8/sstable_1720785462309/summary.db and b/src/tests/fixtures/data/buckets/bucket1201eb6b-8903-4557-a5bd-d87cb725f1d8/sstable_1720785462309/summary.db differ diff --git a/src/tests/gc_test.rs b/src/tests/gc_test.rs index 47c09b1..b62fa19 100644 --- a/src/tests/gc_test.rs +++ b/src/tests/gc_test.rs @@ -28,13 +28,16 @@ mod tests { async fn datastore_gc_test_success() { let root = tempdir().unwrap(); let path = root.path().join("gc_test_1"); - let s_engine = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let s_engine = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let store = Arc::new(RwLock::new(s_engine)); let workload_size = 5000; let key_len = 5; let val_len = 5; let write_read_ratio = 0.5; - let workload = crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); + let workload = + crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); if let Err(err) = setup(store.clone(), &workload, true).await { log::error!("Setup failed {}", err); return; @@ -63,13 +66,16 @@ mod tests { async fn datastore_gc_test_unsupported_platform() { let root = tempdir().unwrap(); let path = root.path().join("gc_test_2"); - let s_engine = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let s_engine = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let store = Arc::new(RwLock::new(s_engine)); let workload_size = 5000; let key_len = 5; let val_len = 5; let write_read_ratio = 0.5; - let workload = crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); + let workload = + crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); if let Err(err) = setup(store.clone(), &workload, true).await { log::error!("Setup failed {}", err); return; @@ -97,13 +103,16 @@ mod tests { async fn datastore_gc_test_tail_shifted() { let root = tempdir().unwrap(); let path = root.path().join("gc_test_3"); - let s_engine = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let s_engine = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let store = Arc::new(RwLock::new(s_engine)); let workload_size = 5000; let key_len = 5; let val_len = 5; let write_read_ratio = 0.5; - let workload = crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); + let workload = + crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); if let Err(err) = setup(store.clone(), &workload, true).await { log::error!("Setup failed {}", err); return; @@ -133,13 +142,16 @@ mod tests { async fn datastore_gc_test_free_before_synchronization() { let root = tempdir().unwrap(); let path = root.path().join("gc_test_free"); - let s_engine = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let s_engine = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let store = Arc::new(RwLock::new(s_engine)); let workload_size = 5000; let key_len = 5; let val_len = 5; let write_read_ratio = 0.5; - let workload = crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); + let workload = + crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); if let Err(err) = setup(store.clone(), &workload, true).await { log::error!("Setup failed {}", err); return; @@ -168,13 +180,16 @@ mod tests { let bytes_to_scan_for_garbage_colection = SizeUnit::Bytes.as_bytes(100); let root = tempdir().unwrap(); let path = root.path().join("gc_test_4"); - let s_engine = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let s_engine = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let store = Arc::new(RwLock::new(s_engine)); let workload_size = 5; let key_len = 5; let val_len = 5; let write_read_ratio = 0.5; - let workload = crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); + let workload = + crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); if let Err(err) = setup(store.clone(), &workload, true).await { log::error!("Setup failed {}", err); return; @@ -216,7 +231,9 @@ mod tests { let bytes_to_scan_for_garbage_colection = SizeUnit::Bytes.as_bytes(100); let root = tempdir().unwrap(); let path = root.path().join("gc_test_5"); - let s_engine = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let s_engine = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let store = Arc::new(RwLock::new(s_engine)); let _ = store.write().await.put("test_key", "test_val").await; let _ = store.write().await.delete("test_key").await; @@ -224,7 +241,8 @@ mod tests { let key_len = 5; let val_len = 5; let write_read_ratio = 0.5; - let workload = crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); + let workload = + crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); if let Err(err) = setup(store.clone(), &workload, true).await { log::error!("Setup failed {}", err); return; @@ -253,13 +271,16 @@ mod tests { let prepare_delete = false; let root = tempdir().unwrap(); let path = root.path().join("gc_test_no_delete"); - let s_engine = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let s_engine = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let store = Arc::new(RwLock::new(s_engine)); let workload_size = 5000; let key_len = 5; let val_len = 5; let write_read_ratio = 0.5; - let workload = crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); + let workload = + crate::tests::workload::Workload::new(workload_size, key_len, val_len, write_read_ratio); if let Err(err) = setup(store.clone(), &workload, prepare_delete).await { log::error!("Setup failed {}", err); return; diff --git a/src/tests/key_range_test.rs b/src/tests/key_range_test.rs index 4458a0c..10f5a22 100644 --- a/src/tests/key_range_test.rs +++ b/src/tests/key_range_test.rs @@ -1,7 +1,7 @@ #[cfg(test)] mod tests { - use crate::tests::*; use crate::key_range::KeyRange; + use crate::tests::*; use std::time::Duration; use workload::SSTContructor; @@ -93,7 +93,12 @@ mod tests { let biggest_key2 = binding.key(); let fake_sst_dir2 = fake_sstable2.dir.to_owned(); key_range - .set(fake_sst_dir2.to_owned(), smallest_key2, biggest_key2, fake_sstable2) + .set( + fake_sst_dir2.to_owned(), + smallest_key2, + biggest_key2, + fake_sstable2, + ) .await; assert_eq!(key_range.key_ranges.read().await.len(), 2); @@ -146,7 +151,13 @@ mod tests { let sst_with_empty_filter = range.get(&fake_sst_dir).unwrap(); // Ensure Bloom Filter does not exist for this sstable - assert!(sst_with_empty_filter.sst.filter.clone().unwrap().sst_dir.is_none()); + assert!(sst_with_empty_filter + .sst + .filter + .clone() + .unwrap() + .sst_dir + .is_none()); // Searched for the first smallest key let retrieved_sstables = key_range.filter_sstables_by_key_range(smallest_key).await; @@ -213,7 +224,13 @@ mod tests { let sst_with_empty_filter = range.get(&fake_sst_dir).unwrap(); // Ensure Bloom Filter does not exist for this sstable - assert!(sst_with_empty_filter.sst.filter.clone().unwrap().sst_dir.is_none()); + assert!(sst_with_empty_filter + .sst + .filter + .clone() + .unwrap() + .sst_dir + .is_none()); // Ensure restored ranges is loaded let retrieved_sstables = key_range.filter_sstables_by_key_range(smallest_key).await; @@ -264,7 +281,13 @@ mod tests { let sst_with_empty_filter = range.get(&fake_sst_dir).unwrap(); // Ensure Bloom Filter does not exist for this sstable - assert!(sst_with_empty_filter.sst.filter.clone().unwrap().sst_dir.is_none()); + assert!(sst_with_empty_filter + .sst + .filter + .clone() + .unwrap() + .sst_dir + .is_none()); // Ensure restored ranges is loaded let retrieved_sstables = key_range.filter_sstables_by_key_range(smallest_key).await; diff --git a/src/tests/meta_test.rs b/src/tests/meta_test.rs index c4621d3..ae392db 100644 --- a/src/tests/meta_test.rs +++ b/src/tests/meta_test.rs @@ -57,14 +57,19 @@ mod tests { metadata.set_tail(new_tail); metadata.write().await.unwrap(); - let mut recovered_meta = Meta::new(path).await.unwrap(); let res = recovered_meta.recover().await; assert!(res.is_ok()); assert_eq!(recovered_meta.v_log_head, metadata.v_log_head); assert_eq!(recovered_meta.v_log_tail, metadata.v_log_tail); - assert_eq!(recovered_meta.created_at.timestamp_millis(), metadata.created_at.timestamp_millis()); - assert_eq!(recovered_meta.last_modified.timestamp_millis(), metadata.last_modified.timestamp_millis()); + assert_eq!( + recovered_meta.created_at.timestamp_millis(), + metadata.created_at.timestamp_millis() + ); + assert_eq!( + recovered_meta.last_modified.timestamp_millis(), + metadata.last_modified.timestamp_millis() + ); } #[tokio::test] @@ -77,9 +82,8 @@ mod tests { let new_head = 50; metadata.set_head(new_head); metadata.set_tail(new_tail); - - let expected_entry_len = SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U64; + let expected_entry_len = SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U64; let serialized_entry = metadata.serialize(); assert_eq!(serialized_entry.len(), expected_entry_len); diff --git a/src/tests/mod.rs b/src/tests/mod.rs index cda0feb..a1dfeef 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -1,10 +1,10 @@ mod bucket_test; mod gc_test; -mod store_test; -#[cfg(test)] -mod workload; mod key_range_test; -mod vlog; mod meta_test; -mod summary_test; mod sized_tier_test; +mod store_test; +mod summary_test; +mod vlog; +#[cfg(test)] +mod workload; diff --git a/src/tests/sized_tier_test.rs b/src/tests/sized_tier_test.rs index a22d505..c56f336 100644 --- a/src/tests/sized_tier_test.rs +++ b/src/tests/sized_tier_test.rs @@ -6,9 +6,9 @@ mod tests { use crate::key_range::KeyRange; use crate::memtable::Entry; use crate::tests::workload::SSTContructor; + use chrono::Utc; use std::sync::Arc; use std::time::Duration; - use chrono::Utc; use tempfile::tempdir; use tokio::sync::RwLock; use tokio::time::sleep; @@ -62,8 +62,11 @@ mod tests { filter_false_positive.to_owned(), ); - let new_sized_tier_compaction_runner = - SizedTierRunner::new(Arc::new(RwLock::new(bucket_map)), Arc::new(default_key_range), config); + let new_sized_tier_compaction_runner = SizedTierRunner::new( + Arc::new(RwLock::new(bucket_map)), + Arc::new(default_key_range), + config, + ); assert!(new_sized_tier_compaction_runner .bucket_map .read() @@ -78,7 +81,10 @@ mod tests { .is_empty()); assert!(new_sized_tier_compaction_runner.tombstones.is_empty()); assert_eq!(new_sized_tier_compaction_runner.config.use_ttl, use_ttl); - assert_eq!(new_sized_tier_compaction_runner.config.tombstone_ttl, ttl.tombstone_ttl); + assert_eq!( + new_sized_tier_compaction_runner.config.tombstone_ttl, + ttl.tombstone_ttl + ); assert_eq!(new_sized_tier_compaction_runner.config.entry_ttl, ttl.entry_ttl); assert_eq!( new_sized_tier_compaction_runner.config.flush_listener_interval, @@ -89,7 +95,9 @@ mod tests { intervals.background_interval ); assert_eq!( - new_sized_tier_compaction_runner.config.tombstone_compaction_interval, + new_sized_tier_compaction_runner + .config + .tombstone_compaction_interval, intervals.tombstone_compaction_interval ); assert_eq!( @@ -132,7 +140,8 @@ mod tests { bucket_map.buckets.insert(new_bucket3.id, new_bucket3); bucket_map.buckets.insert(new_bucket4.id, new_bucket4); - let imbalanced_buckets = SizedTierRunner::fetch_imbalanced_buckets(Arc::new(RwLock::new(bucket_map))).await; + let imbalanced_buckets = + SizedTierRunner::fetch_imbalanced_buckets(Arc::new(RwLock::new(bucket_map))).await; assert!(imbalanced_buckets.is_ok()); let (buckets, ssts_to_remove) = imbalanced_buckets.unwrap(); let mut expected_ssts_to_remove_in_buckets = 0; @@ -147,7 +156,10 @@ mod tests { for (_id, ssts) in ssts_to_remove { expected_ssts_to_remove_from_file += ssts.len(); } - assert_eq!(expected_ssts_to_remove_from_file, expected_ssts_to_remove_in_buckets); + assert_eq!( + expected_ssts_to_remove_from_file, + expected_ssts_to_remove_in_buckets + ); } #[tokio::test] @@ -179,9 +191,14 @@ mod tests { let default_key_range = KeyRange::default(); let config = &generate_config(); - let mut sized_tier_compaction_runner = - SizedTierRunner::new(Arc::new(RwLock::new(bucket_map)), Arc::new(default_key_range), config); - let merge_ssts = sized_tier_compaction_runner.merge_ssts_in_buckets(&[bucket]).await; + let mut sized_tier_compaction_runner = SizedTierRunner::new( + Arc::new(RwLock::new(bucket_map)), + Arc::new(default_key_range), + config, + ); + let merge_ssts = sized_tier_compaction_runner + .merge_ssts_in_buckets(&[bucket]) + .await; assert!(merge_ssts.is_ok()); assert!(!merge_ssts.as_ref().unwrap().is_empty()); for sst in merge_ssts.unwrap() { @@ -348,9 +365,17 @@ mod tests { let compaction_res = sized_tier_compaction_runner.run_compaction().await; assert!(compaction_res.is_ok()); assert!(sized_tier_compaction_runner.tombstones.is_empty()); - assert!(!sized_tier_compaction_runner.bucket_map.read().await.buckets.is_empty()); + assert!(!sized_tier_compaction_runner + .bucket_map + .read() + .await + .buckets + .is_empty()); // all sstables should have been compacted to 1 - assert_eq!(sized_tier_compaction_runner.bucket_map.read().await.buckets.len(), 1); + assert_eq!( + sized_tier_compaction_runner.bucket_map.read().await.buckets.len(), + 1 + ); assert_eq!( sized_tier_compaction_runner.bucket_map.read().await.buckets[0] .sstables @@ -367,7 +392,15 @@ mod tests { .await .is_empty()); // all sstables should have been compacted to 1 so we should have one range - assert_eq!(sized_tier_compaction_runner.key_range.key_ranges.read().await.len(), 1); + assert_eq!( + sized_tier_compaction_runner + .key_range + .key_ranges + .read() + .await + .len(), + 1 + ); } #[tokio::test] @@ -505,14 +538,20 @@ mod tests { let ssts_to_delete = &bucket_map.extract_imbalanced_buckets().await.unwrap().1; let bucket_map_ref = Arc::new(RwLock::new(bucket_map)); let key_range_ref = Arc::new(key_range); - let sized_tier_compaction_runner = SizedTierRunner::new(bucket_map_ref.clone(), key_range_ref.clone(), config); + let sized_tier_compaction_runner = + SizedTierRunner::new(bucket_map_ref.clone(), key_range_ref.clone(), config); let cleanup_res = sized_tier_compaction_runner .clean_up_after_compaction(bucket_map_ref.clone(), ssts_to_delete, key_range_ref.clone()) .await; assert!(cleanup_res.is_ok()); assert!(cleanup_res.unwrap().is_some()); - assert!(sized_tier_compaction_runner.bucket_map.read().await.buckets.is_empty()); + assert!(sized_tier_compaction_runner + .bucket_map + .read() + .await + .buckets + .is_empty()); assert!(sized_tier_compaction_runner .key_range .key_ranges @@ -528,26 +567,26 @@ mod tests { let bucket_map = BucketMap::new(path.to_owned()).await.unwrap(); let default_key_range = KeyRange::default(); let config = &generate_config(); - let mut sized_tier_compaction_runner = - SizedTierRunner::new(Arc::new(RwLock::new(bucket_map)), Arc::new(default_key_range), config); - + let mut sized_tier_compaction_runner = SizedTierRunner::new( + Arc::new(RwLock::new(bucket_map)), + Arc::new(default_key_range), + config, + ); - let not_tombstone = false; - let merged_entries = [ + let not_tombstone = false; + let merged_entries = [ Entry::new("key1", 100, Utc::now(), not_tombstone), Entry::new("key2", 200, Utc::now(), not_tombstone), Entry::new("key3", 300, Utc::now(), not_tombstone), - ]; - - let is_tombstone = true; - let to_insert = Entry::new("key4", 400, Utc::now(), is_tombstone); - - sized_tier_compaction_runner.tombstone_check(&to_insert, &mut merged_entries.to_vec()); - // length should not change since insertion is not be allowed - assert_eq!(merged_entries.len(), 3); - } + ]; + let is_tombstone = true; + let to_insert = Entry::new("key4", 400, Utc::now(), is_tombstone); + sized_tier_compaction_runner.tombstone_check(&to_insert, &mut merged_entries.to_vec()); + // length should not change since insertion is not be allowed + assert_eq!(merged_entries.len(), 3); + } #[tokio::test] async fn test_not_insert_tombstone_elements_found_in_tombstone_hashmap() { @@ -556,29 +595,31 @@ mod tests { let bucket_map = BucketMap::new(path.to_owned()).await.unwrap(); let default_key_range = KeyRange::default(); let config = &generate_config(); - let mut sized_tier_compaction_runner = - SizedTierRunner::new(Arc::new(RwLock::new(bucket_map)), Arc::new(default_key_range), config); - + let mut sized_tier_compaction_runner = SizedTierRunner::new( + Arc::new(RwLock::new(bucket_map)), + Arc::new(default_key_range), + config, + ); - let not_tombstone = false; - let merged_entries = [ + let not_tombstone = false; + let merged_entries = [ Entry::new("key1", 100, Utc::now(), not_tombstone), Entry::new("key2", 200, Utc::now(), not_tombstone), Entry::new("key3", 300, Utc::now(), not_tombstone), - ]; - sleep(Duration::from_secs(1)).await; - let is_tombstone = false; - let deletion_time = Utc::now(); - let to_insert = Entry::new("key3", 300, deletion_time, is_tombstone); - sized_tier_compaction_runner.tombstones.insert(to_insert.key.to_owned(), deletion_time); - - sized_tier_compaction_runner.tombstone_check(&to_insert, &mut merged_entries.to_vec()); - // length should not change since insertion is not be allowed - assert_eq!(merged_entries.len(), 3); + ]; + sleep(Duration::from_secs(1)).await; + let is_tombstone = false; + let deletion_time = Utc::now(); + let to_insert = Entry::new("key3", 300, deletion_time, is_tombstone); + sized_tier_compaction_runner + .tombstones + .insert(to_insert.key.to_owned(), deletion_time); + + sized_tier_compaction_runner.tombstone_check(&to_insert, &mut merged_entries.to_vec()); + // length should not change since insertion is not be allowed + assert_eq!(merged_entries.len(), 3); } - - #[tokio::test] async fn test_insert_valid_elements() { let root = tempdir().unwrap(); @@ -586,22 +627,24 @@ mod tests { let bucket_map = BucketMap::new(path.to_owned()).await.unwrap(); let default_key_range = KeyRange::default(); let config = &generate_config(); - let mut sized_tier_compaction_runner = - SizedTierRunner::new(Arc::new(RwLock::new(bucket_map)), Arc::new(default_key_range), config); - + let mut sized_tier_compaction_runner = SizedTierRunner::new( + Arc::new(RwLock::new(bucket_map)), + Arc::new(default_key_range), + config, + ); - let not_tombstone = false; - let mut merged_entries = vec![ + let not_tombstone = false; + let mut merged_entries = vec![ Entry::new("key1", 100, Utc::now(), not_tombstone), Entry::new("key2", 200, Utc::now(), not_tombstone), Entry::new("key3", 300, Utc::now(), not_tombstone), - ]; - - let not_tombstone = false; - let to_insert = Entry::new("key4", 400, Utc::now(), not_tombstone); - - sized_tier_compaction_runner.tombstone_check(&to_insert, &mut merged_entries); - // length should increase since insertion is allowed - assert_eq!(merged_entries.len(), 4); + ]; + + let not_tombstone = false; + let to_insert = Entry::new("key4", 400, Utc::now(), not_tombstone); + + sized_tier_compaction_runner.tombstone_check(&to_insert, &mut merged_entries); + // length should increase since insertion is allowed + assert_eq!(merged_entries.len(), 4); } } diff --git a/src/tests/store_test.rs b/src/tests/store_test.rs index 4a31354..8392349 100644 --- a/src/tests/store_test.rs +++ b/src/tests/store_test.rs @@ -26,8 +26,10 @@ mod tests { setup(); let path = PathBuf::new().join("src/tests/fixtures/data"); - let store = DataStore::open_without_background("test", path.clone()).await.unwrap(); - + let store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); + assert!(!store.buckets.read().await.buckets.is_empty()); assert!(!store.key_range.key_ranges.read().await.is_empty()); assert!(!store.active_memtable.entries.is_empty()); @@ -38,7 +40,9 @@ mod tests { setup(); let root = tempdir().unwrap(); let path = root.path().join("store_test_2"); - let store = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let workload_size = 10000; let key_len = 5; let val_len = 5; @@ -122,7 +126,9 @@ mod tests { setup(); let root = tempdir().unwrap(); let path = root.path().join("store_test_4"); - let store = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let workload_size = 1; let key_len = 5; let val_len = 5; @@ -160,7 +166,11 @@ mod tests { assert!(tokio_res.unwrap().unwrap()); } - let res = store_ref.read().await.get(std::str::from_utf8(key).unwrap()).await; + let res = store_ref + .read() + .await + .get(std::str::from_utf8(key).unwrap()) + .await; assert!(res.is_ok()); // Even though the write of thesame key happened concurrently, we expect the last entry to reflect assert_eq!(res.unwrap().unwrap().val, entry5.val); @@ -171,7 +181,9 @@ mod tests { setup(); let root = tempdir().unwrap(); let path = root.path().join("store_test_5"); - let mut store = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let mut store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let workload_size = 10000; let key_len = 5; let val_len = 5; @@ -192,7 +204,9 @@ mod tests { setup(); let root = tempdir().unwrap(); let path = root.path().join("store_test_6"); - let mut store = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let mut store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let workload_size = 5000; let key_len = 5; let val_len = 5; @@ -217,7 +231,9 @@ mod tests { setup(); let root = tempdir().unwrap(); let path = root.path().join("store_test_7"); - let store = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let workload_size = 10000; let key_len = 5; let val_len = 5; @@ -242,7 +258,9 @@ mod tests { setup(); let root = tempdir().unwrap(); let path = root.path().join("store_test_8"); - let store = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let workload_size = 10000; let key_len = 5; let val_len = 5; @@ -302,7 +320,9 @@ mod tests { setup(); let root = tempdir().unwrap(); let path = root.path().join("store_test_9"); - let store = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let workload_size = 10000; let key_len = 5; let val_len = 5; @@ -353,7 +373,9 @@ mod tests { setup(); let root = tempdir().unwrap(); let path = root.path().join("store_test_10"); - let store = DataStore::open_without_background("test", path.clone()).await.unwrap(); + let store = DataStore::open_without_background("test", path.clone()) + .await + .unwrap(); let workload_size = 10000; let key_len = 5; let val_len = 5; @@ -390,7 +412,4 @@ mod tests { assert!(res.is_ok()); assert!(res.unwrap().is_none()); } - - - } diff --git a/src/tests/summary_test.rs b/src/tests/summary_test.rs index 498d459..896d5a0 100644 --- a/src/tests/summary_test.rs +++ b/src/tests/summary_test.rs @@ -12,16 +12,14 @@ mod tests { let summary = Summary::new(path.to_owned()); - assert_eq!(summary.smallest_key, vec![]); assert_eq!(summary.biggest_key, vec![]); assert_eq!(summary.path, path.join(format!("{}.db", SUMMARY_FILE_NAME))); - } #[tokio::test] async fn test_summary_recover() { - let sst = SSTContructor::generate_ssts(1).await[0].to_owned(); + let sst = SSTContructor::generate_ssts(1).await[0].to_owned(); let mut recovered_summary = Summary::new(sst.dir); let res = recovered_summary.recover().await; @@ -46,10 +44,11 @@ mod tests { let path = root.path().join("summary_write"); let mut summary = Summary::new(path); - summary.biggest_key = vec![1,2,3]; - summary.smallest_key = vec![0,2,3]; + summary.biggest_key = vec![1, 2, 3]; + summary.smallest_key = vec![0, 2, 3]; - let expected_entry_len = SIZE_OF_U32 + SIZE_OF_U32 + summary.biggest_key.len() + summary.smallest_key.len(); + let expected_entry_len = + SIZE_OF_U32 + SIZE_OF_U32 + summary.biggest_key.len() + summary.smallest_key.len(); let serialized_entry = summary.serialize(); assert_eq!(serialized_entry.len(), expected_entry_len); diff --git a/src/tests/workload.rs b/src/tests/workload.rs index 4cc2a3b..d9a4ed0 100644 --- a/src/tests/workload.rs +++ b/src/tests/workload.rs @@ -120,7 +120,10 @@ impl Workload { pub struct FilterWorkload {} impl FilterWorkload { - pub fn from(false_pos: f64, entries: Arc, SkipMapValue>>) -> crate::filter::BloomFilter { + pub fn from( + false_pos: f64, + entries: Arc, SkipMapValue>>, + ) -> crate::filter::BloomFilter { let mut filter = crate::filter::BloomFilter::new(false_pos, entries.len()); filter.build_filter_from_entries(&entries); filter @@ -142,7 +145,13 @@ pub struct SSTContructor { } impl SSTContructor { - fn new + Send + Sync>(dir: P, data_path: P, index_path: P, filter_path: P, summary_path: P) -> Self { + fn new + Send + Sync>( + dir: P, + data_path: P, + index_path: P, + filter_path: P, + summary_path: P, + ) -> Self { return Self { dir: dir.as_ref().to_path_buf(), data_path: data_path.as_ref().to_path_buf(), @@ -273,7 +282,9 @@ SSTContructor::new( node: FileNode { file_path: sst_contructor[idx].data_path.to_owned(), file: Arc::new(RwLock::new( - File::open(sst_contructor[idx].data_path.to_owned()).await.unwrap(), + File::open(sst_contructor[idx].data_path.to_owned()) + .await + .unwrap(), )), file_type: FileType::Data, }, @@ -285,7 +296,9 @@ SSTContructor::new( node: FileNode { file_path: sst_contructor[idx].index_path.to_owned(), file: Arc::new(RwLock::new( - File::open(sst_contructor[idx].index_path.to_owned()).await.unwrap(), + File::open(sst_contructor[idx].index_path.to_owned()) + .await + .unwrap(), )), file_type: FileType::Index, }, diff --git a/src/util/mod.rs b/src/util/mod.rs index 20d14e1..4619478 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -8,7 +8,11 @@ use rand::{distributions::Alphanumeric, Rng}; #[cfg(test)] pub fn generate_random_id(length: usize) -> String { let rng = rand::thread_rng(); - let id: String = rng.sample_iter(&Alphanumeric).take(length).map(char::from).collect(); + let id: String = rng + .sample_iter(&Alphanumeric) + .take(length) + .map(char::from) + .collect(); id } @@ -49,7 +53,6 @@ pub fn float_from_le_bytes(bytes: &[u8]) -> Option { Some(float) } - #[cfg(test)] mod tests { use super::*; @@ -98,4 +101,4 @@ mod tests { let result = float_from_le_bytes(&invalid_bytes); assert_eq!(result, None); } -} \ No newline at end of file +} diff --git a/src/vlog/v_log.rs b/src/vlog/v_log.rs index b92a0ac..5eebd82 100644 --- a/src/vlog/v_log.rs +++ b/src/vlog/v_log.rs @@ -217,11 +217,11 @@ impl ValueLog { } /// Fetches an entry from value log using the `start_offset` - /// - /// + /// + /// /// This is used to fetch all entries that is yet to be flushed /// before crash happened - /// + /// /// # Error /// /// Returns error in case there is an IO error @@ -251,7 +251,7 @@ impl ValueLog { log::info!("{}", err); } } - self.size=0; + self.size = 0; self.tail_offset = 0; self.head_offset = 0; } @@ -289,7 +289,8 @@ impl ValueLogEntry { /// Converts value log entry to a byte vector pub(crate) fn serialize(&self) -> ByteSerializedEntry { - let entry_len = SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + self.key.len() + self.value.len() + SIZE_OF_U8; + let entry_len = + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + self.key.len() + self.value.len() + SIZE_OF_U8; let mut serialized_data = Vec::with_capacity(entry_len); serialized_data.extend_from_slice(&(self.key.len() as u32).to_le_bytes()); @@ -307,4 +308,3 @@ impl ValueLogEntry { serialized_data } } - diff --git a/tests/get.rs b/tests/get.rs index 575c854..cf35db6 100644 --- a/tests/get.rs +++ b/tests/get.rs @@ -30,10 +30,19 @@ async fn test_get() { let entry7 = store.get("***not_found_key**").await.unwrap(); assert_eq!(std::str::from_utf8(&entry1.unwrap().val).unwrap(), "tim cook"); - assert_eq!(std::str::from_utf8(&entry2.unwrap().val).unwrap(), "sundar pichai"); + assert_eq!( + std::str::from_utf8(&entry2.unwrap().val).unwrap(), + "sundar pichai" + ); assert_eq!(std::str::from_utf8(&entry3.unwrap().val).unwrap(), "jensen huang"); - assert_eq!(std::str::from_utf8(&entry4.unwrap().val).unwrap(), "satya nadella"); - assert_eq!(std::str::from_utf8(&entry5.unwrap().val).unwrap(), "mark zuckerberg"); + assert_eq!( + std::str::from_utf8(&entry4.unwrap().val).unwrap(), + "satya nadella" + ); + assert_eq!( + std::str::from_utf8(&entry5.unwrap().val).unwrap(), + "mark zuckerberg" + ); assert_eq!(std::str::from_utf8(&entry6.unwrap().val).unwrap(), "sam altman"); assert!(entry7.is_none()) }