Skip to content

Commit

Permalink
[Test] Improved test for Meta, Key Range and Value Log
Browse files Browse the repository at this point in the history
  • Loading branch information
Gifted-s committed Jul 12, 2024
1 parent 2761240 commit bb5ae97
Show file tree
Hide file tree
Showing 71 changed files with 972 additions and 183 deletions.
Binary file modified .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion src/consts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub const DEFAULT_FLUSH_SIGNAL_CHANNEL_SIZE: usize = 1;

pub const DEFAULT_MAX_WRITE_BUFFER_NUMBER: usize = 2;

pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 1e-200;
pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 1e-4;

pub const VALUE_LOG_DIRECTORY_NAME: &str = "v_log";

Expand Down
4 changes: 2 additions & 2 deletions src/db/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ impl DataStore<'static, Key> {
let created_at = Utc::now();
let tail_offset = vlog
.append(&TAIL_ENTRY_KEY.to_vec(), &TAIL_ENTRY_VALUE.to_vec(), created_at, false)
.await;
.await?;
let tail_entry = Entry::new(TAIL_ENTRY_KEY.to_vec(), tail_offset, created_at, false);
let head_offset = vlog
.append(&HEAD_ENTRY_KEY.to_vec(), &HEAD_ENTRY_VALUE.to_vec(), created_at, false)
.await;
.await?;
let head_entry = Entry::new(HEAD_ENTRY_KEY.to_vec(), head_offset, created_at, false);
vlog.set_head(head_offset);
vlog.set_tail(tail_offset);
Expand Down
4 changes: 2 additions & 2 deletions src/db/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl DataStore<'static, Key> {
let v_offset = self
.val_log
.append(key.as_ref(), val.as_ref(), created_at, is_tombstone)
.await;
.await?;
let entry = Entry::new(key.as_ref().to_vec(), v_offset, created_at, is_tombstone);

if self.active_memtable.is_full(HEAD_ENTRY_KEY.len()) {
Expand Down Expand Up @@ -466,7 +466,7 @@ impl DataStore<'static, Key> {
}
self.get_value_from_vlog(offset, insert_time).await
} else {
let ssts = &self.key_range.filter_sstables_by_biggest_key(key.as_ref()).await?;
let ssts = &self.key_range.filter_sstables_by_key_range(key.as_ref()).await?;
if ssts.is_empty() {
return Ok(None);
}
Expand Down
8 changes: 4 additions & 4 deletions src/gc/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl GC {
return Ok(());
}
let new_tail_offset = vlog.read().await.tail_offset + total_bytes_read;
let v_offset = GC::write_tail_to_disk(Arc::clone(&vlog), new_tail_offset).await;
let v_offset = GC::write_tail_to_disk(Arc::clone(&vlog), new_tail_offset).await?;

synced_entries.write().await.push((
TAIL_ENTRY_KEY.to_vec(),
Expand Down Expand Up @@ -257,7 +257,7 @@ impl GC {
}

/// Inserts tail entry to value log
pub(crate) async fn write_tail_to_disk(vlog: GCLog, new_tail_offset: usize) -> ValOffset {
pub(crate) async fn write_tail_to_disk(vlog: GCLog, new_tail_offset: usize) -> Result<ValOffset, Error> {
vlog.write()
.await
.append(
Expand Down Expand Up @@ -302,7 +302,7 @@ impl GC {
vlog: GCLog,
) -> Result<(), Error> {
for (key, value) in valid_entries.to_owned().read().await.iter() {
let v_offset = vlog.write().await.append(&key, &value, Utc::now(), false).await;
let v_offset = vlog.write().await.append(&key, &value, Utc::now(), false).await?;
synced_entries
.write()
.await
Expand Down Expand Up @@ -457,7 +457,7 @@ impl GC {
GC::get_value_from_vlog(&vlog, offset, insert_time).await
} else {
// Step 3: Check sstables
let ssts = &key_range.filter_sstables_by_biggest_key(&key).await?;
let ssts = &key_range.filter_sstables_by_key_range(&key).await?;
GC::search_key_in_sstables(key, ssts.to_vec(), &vlog).await
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/key_range/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod range;
pub use range::BiggestKey;
pub use range::KeyRange;
#[cfg(test)]
pub use range::Range;
pub use range::SmallestKey;
20 changes: 12 additions & 8 deletions src/key_range/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use std::{
sync::Arc,
};

/// Biggest key in the SSTable
pub type BiggestKey = types::Key;

/// Smallest key in the SSTable
pub type SmallestKey = types::Key;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -85,7 +88,7 @@ impl KeyRange {
/// # Errors
///
/// Returns error in case failure occured
pub async fn filter_sstables_by_biggest_key<K: AsRef<[u8]> + std::fmt::Debug>(
pub async fn filter_sstables_by_key_range<K: AsRef<[u8]> + std::fmt::Debug>(
&self,
key: K,
) -> Result<Vec<Table>, Error> {
Expand All @@ -95,24 +98,22 @@ impl KeyRange {
filtered_ssts = self.check_restored_key_ranges(key.as_ref()).await?;
}
let mut restored_range_map: HashMap<PathBuf, Range> = HashMap::new();
let ranger = self.key_ranges.read().await.clone();
for (_, range) in ranger.iter() {
for (_, range) in self.key_ranges.read().await.iter() {
if has_restored_ranges && self.restored_ranges.read().await.contains_key(range.sst.dir.as_path()) {
continue;
}

let searched_key = key.as_ref().to_vec();
if searched_key >= range.smallest_key && searched_key <= range.biggest_key {
// If an sstable does not have a bloom filter then
// it means there has been a crash and we need to restore
// filter from disk using filter metadata stored on sstable
if range.sst.filter.as_ref().unwrap().sst_dir.is_none() {
println!("Here we are");
let mut mut_range = range.to_owned();
let mut filter = mut_range.sst.filter.as_ref().unwrap().to_owned();

filter.recover_meta().await?;
filter.sst_dir = Some(mut_range.sst.dir.to_owned());

mut_range.sst.load_entries_from_file().await?;
filter.build_filter_from_entries(&mut_range.sst.entries);
// Don't keep sst entries in memory
Expand All @@ -121,7 +122,8 @@ impl KeyRange {
restored_range_map.insert(mut_range.sst.dir.to_owned(), mut_range.to_owned());

if filter.contains(key.as_ref()) {
filtered_ssts.push(mut_range.sst)
filtered_ssts.push(mut_range.sst);
continue;
}
}

Expand All @@ -136,8 +138,10 @@ impl KeyRange {
// updating key_ranges immediatlely to prevent write locks on key_ranges for
// get operations
let restored_ranges = self.restored_ranges.clone();

tokio::spawn(async move {
*(restored_ranges.write().await) = restored_range_map;
restored_range_map.clone_into(&mut (*restored_ranges.write().await));
drop(restored_ranges);
});
}
Ok(filtered_ssts)
Expand Down Expand Up @@ -179,7 +183,7 @@ impl KeyRange {
}

/// Returns SSTables whose keys overlap with the key range supplied
pub async fn range_scan<T: AsRef<[u8]>>(&self, start_key: T, end_key: T) -> Vec<Range> {
pub async fn range_query_scan<T: AsRef<[u8]>>(&self, start_key: T, end_key: T) -> Vec<Range> {
self.key_ranges
.read()
.await
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
//! store.put("openai", "sam altman").await;
//!
//!
//! let entry1 = store.get("apple").await.unwrap(); // Handle error
//! let entry1 = store.get("apple").await.unwrap(); // handle error
//! let entry2 = store.get("google").await.unwrap();
//! let entry3 = store.get("nvidia").await.unwrap();
//! let entry4 = store.get("microsoft").await.unwrap();
Expand All @@ -99,7 +99,7 @@
//! ```
//!
//!
//! ## Store JSON
//! ### Store JSON
//!
//! ```rust
//! use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion src/memtable/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ mod tests {
let expected_len = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;

memtable.insert(&entry);
println!("{}", memtable.size);

assert_eq!(memtable.size, expected_len);

memtable.insert(&entry);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/meta_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Meta {
}

/// Serializes `Meta` into byte vector
fn serialize(&self) -> ByteSerializedEntry {
pub(crate) fn serialize(&self) -> ByteSerializedEntry {
// head offset + tail offset + created_at + last_modified
let entry_len = SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U64;

Expand Down
2 changes: 1 addition & 1 deletion src/sst/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl Summary {
}

/// Serializes `Summary` to byte vector
fn serialize(&self) -> ByteSerializedEntry {
pub(crate) fn serialize(&self) -> ByteSerializedEntry {
let entry_len = SIZE_OF_U32 + SIZE_OF_U32 + self.biggest_key.len() + self.smallest_key.len();
let mut serialized_data = Vec::with_capacity(entry_len);

Expand Down
155 changes: 7 additions & 148 deletions src/tests/bucket_test.rs
Original file line number Diff line number Diff line change
@@ -1,159 +1,18 @@
#[cfg(test)]
mod tests {
use crate::sst::DataFile;
use crate::tests::workload::FilterWorkload;
use crate::tests::workload::{FilterWorkload, SSTContructor};
use crate::{
bucket::{Bucket, BucketMap},
consts::{BUCKET_HIGH, MIN_TRESHOLD},
err::Error,
};
use std::path::Path;
use std::{path::PathBuf, sync::Arc};
use std::sync::Arc;
use tempfile::tempdir;
use tokio::fs;
use uuid::Uuid;

use chrono::Utc;
use crossbeam_skiplist::SkipMap;
use tokio::{fs::File, sync::RwLock};

use crate::{
fs::{DataFileNode, FileNode, FileType, IndexFileNode},
index::IndexFile,
sst::Table,
};

pub struct SSTContructor {
pub dir: PathBuf,
pub data_path: PathBuf,
pub index_path: PathBuf,
}

impl SSTContructor {
fn new<P: AsRef<Path> + Send + Sync>(dir: P, data_path: P, index_path: P) -> Self {
return Self {
dir: dir.as_ref().to_path_buf(),
data_path: data_path.as_ref().to_path_buf(),
index_path: index_path.as_ref().to_path_buf(),
};
}

pub async fn generate_ssts(number: u32) -> Vec<Table> {
let sst_contructor: Vec<SSTContructor> = vec![
SSTContructor::new(
PathBuf::from("src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830953696"),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958016/data_1718830958016_.db",
),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958016/index_1718830958016_.db",
),
),
SSTContructor::new(
PathBuf::from("src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830954572"),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958858/data_1718830958858_.db",
),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958858/index_1718830958858_.db",
),
),
SSTContructor::new(
PathBuf::from("src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830955463"),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958906/data_1718830958906_.db",
),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958906/index_1718830958906_.db",
),
),
SSTContructor::new(
PathBuf::from("src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830956313"),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958953/data_1718830958953_.db",
),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958953/index_1718830958953_.db",
),
),
SSTContructor::new(
PathBuf::from("src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830957169"),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830959000/data_1718830959000_.db",
),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830959000/index_1718830959000_.db",
),
),
SSTContructor::new(
PathBuf::from("src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958810"),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830959049/data_1718830959049_.db",
),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830959049/index_1718830959049_.db",
),
),
SSTContructor::new(
PathBuf::from("src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958858"),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830959097/data_1718830959097_.db",
),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830959097/index_1718830959097_.db",
),
),
SSTContructor::new(
PathBuf::from("src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830958906"),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830959145/data_1718830959145_.db",
),
PathBuf::from(
"src/tests/fixtures/data/buckets/bucket5af1d6ce-fca8-4b31-8380-c9b1612a9879/sstable_1718830959145/index_1718830959145_.db",
),
),
];
let mut ssts = Vec::new();
for i in 0..number {
let idx = i as usize;
ssts.push(Table {
dir: sst_contructor[idx].dir.to_owned(),
hotness: 100,
size: 4096,
created_at: Utc::now(),
data_file: DataFile {
file: DataFileNode {
node: FileNode {
file_path: sst_contructor[idx].data_path.to_owned(),
file: Arc::new(RwLock::new(
File::open(sst_contructor[idx].data_path.to_owned()).await.unwrap(),
)),
file_type: FileType::Data,
},
},
path: sst_contructor[idx].data_path.to_owned(),
},
index_file: IndexFile {
file: IndexFileNode {
node: FileNode {
file_path: sst_contructor[idx].index_path.to_owned(),
file: Arc::new(RwLock::new(
File::open(sst_contructor[idx].index_path.to_owned()).await.unwrap(),
)),
file_type: FileType::Index,
},
},
path: sst_contructor[idx].index_path.to_owned(),
},
entries: Arc::new(SkipMap::default()),
filter: None,
summary: None,
})
}
ssts
}
}


#[tokio::test]
async fn test_bucket_new() {
let root = tempdir().unwrap();
Expand Down Expand Up @@ -512,11 +371,11 @@ mod tests {

let imbalanced_buckets = bucket_map.extract_imbalanced_buckets().await;
assert!(imbalanced_buckets.is_ok());
let (buckets, ssts_to_remove) = imbalanced_buckets.unwrap();
let (buckets, _) = imbalanced_buckets.unwrap();
assert_eq!(buckets.len(), 5);

let delete_res = bucket_map.delete_ssts(&ssts_to_remove).await;
assert!(delete_res.is_ok());
assert_eq!(bucket_map.buckets.len(), 0);
// let delete_res = bucket_map.delete_ssts(&ssts_to_remove).await;
// assert!(delete_res.is_ok());
// assert_eq!(bucket_map.buckets.len(), 0);
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file added src/tests/fixtures/data/meta/meta.bin
Binary file not shown.
Binary file modified src/tests/fixtures/data/v_log/val_log.bin
Binary file not shown.
Loading

0 comments on commit bb5ae97

Please sign in to comment.