Skip to content

Commit

Permalink
[Feat] CreateOrRecoverStoreParams to construct handle_empty_vlog and …
Browse files Browse the repository at this point in the history
…recover params
  • Loading branch information
Gifted-s committed Oct 21, 2024
1 parent 590cf37 commit 9aa9f59
Show file tree
Hide file tree
Showing 34 changed files with 559 additions and 246 deletions.
15 changes: 12 additions & 3 deletions examples/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,19 @@ async fn main() {
let entry7 = store.get("***not_found_key**").await.unwrap();

assert_eq!(std::str::from_utf8(&entry1.unwrap().val).unwrap(), "tim cook");
assert_eq!(std::str::from_utf8(&entry2.unwrap().val).unwrap(), "sundar pichai");
assert_eq!(
std::str::from_utf8(&entry2.unwrap().val).unwrap(),
"sundar pichai"
);
assert_eq!(std::str::from_utf8(&entry3.unwrap().val).unwrap(), "jensen huang");
assert_eq!(std::str::from_utf8(&entry4.unwrap().val).unwrap(), "satya nadella");
assert_eq!(std::str::from_utf8(&entry5.unwrap().val).unwrap(), "mark zuckerberg");
assert_eq!(
std::str::from_utf8(&entry4.unwrap().val).unwrap(),
"satya nadella"
);
assert_eq!(
std::str::from_utf8(&entry5.unwrap().val).unwrap(),
"mark zuckerberg"
);
assert_eq!(std::str::from_utf8(&entry6.unwrap().val).unwrap(), "sam altman");
assert!(entry7.is_none())
}
8 changes: 0 additions & 8 deletions learnings.txt

This file was deleted.

9 changes: 7 additions & 2 deletions src/bucket/bucket_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::consts::{BUCKET_DIRECTORY_PREFIX, BUCKET_HIGH, BUCKET_LOW, MAX_TRESHOLD, MIN_SSTABLE_SIZE, MIN_TRESHOLD};
use crate::consts::{
BUCKET_DIRECTORY_PREFIX, BUCKET_HIGH, BUCKET_LOW, MAX_TRESHOLD, MIN_SSTABLE_SIZE, MIN_TRESHOLD,
};
use crate::err::Error;
use crate::filter::BloomFilter;
use crate::fs::{FileAsync, FileNode};
Expand Down Expand Up @@ -115,7 +117,10 @@ impl Bucket {
.iter()
.map(|s| tokio::spawn(fs::metadata(s.data_file.path.clone())));
for meta_task in fetch_files_meta {
let meta_data = meta_task.await.map_err(|err| GetFileMetaData(err.into()))?.unwrap();
let meta_data = meta_task
.await
.map_err(|err| GetFileMetaData(err.into()))?
.unwrap();
size += meta_data.len() as usize;
}
Ok(size / ssts.len() as u64 as usize)
Expand Down
30 changes: 21 additions & 9 deletions src/cfg/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::{
compactors,
consts::{
DEFAULT_ALLOW_PREFETCH, DEFAULT_COMPACTION_FLUSH_LISTNER_INTERVAL, DEFAULT_COMPACTION_INTERVAL,
DEFAULT_ENABLE_TTL, DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MAX_WRITE_BUFFER_NUMBER, DEFAULT_ONLINE_GC_INTERVAL,
DEFAULT_PREFETCH_SIZE, DEFAULT_TOMBSTONE_COMPACTION_INTERVAL, DEFAULT_TOMBSTONE_TTL, ENTRY_TTL, GC_CHUNK_SIZE,
WRITE_BUFFER_SIZE,
DEFAULT_ENABLE_TTL, DEFAULT_FALSE_POSITIVE_RATE, DEFAULT_MAX_WRITE_BUFFER_NUMBER,
DEFAULT_ONLINE_GC_INTERVAL, DEFAULT_PREFETCH_SIZE, DEFAULT_TOMBSTONE_COMPACTION_INTERVAL,
DEFAULT_TOMBSTONE_TTL, ENTRY_TTL, GC_CHUNK_SIZE, WRITE_BUFFER_SIZE,
},
};
use crate::{
Expand Down Expand Up @@ -74,7 +74,6 @@ fn get_open_file_limit() -> usize {
return 150;
}


impl Default for Config {
fn default() -> Self {
Config {
Expand Down Expand Up @@ -126,7 +125,10 @@ impl DataStore<'static, Key> {
/// Sets the write buffer size in kilobytes.
/// The size must be at least 50 kilobytes.
pub fn with_write_buffer_size(mut self, size: usize) -> Self {
assert!(size >= 50, "write_buffer_size should not be less than 50 Kilobytes");
assert!(
size >= 50,
"write_buffer_size should not be less than 50 Kilobytes"
);
self.config.write_buffer_size = SizeUnit::Kilobytes.as_bytes(size);
self
}
Expand Down Expand Up @@ -358,7 +360,9 @@ mod tests {
#[tokio::test]
async fn test_with_tombstone_ttl() {
let ds = create_datastore();
let ds = ds.await.with_tombstone_ttl(Duration::from_secs(15 * 24 * 60 * 60)); // 15 days
let ds = ds
.await
.with_tombstone_ttl(Duration::from_secs(15 * 24 * 60 * 60)); // 15 days
assert_eq!(ds.config.tombstone_ttl, Duration::from_secs(15 * 24 * 60 * 60));
}

Expand All @@ -375,11 +379,16 @@ mod tests {
async fn test_with_compactor_flush_listener_interval() {
let ds = create_datastore().await;
let ds = ds.with_compactor_flush_listener_interval(Duration::from_secs(3 * 60)); // 3 minutes
assert_eq!(ds.config.compactor_flush_listener_interval, Duration::from_secs(3 * 60));
assert_eq!(
ds.config.compactor_flush_listener_interval,
Duration::from_secs(3 * 60)
);
}

#[tokio::test]
#[should_panic(expected = "background_compaction_interval should not be less than 5 minutes to prevent overloads")]
#[should_panic(
expected = "background_compaction_interval should not be less than 5 minutes to prevent overloads"
)]
async fn test_with_background_compaction_interval_invalid() {
let ds = create_datastore().await;
ds.with_background_compaction_interval(Duration::from_secs(4 * 60)); // 4 minutes
Expand All @@ -389,7 +398,10 @@ mod tests {
async fn test_with_background_compaction_interval() {
let ds = create_datastore().await;
let ds = ds.with_background_compaction_interval(Duration::from_secs(6 * 60)); // 6 minutes
assert_eq!(ds.config.background_compaction_interval, Duration::from_secs(6 * 60));
assert_eq!(
ds.config.background_compaction_interval,
Duration::from_secs(6 * 60)
);
}

#[tokio::test]
Expand Down
16 changes: 12 additions & 4 deletions src/compactors/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,10 @@ pub struct MergedSSTable {
impl Clone for MergedSSTable {
fn clone(&self) -> Self {
Self {
sstable: Box::new(super::TableInsertor::from(self.sstable.get_entries(), &self.filter)),
sstable: Box::new(super::TableInsertor::from(
self.sstable.get_entries(),
&self.filter,
)),
hotness: self.hotness,
filter: self.filter.clone(),
}
Expand Down Expand Up @@ -249,7 +252,8 @@ impl Compactor {
*state = CompState::Active;
drop(state);
if let Err(err) =
Compactor::handle_compaction(Arc::clone(&bucket_map), Arc::clone(&key_range), &cfg).await
Compactor::handle_compaction(Arc::clone(&bucket_map), Arc::clone(&key_range), &cfg)
.await
{
log::info!("{}", Error::CompactionFailed(Box::new(err)));
continue;
Expand Down Expand Up @@ -292,7 +296,8 @@ impl Compactor {
) -> Result<(), Error> {
match cfg.strategy {
Strategy::STCS => {
let mut runner = super::sized::SizedTierRunner::new(Arc::clone(&buckets), Arc::clone(&key_range), cfg);
let mut runner =
super::sized::SizedTierRunner::new(Arc::clone(&buckets), Arc::clone(&key_range), cfg);
runner.run_compaction().await
} // LCS, UCS and TWS will be added later
}
Expand Down Expand Up @@ -336,7 +341,10 @@ mod tests {
assert_eq!(compactor.config.use_ttl, use_ttl);
assert_eq!(compactor.config.entry_ttl, ttl.entry_ttl);
assert_eq!(compactor.config.tombstone_ttl, ttl.tombstone_ttl);
assert_eq!(compactor.config.background_interval, intervals.background_interval);
assert_eq!(
compactor.config.background_interval,
intervals.background_interval
);
assert_eq!(
compactor.config.flush_listener_interval,
intervals.flush_listener_interval
Expand Down
6 changes: 3 additions & 3 deletions src/compactors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ mod sized;
pub use compact::CompState;
pub use compact::CompactionReason;
pub use compact::Compactor;
pub use compact::Config;
pub use compact::IntervalParams;
pub use compact::MergedSSTable;
pub use compact::Strategy;
pub use compact::IntervalParams;
pub use compact::TtlParams;
pub use sized::SizedTierRunner;
pub use compact::Config;
pub use insertor::TableInsertor;
pub use sized::SizedTierRunner;
16 changes: 13 additions & 3 deletions src/compactors/sized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ pub struct SizedTierRunner<'a> {

impl<'a> SizedTierRunner<'a> {
    /// Creates a new instance of `SizedTierRunner`
pub fn new(bucket_map: BucketMapHandle, key_range: KeyRangeHandle, config: &'a Config) -> SizedTierRunner<'a> {
pub fn new(
bucket_map: BucketMapHandle,
key_range: KeyRangeHandle,
config: &'a Config,
) -> SizedTierRunner<'a> {
Self {
tombstones: HashMap::new(),
bucket_map,
Expand Down Expand Up @@ -111,7 +115,9 @@ impl<'a> SizedTierRunner<'a> {
.await;
match clean_up_successful {
Ok(None) => {
return Err(Error::CompactionPartiallyFailed(Box::new(CompactionCleanupPartial)));
return Err(Error::CompactionPartiallyFailed(Box::new(
CompactionCleanupPartial,
)));
}
Err(err) => {
return Err(Error::CompactionCleanup(Box::new(err)));
Expand Down Expand Up @@ -281,7 +287,11 @@ impl<'a> SizedTierRunner<'a> {
/// and prevented from being inserted
///
    /// Appends `entry` to `merged_entries` only when it should be inserted (NOTE(review): the method returns no value, contrary to earlier wording — it records the decision by mutating `merged_entries`)
pub(crate) fn tombstone_check(&mut self, entry: &Entry<Key, usize>, merged_entries: &mut Vec<Entry<Key, usize>>) {
pub(crate) fn tombstone_check(
&mut self,
entry: &Entry<Key, usize>,
merged_entries: &mut Vec<Entry<Key, usize>>,
) {
let mut should_insert = false;
if self.tombstones.contains_key(&entry.key) {
let tomb_insert_time = *self.tombstones.get(&entry.key).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/consts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub const DEFAULT_PREFETCH_SIZE: usize = 10;
pub const EOF: &str = "EOF";

pub const HEAD_ENTRY_KEY: &[u8; 4] = b"head";

pub const HEAD_KEY_SIZE: usize = 4; // "head"

pub const TAIL_ENTRY_KEY: &[u8; 4] = b"tail";
Expand Down
92 changes: 67 additions & 25 deletions src/db/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::bucket::{Bucket, BucketID, BucketMap};
use crate::cfg::Config;
use crate::compactors::{self, Compactor, IntervalParams, TtlParams};
use crate::consts::{
DEFAULT_DB_NAME, DEFAULT_FLUSH_SIGNAL_CHANNEL_SIZE, HEAD_ENTRY_KEY, HEAD_ENTRY_VALUE, SIZE_OF_U32, SIZE_OF_U64,
SIZE_OF_U8, TAIL_ENTRY_KEY, TAIL_ENTRY_VALUE,
DEFAULT_DB_NAME, DEFAULT_FLUSH_SIGNAL_CHANNEL_SIZE, HEAD_ENTRY_KEY, HEAD_ENTRY_VALUE, SIZE_OF_U32,
SIZE_OF_U64, SIZE_OF_U8, TAIL_ENTRY_KEY, TAIL_ENTRY_VALUE,
};
use crate::err::Error;
use crate::err::Error::*;
Expand All @@ -31,21 +31,36 @@ use std::sync::Arc;
use tokio::fs::read_dir;
use tokio::sync::RwLock;

/// Parameters to create an empty [`DataStore`] or recover an existing one from the [`ValueLog`]
pub struct CreateOrRecoverStoreParams<'a, P> {
pub dir: DirPath,
pub buckets_path: P,
pub vlog: ValueLog,
pub key_range: KeyRange,
pub config: &'a Config,
pub size_unit: SizeUnit,
pub meta: Meta,
}

impl DataStore<'static, Key> {
/// Recovers [`DataStore`] state after crash
///
    /// # Errors
    ///
    /// Returns an error in case there is an IO error
pub async fn recover<P: AsRef<Path> + Send + Sync + Clone>(
dir: DirPath,
buckets_path: P,
mut vlog: ValueLog,
key_range: KeyRange,
config: &Config,
size_unit: SizeUnit,
mut meta: Meta,
params: CreateOrRecoverStoreParams<'_, P>,
) -> Result<DataStore<'static, Key>, Error> {
let (buckets_path, dir, mut vlog, key_range, config, size_unit, mut meta) = (
params.buckets_path,
params.dir,
params.vlog,
params.key_range,
params.config,
params.size_unit,
params.meta,
);

let mut recovered_buckets: IndexMap<BucketID, Bucket> = IndexMap::new();
// Get bucket diretories streams
let mut buckets_stream = open_dir_stream!(buckets_path.as_ref().to_path_buf());
Expand Down Expand Up @@ -105,12 +120,18 @@ impl DataStore<'static, Key> {
if let Some(b) = recovered_buckets.get(&bucket_uuid) {
let temp_sstables = b.sstables.clone();
temp_sstables.write().await.push(table.clone());
let updated_bucket =
Bucket::from(bucket_dir.path(), bucket_uuid, temp_sstables.read().await.clone(), 0).await?;
let updated_bucket = Bucket::from(
bucket_dir.path(),
bucket_uuid,
temp_sstables.read().await.clone(),
0,
)
.await?;
recovered_buckets.insert(bucket_uuid, updated_bucket);
} else {
// Create new bucket
let updated_bucket = Bucket::from(bucket_dir.path(), bucket_uuid, vec![table.clone()], 0).await?;
let updated_bucket =
Bucket::from(bucket_dir.path(), bucket_uuid, vec![table.clone()], 0).await?;
recovered_buckets.insert(bucket_uuid, updated_bucket);
}

Expand Down Expand Up @@ -230,7 +251,8 @@ impl DataStore<'static, Key> {
head_offset: usize,
) -> Result<(MemTable<Key>, ImmutableMemTablesLockFree<Key>), Error> {
let read_only_memtables: ImmutableMemTablesLockFree<Key> = SkipMap::new();
let mut active_memtable = MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate);
let mut active_memtable =
MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate);
let mut vlog = ValueLog::new(vlog_path.as_ref()).await?;
let mut most_recent_offset = head_offset;
let entries = vlog.recover(head_offset).await?;
Expand All @@ -244,7 +266,10 @@ impl DataStore<'static, Key> {
if active_memtable.is_full(e.key.len()) {
// Make memtable read only
active_memtable.read_only = true;
read_only_memtables.insert(MemTable::generate_table_id(), Arc::new(active_memtable.to_owned()));
read_only_memtables.insert(
MemTable::generate_table_id(),
Arc::new(active_memtable.to_owned()),
);
active_memtable =
MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate);
}
Expand All @@ -264,24 +289,41 @@ impl DataStore<'static, Key> {
/// Creates new [`DataStore`]
/// Used in case there is no recovery needed
pub async fn handle_empty_vlog<P: AsRef<Path> + Send + Sync>(
dir: DirPath,
buckets_path: P,
mut vlog: ValueLog,
key_range: KeyRange,
config: &Config,
size_unit: SizeUnit,
meta: Meta,
params: CreateOrRecoverStoreParams<'_, P>,
) -> Result<DataStore<'static, Key>, Error> {
let mut active_memtable =
MemTable::with_specified_capacity_and_rate(size_unit, config.write_buffer_size, config.false_positive_rate);
let (buckets_path, dir, mut vlog, key_range, config, size_unit, meta) = (
params.buckets_path,
params.dir,
params.vlog,
params.key_range,
params.config,
params.size_unit,
params.meta,
);

let mut active_memtable = MemTable::with_specified_capacity_and_rate(
size_unit,
config.write_buffer_size,
config.false_positive_rate,
);
// if ValueLog is empty then we want to insert both tail and head
let created_at = Utc::now();
let tail_offset = vlog
.append(&TAIL_ENTRY_KEY.to_vec(), &TAIL_ENTRY_VALUE.to_vec(), created_at, false)
.append(
&TAIL_ENTRY_KEY.to_vec(),
&TAIL_ENTRY_VALUE.to_vec(),
created_at,
false,
)
.await?;
let tail_entry = Entry::new(TAIL_ENTRY_KEY.to_vec(), tail_offset, created_at, false);
let head_offset = vlog
.append(&HEAD_ENTRY_KEY.to_vec(), &HEAD_ENTRY_VALUE.to_vec(), created_at, false)
.append(
&HEAD_ENTRY_KEY.to_vec(),
&HEAD_ENTRY_VALUE.to_vec(),
created_at,
false,
)
.await?;
let head_entry = Entry::new(HEAD_ENTRY_KEY.to_vec(), head_offset, created_at, false);
vlog.set_head(head_offset);
Expand Down
Loading

0 comments on commit 9aa9f59

Please sign in to comment.