[Feat] Block Index Writer
Gifted-s committed Aug 6, 2024
1 parent b7c309f commit 9961e15
Showing 6 changed files with 172 additions and 5 deletions.
7 changes: 5 additions & 2 deletions README.md
@@ -39,11 +39,14 @@ According to the benchmarks presented in the WiscKey paper, implementations can
- **2.5x to 111x** for database loading
- **1.6x to 14x** for random lookups

## Addressing major concerns
- **Range Query**: Since keys are separated from values, won't that affect range query performance? Well, we now have internal parallelism in SSDs: as we fetch the keys from the LSM tree, we can fetch the values in parallel from the vlog file (see the sketch after this list). This [benchmark](https://github.com/Gifted-s/velarixdb/blob/main/bench.png) from the WiscKey paper shows that for request sizes ≥ 64KB, the aggregate throughput of random reads with 32 threads matches the sequential read throughput.
- **More Disk IO for Reads**: Since keys are now separate from values, don't we have to make an extra disk IO to fetch values? Yes, but since the key density of each level increases (we only store keys and value offsets in the SSTable), we will most likely search fewer levels than LevelDB or RocksDB would for the same query. A significant portion of the LSM tree can also be cached in memory.
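Here is a minimal sketch of the parallel value fetch described above. It is not VelarixDB's actual API: `read_value_at`, the `"vlog"` path, and the `(offset, len)` pairs are hypothetical placeholders; the point is only the fan-out of vlog reads with Tokio once the LSM tree lookup has returned the value offsets.

```rust
use std::io::SeekFrom;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

// Hypothetical helper: read one value from the vlog file at `offset`.
// VelarixDB's real vlog format will differ; this only shows the fan-out.
async fn read_value_at(path: &'static str, offset: u64, len: usize) -> std::io::Result<Vec<u8>> {
    let mut file = File::open(path).await?;
    file.seek(SeekFrom::Start(offset)).await?;
    let mut buf = vec![0u8; len];
    file.read_exact(&mut buf).await?;
    Ok(buf)
}

// Fetch all values for a range query concurrently: the (offset, len) pairs
// come from the LSM tree lookup; the SSD then serves the reads in parallel.
async fn fetch_values(offsets: Vec<(u64, usize)>) -> std::io::Result<Vec<Vec<u8>>> {
    let tasks: Vec<_> = offsets
        .into_iter()
        .map(|(off, len)| tokio::spawn(read_value_at("vlog", off, len)))
        .collect();

    let mut values = Vec::with_capacity(tasks.len());
    for task in tasks {
        values.push(task.await.expect("task panicked")?);
    }
    Ok(values)
}
```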

## Designed for asynchronous runtime (unstable)
Based on the introduction and efficiency of async IO at the OS kernel level, e.g. **io_uring** for the Linux kernel, VelarixDB is designed for an asynchronous runtime, in this case the Tokio runtime.
Tokio allows for efficient and scalable asynchronous operations, making the most of modern multi-core processors. Most OS file systems do not currently provide an async API, but Tokio uses a thread pool to offload blocking file system operations.
This means that even though the file system operations themselves are blocking at the OS level, Tokio can handle them without blocking the main async task executor. Tokio might adopt [io_uring](https://docs.rs/tokio/latest/tokio/fs/index.html#:~:text=Currently%2C%20Tokio%20will%20always%20use%20spawn_blocking%20on%20all%20platforms%2C%20but%20it%20may%20be%20changed%20to%20use%20asynchronous%20file%20system%20APIs%20such%20as%20io_uring%20in%20the%20future.) in the future. (We haven't benchmarked the async version, so this is unstable and might be removed in future versions.)
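As a minimal sketch of the offloading described above (using only Tokio's public API; the checksum computation stands in for any blocking work): `tokio::fs` currently wraps blocking `std::fs` calls in `spawn_blocking`, so the read below does not block the async executor.

```rust
use tokio::task;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // tokio::fs::read is currently implemented via spawn_blocking,
    // so this does not stall the async executor threads.
    let bytes = tokio::fs::read("README.md").await?;

    // The same pattern written out explicitly for any blocking call:
    let checksum = task::spawn_blocking(move || {
        // runs on the blocking thread pool, off the async workers
        bytes.iter().fold(0u64, |acc, b| acc.wrapping_add(*b as u64))
    })
    .await
    .expect("blocking task panicked");

    println!("checksum: {checksum}");
    Ok(())
}
```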


## Disclaimer
Binary file added bench.png
Binary file modified doc_logo.png
11 changes: 8 additions & 3 deletions src/lib.rs
@@ -29,13 +29,18 @@
//! According to the benchmarks presented in the WiscKey paper, implementations can outperform LevelDB and RocksDB by:
//! - **2.5x to 111x** for database loading
//! - **1.6x to 14x** for random lookups
//!
//!
//! ## Addressing major concerns
//! - **Range Query**: Since keys are separated from values, won't that affect range query performance? Well, we now have internal parallelism in SSDs: as we fetch the keys from the LSM tree, we can fetch the values in parallel from the vlog file. This [benchmark](https://github.com/Gifted-s/velarixdb/blob/main/bench.png) from the WiscKey paper shows that for request sizes ≥ 64KB, the aggregate throughput of random reads with 32 threads matches the sequential read throughput.
//! - **More Disk IO for Reads**: Since keys are now separate from values, don't we have to make an extra disk IO to fetch values? Yes, but since the key density of each level increases (we only store keys and value offsets in the SSTable), we will most likely search fewer levels than LevelDB or RocksDB would for the same query. A significant portion of the LSM tree can also be cached in memory.
//!
//! ## Designed for asynchronous runtime (unstable)
//!
//! Based on the introduction and efficiency of async IO at the OS kernel level, e.g. **io_uring** for the Linux kernel, the experimental version of VelarixDB is designed for an asynchronous runtime, in this case the Tokio runtime.
//! Tokio allows for efficient and scalable asynchronous operations, making the most of modern multi-core processors. Most OS file systems do not currently provide an async API, but Tokio uses a thread pool to offload blocking file system operations.
//! This means that even though the file system operations themselves are blocking at the OS level, Tokio can handle them without blocking the main async task executor. Tokio might adopt [io_uring](https://docs.rs/tokio/latest/tokio/fs/index.html#:~:text=Currently%2C%20Tokio%20will%20always%20use%20spawn_blocking%20on%20all%20platforms%2C%20but%20it%20may%20be%20changed%20to%20use%20asynchronous%20file%20system%20APIs%20such%20as%20io_uring%20in%20the%20future.) in the future.
//! (We haven't benchmarked the async version, so this is unstable and might be removed in future versions.)
//!
//! ## Disclaimer
//!
1 change: 1 addition & 0 deletions v2/src/sst/block_index/mod.rs
@@ -1,5 +1,6 @@
pub mod block_handle;
mod top_level;
mod writer;

use self::block_handle::KeyedBlockHandle;
use super::block::Block;
158 changes: 158 additions & 0 deletions v2/src/sst/block_index/writer.rs
@@ -0,0 +1,158 @@
use crate::{
    serde::Serializable,
    sst::{block::header::Header as BlockHeader, meta::SegmentId},
    UserKey,
};

use super::{IndexBlock, KeyedBlockHandle};

use std::{
    fs::File,
    io::{BufReader, BufWriter, Seek, Write},
    path::{Path, PathBuf},
};

/// Reads a file and appends the bytes read to `sink` (a writer)
fn pipe_file_into_writer<P: AsRef<Path>, W: Write>(src_path: P, sink: &mut W) -> crate::Result<()> {
    let reader = File::open(src_path)?;
    let mut reader = BufReader::new(reader);
    std::io::copy(&mut reader, sink)?;
    sink.flush()?;

    Ok(())
}

pub struct Writer {
    /// Path of the temporary file the index blocks are buffered in
    pub index_block_tmp_file_path: PathBuf,
    /// Current write position in the temporary file
    file_pos: u64,

    /// Offsets of the previous and current index block; the previous
    /// offset is written into each block as a back-link
    prev_pos: (u64, u64),
    /// Writer for the temporary index block file
    block_writer: Option<BufWriter<File>>,
    /// Size threshold (in bytes) at which the current block is flushed
    block_size: u32,
    /// Bytes accumulated in the current block so far
    block_counter: u32,
    /// Handles of the data blocks collected for the current index block
    block_handles: Vec<KeyedBlockHandle>,
    /// Handles of the written index blocks, forming the top-level index (TLI)
    tli_pointers: Vec<KeyedBlockHandle>,
}

impl Writer {
    pub fn new<P: AsRef<Path>>(segment_id: SegmentId, folder: P, block_size: u32) -> crate::Result<Self> {
        let index_block_tmp_file_path = folder.as_ref().join(format!("tmp_ib{segment_id}"));

        let writer = File::create(&index_block_tmp_file_path)?;
        let block_writer = BufWriter::with_capacity(u16::MAX.into(), writer);

        Ok(Self {
            index_block_tmp_file_path,
            file_pos: 0,
            prev_pos: (0, 0),
            block_writer: Some(block_writer),
            block_counter: 0,
            block_size,
            block_handles: Vec::with_capacity(1_000),
            tli_pointers: Vec::with_capacity(1_000),
        })
    }

    pub fn write_block(&mut self) -> crate::Result<()> {
        let mut block_writer = self.block_writer.as_mut().expect("should exist");
        let (header, data) = IndexBlock::to_bytes(&self.block_handles, self.prev_pos.0)?;
        header.serialize(&mut block_writer)?;

        block_writer.write_all(&data)?;

        let bytes_written = (BlockHeader::serialized_len() + data.len()) as u64;

        let last = self.block_handles.last().expect("Chunk should not be empty");

        let index_block_handle = KeyedBlockHandle {
            end_key: last.end_key.clone(),
            offset: self.file_pos,
        };

        self.tli_pointers.push(index_block_handle);

        self.block_counter = 0;
        self.file_pos += bytes_written;

        self.prev_pos.0 = self.prev_pos.1;
        self.prev_pos.1 += bytes_written;

        self.block_handles.clear();

        Ok(())
    }

    pub fn register_block(&mut self, start_key: UserKey, offset: u64) -> crate::Result<()> {
        let block_handle_size = (start_key.len() + std::mem::size_of::<KeyedBlockHandle>()) as u32;

        let block_handle = KeyedBlockHandle {
            end_key: start_key,
            offset,
        };

        self.block_handles.push(block_handle);

        self.block_counter += block_handle_size;
        if self.block_counter >= self.block_size {
            self.write_block()?;
        }
        Ok(())
    }

    fn write_top_level_index(
        &mut self,
        block_file_writer: &mut BufWriter<File>,
        file_offset: u64,
    ) -> crate::Result<u64> {
        // We need to drop the writer so the file is closed
        // and can be replaced when using Windows
        self.block_writer = None;

        pipe_file_into_writer(&self.index_block_tmp_file_path, block_file_writer)?;

        let tli_ptr = block_file_writer.stream_position()?;

        // Since we piped the index blocks into block_file_writer, and
        // file_offset was the position of block_file_writer before the
        // pipe, every item offset in the top-level index must be shifted
        // by file_offset
        for item in &mut self.tli_pointers {
            item.offset += file_offset;
        }

        let (header, data) = IndexBlock::to_bytes(&self.tli_pointers, 0)?;
        header.serialize(block_file_writer)?;
        block_file_writer.write_all(&data)?;

        let bytes_written = BlockHeader::serialized_len() + data.len();

        block_file_writer.flush()?;
        block_file_writer.get_mut().sync_all()?;

        log::trace!(
            "Wrote top level index with {} pointers ({} bytes)",
            self.tli_pointers.len(),
            bytes_written,
        );
        Ok(tli_ptr)
    }

    /// Flushes any pending block, writes the top-level index,
    /// and returns the file offset of the TLI
    pub fn finish(&mut self, block_file_writer: &mut BufWriter<File>) -> crate::Result<u64> {
        if self.block_counter > 0 {
            self.write_block()?;
        }
        {
            let block_writer = self.block_writer.as_mut().expect("should exist");
            block_writer.flush()?;
            block_writer.get_mut().sync_all()?;
        }

        let index_block_ptr = block_file_writer.stream_position()?;
        let tli_ptr = self.write_top_level_index(block_file_writer, index_block_ptr)?;

        std::fs::remove_file(&self.index_block_tmp_file_path)?;

        Ok(tli_ptr)
    }
}
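A usage sketch, not part of this commit: it assumes the surrounding SST writer wiring, and the segment id, folder, and block size are placeholder values. Data blocks are registered as they are written; `finish` then appends the buffered index blocks and the top-level index to the segment file.

```rust
// Illustrative only: assumes the data blocks were already written to the
// segment file and their (start_key, offset) pairs were collected.
fn write_index(
    data_block_handles: Vec<(UserKey, u64)>,
    segment_file: &mut BufWriter<File>,
) -> crate::Result<u64> {
    // Placeholder segment id, folder, and 4 KiB block size
    let mut writer = Writer::new(0, "/tmp/segment", 4_096)?;

    // Full index blocks are flushed to the temporary file automatically
    // once `block_size` bytes of handles accumulate
    for (start_key, offset) in data_block_handles {
        writer.register_block(start_key, offset)?;
    }

    // Appends the index blocks plus the top-level index to the segment
    // file and returns the file offset of the TLI
    writer.finish(segment_file)
}
```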
