Skip to content

Commit

Permalink
Added biggest key index
Browse files Browse the repository at this point in the history
  • Loading branch information
Gifted-s committed Mar 14, 2024
1 parent 5a35c05 commit af75726
Show file tree
Hide file tree
Showing 13 changed files with 208 additions and 59 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"./Cargo.toml",
"./Cargo.toml",
"./Cargo.toml",
"./Cargo.toml",
"./Cargo.toml"
]
}
6 changes: 3 additions & 3 deletions src/block/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
use tokio::{
fs::File,
io::{self, AsyncSeekExt, AsyncWriteExt},
io::{self, AsyncWriteExt},
};

use err::StorageEngineError::*;
Expand All @@ -59,7 +59,7 @@ use crate::{
consts::{SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8},
err::{self, StorageEngineError},
};
const BLOCK_SIZE: usize = 8 * 1024; // 4KB
const BLOCK_SIZE: usize = 64 * 1024; // 64KB

#[derive(Debug, Clone)]
pub struct Block {
Expand Down Expand Up @@ -129,7 +129,7 @@ impl Block {
}

pub async fn write_to_file(&self, file: &mut File) -> Result<(), StorageEngineError> {
for entry in &self.data {
for entry in &self.data {
let entry_len = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
let mut entry_vec = Vec::with_capacity(entry_len);

Expand Down
18 changes: 17 additions & 1 deletion src/bloom_filter/bf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use bit_vec::BitVec;
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
path::{self, PathBuf},
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
Expand Down Expand Up @@ -67,8 +68,23 @@ impl BloomFilter {
self.sstable_path = Some(sstable_path);
}

/// Returns references to the bloom filters whose backing SSTable data file
/// matches one of the supplied paths.
///
/// The result preserves the order of `paths`; if the same path occurs more
/// than once, the matching filter is returned once per occurrence (same
/// behavior as the original nested scan).
pub fn filter_by_sstable_paths<'a>(
    bloom_filters: &'a [BloomFilter],
    paths: Vec<&'a PathBuf>,
) -> Vec<&'a BloomFilter> {
    let mut filtered_bfs = Vec::new();
    for p in paths {
        for b in bloom_filters {
            // Match on the data file path this filter's SSTable was written to.
            if b.get_sstable_path().data_file_path.as_path() == p.as_path() {
                filtered_bfs.push(b);
            }
        }
    }
    filtered_bfs
}

pub fn get_sstable_paths_that_contains_key<T: Hash>(
bloom_filters: &Vec<BloomFilter>,
bloom_filters: Vec<&BloomFilter>,
key: &T,
) -> Option<Vec<SSTablePath>> {
let mut sstables: Vec<SSTablePath> = Vec::new();
Expand Down
1 change: 1 addition & 0 deletions src/compaction/bucket_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use StorageEngineError::*;
pub trait IndexWithSizeInBytes {
fn get_index(&self) -> Arc<SkipMap<Vec<u8>, (usize, u64, bool)>>; // usize for value offset, u64 to store entry creation date in milliseconds
fn size(&self) -> usize;
fn find_biggest_key_from_table(&self) -> Result<Vec<u8>, StorageEngineError>;
}

impl Bucket {
Expand Down
42 changes: 34 additions & 8 deletions src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
bloom_filter::BloomFilter,
consts::TOMB_STONE_TTL,
err::StorageEngineError,
key_offseter::TableBiggestKeys,
memtable::Entry,
sstable::{SSTable, SSTablePath},
};
Expand Down Expand Up @@ -53,6 +54,7 @@ impl Compactor {
&mut self,
buckets: &mut BucketMap,
bloom_filters: &mut Vec<BloomFilter>,
biggest_key_index: &mut TableBiggestKeys,
) -> Result<bool, StorageEngineError> {
let mut number_of_compactions = 0;
// The compaction loop will keep running until there
Expand Down Expand Up @@ -92,26 +94,43 @@ impl Compactor {
sst_file_path.data_file_path, sst_file_path.index_file_path
);
// Step 4: Map this bloom filter to its sstable file path
let sstable_data_file_path = sst_file_path.get_data_file_path();
m.bloom_filter.set_sstable_path(sst_file_path);

// Step 5: Store the bloom filter in the bloom filters vector
bloom_filters.push(m.bloom_filter);
let biggest_key = m.sstable.find_biggest_key()?;
if biggest_key.is_empty() {
return Err(BiggestKeyIndexError);
}
biggest_key_index.set(sstable_data_file_path, biggest_key);
actual_number_of_sstables_written_to_disk += 1;
}
Err(err) => {
// Step 6: Trigger recovery in case compaction failed at any point

// Ensure that bloom filter is restored to the previous state by removing entries added so far in
// the compaction process and also remove merged sstables written to disk so far to prevent unstable state
while let Some(bf) = bloom_filters.pop() {
match fs::remove_dir_all(bf.get_sstable_path().dir.clone())
.await
{
Ok(()) => info!("Stale SSTable File successfully deleted."),
Err(e) => error!("Stale SSTable File not deleted. {}", e),
while actual_number_of_sstables_written_to_disk > 0 {
if let Some(bf) = bloom_filters.pop() {
match fs::remove_dir_all(bf.get_sstable_path().dir.clone())
.await
{
Ok(()) => {
biggest_key_index.remove(
bf.get_sstable_path().data_file_path.clone(),
);
info!("Stale SSTable File successfully deleted.")
}
Err(e) => {
error!("Stale SSTable File not deleted. {}", e)
}
}
}

actual_number_of_sstables_written_to_disk -= 1;
}

error!("merged SSTable was not written to disk {}", err);
return Err(CompactionFailed(err.to_string()));
}
Expand All @@ -130,6 +149,7 @@ impl Compactor {
buckets,
&sstables_files_to_remove,
bloom_filters,
biggest_key_index,
);
match bloom_filter_updated_opt.await {
Some(is_bloom_filter_updated) => {
Expand Down Expand Up @@ -159,9 +179,15 @@ impl Compactor {
buckets: &mut BucketMap,
sstables_to_delete: &Vec<(Uuid, Vec<SSTablePath>)>,
bloom_filters_with_both_old_and_new_sstables: &mut Vec<BloomFilter>,
biggest_key_index: &mut TableBiggestKeys,
) -> Option<bool> {
// Remove obsolete keys from biggest keys index
sstables_to_delete.iter().for_each(|(_, sstables)| {
sstables.iter().for_each(|s| {
biggest_key_index.remove(s.get_data_file_path());
})
});
let all_sstables_deleted = buckets.delete_sstables(sstables_to_delete).await;

// if all sstables were not deleted then don't remove the associated bloom filters
// although this can lead to redundancy bloom filters are in-memory and its also less costly
// since keys are represented in bits
Expand Down Expand Up @@ -217,7 +243,7 @@ impl Compactor {
)
.await
.map_err(|err| CompactionFailed(err.to_string()))?;

match sst_opt {
Some(sst) => {
merged_sstable = self
Expand Down
2 changes: 1 addition & 1 deletion src/consts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::storage_engine::SizeUnit;

pub const GC_THREAD_COUNT: u32 = 5;

pub const DEFAULT_MEMTABLE_CAPACITY: usize = SizeUnit::Kilobytes.to_bytes(30);
pub const DEFAULT_MEMTABLE_CAPACITY: usize = SizeUnit::Megabytes.to_bytes(1);

pub const DEFAULT_FALSE_POSITIVE_RATE: f64 = 1e-200;

Expand Down
12 changes: 9 additions & 3 deletions src/err/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,17 @@ pub enum StorageEngineError {
#[error("Index file write error")]
IndexFileWriteError(#[source] io::Error),

/// Error while reading from index file
#[error("Index file read error")]
IndexFileReadError(#[source] io::Error),
/// Error while reading from index file
#[error("Index file read error")]
IndexFileReadError(#[source] io::Error),

/// Error while flushing write to disk for index file
#[error("Index file flush error")]
IndexFileFlushError(#[source] io::Error),

#[error("Error finding biggest key in memtable (None was returned)")]
BiggestKeyIndexError,

#[error("All bloom filters return false for all sstables")]
KeyNotFoundByAnyBloomFilterError,
}
33 changes: 33 additions & 0 deletions src/key_offseter/key_offseter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::{collections::HashMap, path::PathBuf};

/// In-memory index mapping each SSTable data file path to the biggest
/// (lexicographically largest) key stored in that table.
///
/// Used during reads to prune SSTables that cannot contain a key: any table
/// whose biggest key is smaller than the target key can be skipped.
#[derive(Clone, Debug, Default)]
pub struct TableBiggestKeys {
    /// SSTable data file path -> biggest key in that table.
    pub sstables: HashMap<PathBuf, Vec<u8>>,
}

impl TableBiggestKeys {
    /// Creates an empty index.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records `biggest_key` for `sst_path`, replacing any previous entry.
    ///
    /// Returns `true` if an entry for this path already existed.
    pub fn set(&mut self, sst_path: PathBuf, biggest_key: Vec<u8>) -> bool {
        self.sstables.insert(sst_path, biggest_key).is_some()
    }

    /// Removes the entry for `sst_path` (e.g. after compaction deletes the
    /// table). Returns `true` if an entry was present.
    pub fn remove(&mut self, sst_path: PathBuf) -> bool {
        self.sstables.remove(&sst_path).is_some()
    }

    /// Returns the paths of all SSTables whose biggest key is greater than
    /// **or equal to** `key` — i.e. the only tables that could contain `key`.
    ///
    /// Iteration order of the underlying `HashMap` is unspecified.
    pub fn filter_sstables_by_biggest_key(&self, key: &[u8]) -> Vec<&PathBuf> {
        self.sstables
            .iter()
            .filter(|(_, biggest)| biggest.as_slice() >= key)
            .map(|(path, _)| path)
            .collect()
    }
}
2 changes: 2 additions & 0 deletions src/key_offseter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod key_offseter;
pub use key_offseter::TableBiggestKeys;
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ pub mod storage_engine;
pub mod value_log;
pub mod block;
pub mod garbage_collector;
pub mod sparse_index;
pub mod sparse_index;
pub mod key_offseter;
33 changes: 25 additions & 8 deletions src/memtable/inmemory.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::bloom_filter::BloomFilter;
use crate::compaction::IndexWithSizeInBytes;
use crate::consts::{DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MEMTABLE_CAPACITY};
use crate::consts::{
DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MEMTABLE_CAPACITY, SIZE_OF_U32, SIZE_OF_U64, SIZE_OF_U8,
};
use crate::err::StorageEngineError;
//use crate::memtable::val_option::ValueOption;
use crate::storage_engine::SizeUnit;
use chrono::{DateTime, Utc};
use crossbeam_skiplist::SkipMap;

use std::cmp;
use StorageEngineError::*;

use std::{hash::Hash, sync::Arc};

Expand Down Expand Up @@ -36,6 +38,9 @@ impl IndexWithSizeInBytes for InMemoryTable<Vec<u8>> {
fn size(&self) -> usize {
self.size
}
fn find_biggest_key_from_table(&self) -> Result<Vec<u8>, StorageEngineError> {
self.find_biggest_key()
}
}

impl Entry<Vec<u8>, usize> {
Expand All @@ -48,7 +53,6 @@ impl Entry<Vec<u8>, usize> {
}
}


pub fn has_expired(&self, ttl: u64) -> bool {
// Current time
let current_time = Utc::now();
Expand Down Expand Up @@ -104,7 +108,7 @@ impl InMemoryTable<Vec<u8>> {

// key length + value offset length + date created length
// it takes 4 bytes to store a 32 bit integer and 1 byte for tombstone checker
let entry_length_byte = entry.key.len() + 4 + 8 + 1;
let entry_length_byte = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
self.size += entry_length_byte;
return Ok(());
}
Expand All @@ -115,7 +119,7 @@ impl InMemoryTable<Vec<u8>> {
);
// key length + value offset length + date created length
// it takes 4 bytes to store a 32 bit integer since 8 bits makes 1 byte
let entry_length_byte = entry.key.len() + 4 + 8 + 1;
let entry_length_byte = entry.key.len() + SIZE_OF_U32 + SIZE_OF_U64 + SIZE_OF_U8;
self.size += entry_length_byte;
Ok(())
}
Expand All @@ -131,7 +135,7 @@ impl InMemoryTable<Vec<u8>> {

pub fn update(&mut self, entry: &Entry<Vec<u8>, usize>) -> Result<(), StorageEngineError> {
if !self.bloom_filter.contains(&entry.key) {
return Err(StorageEngineError::KeyNotFoundInMemTable);
return Err(KeyNotFoundInMemTable);
}
// If the key already exist in the bloom filter then just insert into the entry alone
self.index.insert(
Expand All @@ -147,16 +151,29 @@ impl InMemoryTable<Vec<u8>> {

pub fn delete(&mut self, entry: &Entry<Vec<u8>, usize>) -> Result<(), StorageEngineError> {
if !self.bloom_filter.contains(&entry.key) {
return Err(StorageEngineError::KeyNotFoundInMemTable);
return Err(KeyNotFoundInMemTable);
}
let created_at = Utc::now();
// Insert tombstone to indicate deletion
self.index.insert(
entry.key.to_vec(),
(entry.val_offset, created_at.timestamp_millis() as u64, entry.is_tombstone),
(
entry.val_offset,
created_at.timestamp_millis() as u64,
entry.is_tombstone,
),
);
Ok(())
}

/// Returns the biggest (lexicographically largest) key currently in the
/// memtable, or `BiggestKeyIndexError` if the table is empty.
pub fn find_biggest_key(&self) -> Result<Vec<u8>, StorageEngineError> {
    // The skip-list index iterates in key order, so the last entry
    // holds the biggest key.
    self.index
        .iter()
        .next_back()
        .map(|entry| entry.key().to_vec())
        .ok_or(BiggestKeyIndexError)
}
pub fn false_positive_rate(&mut self) -> f64 {
self.false_positive_rate
}
Expand Down
12 changes: 11 additions & 1 deletion src/sstable/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ impl IndexWithSizeInBytes for SSTable {
fn size(&self) -> usize {
self.size
}
fn find_biggest_key_from_table(&self) -> Result<Vec<u8>, StorageEngineError>{
self.find_biggest_key()
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -134,7 +137,14 @@ impl SSTable {
created_at: created_at.timestamp_millis() as u64,
}
}

/// Returns the biggest (lexicographically largest) key in this SSTable's
/// index, or `BiggestKeyIndexError` if the index is empty.
pub fn find_biggest_key(&self) -> Result<Vec<u8>, StorageEngineError> {
    // The skip-list index iterates in key order, so the last entry
    // holds the biggest key.
    self.index
        .iter()
        .next_back()
        .map(|entry| entry.key().to_vec())
        .ok_or(BiggestKeyIndexError)
}
pub(crate) async fn write_to_file(&self) -> Result<(), StorageEngineError> {
// Open the file in write mode with the append flag.
let data_file_path = &self.data_file_path;
Expand Down
Loading

0 comments on commit af75726

Please sign in to comment.