Merge pull request #49 from Gifted-s/ft/refactor-v1
Migrate read-only memtable structure to lock-free skipmap from crossbeam
Gifted-s authored Jul 8, 2024
2 parents dfae212 + 4e4fec3 commit 0b2bc94
Showing 7 changed files with 49 additions and 58 deletions.
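At its core the refactor swaps `Arc<RwLock<IndexMap<MemtableId, Arc<RwLock<MemTable<K>>>>>>` for `Arc<SkipMap<MemtableId, Arc<MemTable<K>>>>`. The following is a minimal sketch of the new structure in isolation, assuming `crossbeam-skiplist` as a dependency and a simplified stand-in `MemTable` rather than the crate's real type:

```rust
// Illustrative only: simplified stand-ins, not types from this repository.
use crossbeam_skiplist::SkipMap;
use std::sync::Arc;

type MemtableId = Vec<u8>;

struct MemTable {
    entries: Vec<(Vec<u8>, Vec<u8>)>, // placeholder for the real entry store
}

fn main() {
    // After this PR: Arc<SkipMap<..>> replaces Arc<RwLock<IndexMap<..>>>.
    let read_only_memtables: Arc<SkipMap<MemtableId, Arc<MemTable>>> = Arc::new(SkipMap::new());

    // Inserting a sealed memtable no longer needs `.write().await`.
    read_only_memtables.insert(b"table-1".to_vec(), Arc::new(MemTable { entries: vec![] }));

    // Iteration is lock-free; each entry exposes key() and value().
    for entry in read_only_memtables.iter() {
        println!("{:?} -> {} entries", entry.key(), entry.value().entries.len());
    }

    // Removal replaces `shift_remove` behind a write lock in the old IndexMap version.
    read_only_memtables.remove(&b"table-1".to_vec());
}
```

The `insert`, `iter`, and `remove` calls mirror the call sites changed in `store.rs`, `flusher.rs`, and `gc.rs` below, minus the `.read().await`/`.write().await` ceremony.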
15 changes: 9 additions & 6 deletions src/consts/mod.rs
@@ -1,3 +1,6 @@

use std::time::Duration;

use crate::db::SizeUnit;

pub const KB: usize = 1024;
@@ -47,23 +50,23 @@ pub static GC_CHUNK_SIZE: usize = SizeUnit::Kilobytes.to_bytes(1);
pub const WRITE_BUFFER_SIZE: usize = SizeUnit::Kilobytes.to_bytes(50);

/// 5 days
pub const DEFAULT_TOMBSTONE_COMPACTION_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5 * 86400000);
pub const DEFAULT_TOMBSTONE_COMPACTION_INTERVAL: Duration = Duration::from_millis(5 * 86400000);

// 1 Hour
pub const DEFAULT_COMPACTION_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000 * 60 * 60);
pub const DEFAULT_COMPACTION_INTERVAL: Duration = Duration::from_millis(1000 * 60 * 60);

/// 5 Min
pub const DEFAULT_COMPACTION_FLUSH_LISTNER_INTERVAL: std::time::Duration = std::time::Duration::from_millis(5 * 1000 * 60);
pub const DEFAULT_COMPACTION_FLUSH_LISTNER_INTERVAL: Duration = Duration::from_millis(1000 * 60 * 5);

/// 10 hours
pub const DEFAULT_ONLINE_GC_INTERVAL: std::time::Duration = std::time::Duration::from_millis(10 * 1000 * 60 * 60);
pub const DEFAULT_ONLINE_GC_INTERVAL: Duration = Duration::from_millis(10 * 1000 * 60 * 60);

/// If entry TTL enabled, it is automatically deleted after 1 year
pub const ENTRY_TTL: std::time::Duration = std::time::Duration::from_millis(365 * 86400000);
pub const ENTRY_TTL: Duration = Duration::from_millis(365 * 86400000);

/// A tombstone should only be removed after 120 days to guarantee that obsolete data doesn't
/// resurrect due to a prematurely deleted tombstone
pub const DEFAULT_TOMBSTONE_TTL: std::time::Duration = std::time::Duration::from_millis(120 * 86400000);
pub const DEFAULT_TOMBSTONE_TTL: Duration = Duration::from_millis(120 * 86400000);

pub const DEFAULT_ENABLE_TTL: bool = false;
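A quick sanity check of the millisecond arithmetic behind these interval constants (illustrative only, plain `std`):

```rust
use std::time::Duration;

fn main() {
    // "5 days": 5 * 86_400_000 ms == 5 * 24 h * 60 min * 60 s
    assert_eq!(
        Duration::from_millis(5 * 86_400_000),
        Duration::from_secs(5 * 24 * 60 * 60)
    );
    // "1 Hour"
    assert_eq!(Duration::from_millis(1000 * 60 * 60), Duration::from_secs(60 * 60));
    // "5 Min"
    assert_eq!(Duration::from_millis(1000 * 60 * 5), Duration::from_secs(5 * 60));
    // "10 hours"
    assert_eq!(
        Duration::from_millis(10 * 1000 * 60 * 60),
        Duration::from_secs(10 * 60 * 60)
    );
    println!("interval constants match their doc comments");
}
```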

10 changes: 5 additions & 5 deletions src/db/recovery.rs
@@ -165,7 +165,7 @@ impl DataStore<'static, Key> {
let buckets = Arc::new(RwLock::new(buckets_map.to_owned()));
let filters = Arc::new(RwLock::new(filters));
let key_range = Arc::new(RwLock::new(key_range.to_owned()));
let read_only_memtables = Arc::new(RwLock::new(read_only_memtables));
let read_only_memtables = Arc::new(read_only_memtables);
let gc_table = Arc::new(RwLock::new(active_memtable.to_owned()));
let gc_log = Arc::new(RwLock::new(vlog.to_owned()));
let flusher = Flusher::new(
@@ -230,7 +230,7 @@ impl DataStore<'static, Key> {
vlog_path: P,
head_offset: usize,
) -> Result<(MemTable<Key>, ImmutableMemTablesLockFree<Key>), Error> {
let mut read_only_memtables: ImmutableMemTablesLockFree<Key> = IndexMap::new();
let read_only_memtables: ImmutableMemTablesLockFree<Key> = SkipMap::new();
let mut active_memtable = MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate);
let mut vlog = ValueLog::new(vlog_path.as_ref()).await?;
let mut most_recent_offset = head_offset;
@@ -247,7 +247,7 @@ impl DataStore<'static, Key> {
active_memtable.read_only = true;
read_only_memtables.insert(
MemTable::generate_table_id(),
Arc::new(RwLock::new(active_memtable.to_owned())),
Arc::new(active_memtable.to_owned()),
);
active_memtable =
MemTable::with_specified_capacity_and_rate(size_unit, capacity, false_positive_rate);
@@ -296,11 +296,11 @@ impl DataStore<'static, Key> {
active_memtable.insert(&head_entry.to_owned());
let buckets = BucketMap::new(buckets_path).await?;
let (flush_signal_tx, flush_signal_rx) = broadcast(DEFAULT_FLUSH_SIGNAL_CHANNEL_SIZE);
let read_only_memtables = IndexMap::new();
let read_only_memtables = SkipMap::new();
let filters = Arc::new(RwLock::new(Vec::new()));
let buckets = Arc::new(RwLock::new(buckets.to_owned()));
let key_range = Arc::new(RwLock::new(key_range));
let read_only_memtables = Arc::new(RwLock::new(read_only_memtables));
let read_only_memtables = Arc::new(read_only_memtables);
let gc_table = Arc::new(RwLock::new(active_memtable.to_owned()));
let gc_log = Arc::new(RwLock::new(vlog.to_owned()));
let flusher = Flusher::new(
57 changes: 25 additions & 32 deletions src/db/store.rs
@@ -79,7 +79,7 @@ where
/// Flush listeners receives signals from this channel
pub(crate) flush_signal_rx: async_broadcast::Receiver<FlushSignal>,

/// Stores valid entries gotten from garbage collection but yet to be synced with
/// Stores valid entries gotten from garbage collection but yet to be synced with
/// memtable
pub(crate) gc_updated_entries: GCUpdatedEntries<Key>,

@@ -216,11 +216,11 @@ impl DataStore<'static, Key> {
Ok(true)
}

/// Moves active memtable to read-only memtables
/// Moves active memtable to read-only memtables
///
/// Marks the active memtable as read only,
/// updates store metadata and moves the memtable
/// to read-only memtables
/// to read-only memtables
pub(crate) async fn migrate_memtable_to_read_only(&mut self) {
let head_offset = self.active_memtable.get_most_recent_offset();

@@ -237,14 +237,12 @@ impl DataStore<'static, Key> {
self.active_memtable.insert(&head_entry);
self.active_memtable.mark_readonly();
self.update_meta_background();
self.read_only_memtables.write().await.insert(
MemTable::generate_table_id(),
Arc::new(RwLock::new(self.active_memtable.to_owned())),
);
self.read_only_memtables
.insert(MemTable::generate_table_id(), Arc::new(self.active_memtable.to_owned()));

if self.read_only_memtables.read().await.len() >= self.config.max_buffer_write_number {
if self.read_only_memtables.len() >= self.config.max_buffer_write_number {
// Background
let _ = self.flush_read_only_memtables().await;
let _ = self.flush_read_only_memtables();
}
self.reset_memtables();
}
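The doc comment above describes a seal-and-rotate step; a rough, simplified sketch of that pattern follows (stand-in types, not the crate's `MemTable`):

```rust
use crossbeam_skiplist::SkipMap;
use std::sync::Arc;

#[derive(Clone)]
struct MemTable {
    read_only: bool, // stand-in for the real memtable state
}

fn rotate(active: &mut MemTable, registry: &SkipMap<Vec<u8>, Arc<MemTable>>, id: Vec<u8>) {
    active.read_only = true;                       // seal the active table
    registry.insert(id, Arc::new(active.clone())); // park the sealed copy, lock-free
    *active = MemTable { read_only: false };       // start a fresh active table
}

fn main() {
    let registry = SkipMap::new();
    let mut active = MemTable { read_only: false };
    rotate(&mut active, &registry, b"table-1".to_vec());
    assert_eq!(registry.len(), 1);
    assert!(!active.read_only);
}
```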
@@ -298,24 +296,23 @@ impl DataStore<'static, Key> {
}

/// Flushes read-only memtable to disk using a background tokio task
pub(crate) async fn flush_read_only_memtables(&mut self) {
let tables = self.read_only_memtables.read().await;
for (table_id, table) in tables.iter() {
if self.flush_stream.contains(table_id) {
pub(crate) fn flush_read_only_memtables(&mut self) {
for table in self.read_only_memtables.iter() {
let key = table.key().to_owned();
let value = table.value().clone();
if self.flush_stream.contains(&key) {
continue;
}
let table_inner = Arc::clone(table);
let id = table_id.clone();
let mut flusher = self.flusher.clone();
let tx = self.flush_signal_tx.clone();
// NOTE: If the put method returns before the code inside tokio::spawn finishes executing,
// the tokio::spawn task will continue to run independently of the original function call.
// This is because tokio::spawn creates a new asynchronous task that is managed by the Tokio runtime.
// The spawned task is executed concurrently and its lifecycle is not tied to the function that spawned it.
// TODO: See if we can introduce semaphors to prevent overloading the system
self.flush_stream.insert(id.to_owned());
self.flush_stream.insert(key.to_vec());
tokio::spawn(async move {
flusher.flush_handler(id, table_inner, tx);
flusher.flush_handler(key, value, tx);
});
}
}
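The NOTE above is the key concurrency point: a task handed to `tokio::spawn` is owned by the runtime, not by `flush_read_only_memtables`, so it keeps running after the caller returns. A minimal, self-contained illustration (assuming tokio's `full` feature set; not code from this crate):

```rust
use std::time::Duration;

async fn flush_in_background() {
    tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(50)).await;
        println!("flush finished after the caller returned");
    });
    // Returning here does not cancel the spawned task; the runtime owns it.
}

#[tokio::main]
async fn main() {
    flush_in_background().await;
    // Keep the runtime alive long enough for the detached task to complete.
    tokio::time::sleep(Duration::from_millis(100)).await;
}
```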
@@ -357,8 +354,8 @@ impl DataStore<'static, Key> {
self.get_value_from_vlog(value.val_offset, value.created_at).await
} else {
let mut is_deleted = false;
for (_, table) in self.read_only_memtables.read().await.iter() {
if let Some(value) = table.read().await.get(key.as_ref()) {
for table in self.read_only_memtables.iter() {
if let Some(value) = table.value().get(key.as_ref()) {
if value.created_at > insert_time {
offset = value.val_offset;
insert_time = value.created_at;
@@ -493,8 +490,6 @@ impl DataStore<'static, Key> {
Err(crate::err::Error::KeyNotFoundInValueLogError)
}



/// Flushes all memtable (active and read-only) to disk
///
///
@@ -503,30 +498,28 @@ impl DataStore<'static, Key> {
/// Returns error, if an IO error occurs or key was not found
#[cfg(test)]
pub(crate) async fn flush_all_memtables(&mut self) -> Result<(), crate::err::Error> {
use indexmap::IndexMap;
use crossbeam_skiplist::SkipMap;

self.active_memtable.read_only = true;

self.read_only_memtables.write().await.insert(
MemTable::generate_table_id(),
Arc::new(RwLock::new(self.active_memtable.to_owned())),
);
let immutable_tables = self.read_only_memtables.read().await.to_owned();
self.read_only_memtables
.insert(MemTable::generate_table_id(), Arc::new(self.active_memtable.to_owned()));
let immutable_tables = self.read_only_memtables.to_owned();
let mut flusher = Flusher::new(
Arc::clone(&self.read_only_memtables),
Arc::clone(&self.buckets),
Arc::clone(&self.filters),
Arc::clone(&self.key_range),
);
for (id, table) in immutable_tables.iter() {
if self.flush_stream.contains(id) {
for table in immutable_tables.iter() {
if self.flush_stream.contains(table.key()) {
continue;
}
self.flush_stream.insert(id.to_vec());
flusher.flush(table.to_owned()).await?;
self.flush_stream.insert(table.key().to_vec());
flusher.flush(table.value().to_owned()).await?;
}
self.active_memtable.clear();
self.read_only_memtables = Arc::new(RwLock::new(IndexMap::new()));
self.read_only_memtables = Arc::new(SkipMap::new());
Ok(())
}

12 changes: 4 additions & 8 deletions src/flush/flusher.rs
@@ -6,10 +6,9 @@ use crate::types::{self, BloomFilterHandle, BucketMapHandle, FlushSignal, Immuta
use crate::{err::Error, memtable::MemTable};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::sync::RwLock;

type K = types::Key;
pub type InActiveMemtable = Arc<RwLock<MemTable<K>>>;
pub type InActiveMemtable = Arc<MemTable<K>>;

/// Responsible for flushing memtables to disk
#[derive(Debug, Clone)]
@@ -41,13 +40,13 @@ impl Flusher {
/// `KeyRange` with the new sstable
pub async fn flush(&mut self, table: InActiveMemtable) -> Result<(), Error> {
let flush_data = self;
let table_reader = table.read().await;
let table_reader = table;
if table_reader.entries.is_empty() {
return Err(Error::FailedToInsertToBucket("Cannot flush an empty table".to_string()));
}
let mut bucket_lock = flush_data.bucket_map.write().await;
let sst = bucket_lock
.insert_to_appropriate_bucket(Arc::new(Box::new(table_reader.to_owned())))
.insert_to_appropriate_bucket(Arc::new(Box::new(table_reader.as_ref().to_owned())))
.await?;
drop(table_reader);
if sst.summary.is_none() {
@@ -89,10 +88,7 @@ impl Flusher {
let mut flusher = Flusher::new(read_only_memtable.clone(), buckets, filters, key_range);
match flusher.flush(table_to_flush).await {
Ok(_) => {
read_only_memtable
.write()
.await
.shift_remove(&table_id.as_ref().to_vec());
read_only_memtable.remove(&table_id.as_ref().to_vec());
if let Err(err) = tx.try_broadcast(FLUSH_SIGNAL) {
match err {
async_broadcast::TrySendError::Full(_) => {
4 changes: 2 additions & 2 deletions src/gc/gc.rs
@@ -437,8 +437,8 @@ impl GC {
} else {
// Step 2: Check the read-only memtables
let mut is_deleted = false;
for (_, table) in read_only_memtables.read().await.iter() {
if let Some(value) = table.read().await.get(&key) {
for table in read_only_memtables.iter() {
if let Some(value) = table.value().get(&key) {
if value.created_at > insert_time {
offset = value.val_offset;
insert_time = value.created_at;
2 changes: 1 addition & 1 deletion src/tests/store_test.rs
@@ -59,7 +59,7 @@ mod tests {
async fn datastore_test_put_and_get() {
setup();
let root = tempdir().unwrap();
let path = PathBuf::from(root.path().join("store_test_3"));
let path = PathBuf::new().join(root.path().join("store_test_3"));
let store = DataStore::open_without_background("test", path).await.unwrap();
let workload_size = 50000;
let key_len = 5;
7 changes: 3 additions & 4 deletions src/types/mod.rs
@@ -6,12 +6,11 @@ use crate::{
};
use chrono::{DateTime, Utc};
use crossbeam_skiplist::SkipMap;
use indexmap::IndexMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Contains type aliases to help with readability
/// Represents a key
/// Represents a key
pub type Key = Vec<u8>;

/// Represents a value
@@ -51,10 +50,10 @@ pub type BloomFilterHandle = Arc<RwLock<Vec<BloomFilter>>>;
pub type KeyRangeHandle = Arc<RwLock<KeyRange>>;

/// Represents read-only MemTables
pub type ImmutableMemTables<K> = Arc<RwLock<IndexMap<K, Arc<RwLock<MemTable<K>>>>>>;
pub type ImmutableMemTables<K> = Arc<SkipMap<K, Arc<MemTable<K>>>>;

/// Represents read-only memtables without lock
pub type ImmutableMemTablesLockFree<K> = IndexMap<MemtableId, Arc<RwLock<MemTable<K>>>>;
pub type ImmutableMemTablesLockFree<K> = SkipMap<MemtableId, Arc<MemTable<K>>>;

/// Alias for a boolean value
pub type Bool = bool;
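Because `SkipMap` takes `&self` for mutation, the new `ImmutableMemTables`/`ImmutableMemTablesLockFree` aliases can be shared across tasks without an outer `RwLock`. A hedged sketch with simplified value types (assumes `tokio` and `crossbeam-skiplist`; the real alias stores `Arc<MemTable<K>>`):

```rust
use crossbeam_skiplist::SkipMap;
use std::sync::Arc;

type MemtableId = Vec<u8>;

#[tokio::main]
async fn main() {
    // Stand-in value type; the crate's alias maps MemtableId -> Arc<MemTable<K>>.
    let tables: Arc<SkipMap<MemtableId, Arc<String>>> = Arc::new(SkipMap::new());

    let mut handles = Vec::new();
    for i in 0..4u8 {
        let tables = Arc::clone(&tables);
        handles.push(tokio::spawn(async move {
            // SkipMap::insert takes &self, so no `.write().await` is needed.
            tables.insert(vec![i], Arc::new(format!("memtable-{i}")));
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
    assert_eq!(tables.len(), 4);
}
```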
