diff --git a/.gitignore b/.gitignore index 4fffb2f..972b0c4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ -/target -/Cargo.lock +**/target +**/Cargo.lock diff --git a/README.md b/README.md index c077ead..4164c0d 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ According to the benchmarks presented in the WiscKey paper, implementations can - **2.5x to 111x** for database loading - **1.6x to 14x** for random lookups -## Designed for asynchronous runtime +## Designed for asynchronous runtime (unstable) Based on the introduction and efficiency of async IO at the OS kernel level e.g **io_uring** for the Linux kernel, VelarixDB is designed for asynchronous runtime. In this case Tokio runtime. Tokio allows for efficient and scalable asynchronous operations, making the most of modern multi-core processors. Frankly, most OS File System does not provide async API currently but Tokio uses a thread pool to offload blocking file system operations. This means that even though the file system operations themselves are blocking at the OS level, Tokio can handle them without blocking the main async task executor. diff --git a/src/lib.rs b/src/lib.rs index 6e3506f..7854737 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,7 +30,7 @@ //! - **2.5x to 111x** for database loading //! - **1.6x to 14x** for random lookups //! -//! ## Designed for asynchronous runtime +//! ## Designed for asynchronous runtime (unstable) //! //! Based on the introduction and efficiency of async IO at the OS kernel level e.g **io_uring** for the Linux kernel, VelarixDB is designed for asynchronous runtime. In this case Tokio runtime. //! Tokio allows for efficient and scalable asynchronous operations, making the most of modern multi-core processors. Frankly, most OS File System does not provide async API currently but Tokio uses a thread pool to offload blocking file system operations. diff --git a/src/tests/fixtures/data/buckets/bucket1201eb6b-8903-4557-a5bd-d87cb725f1d8/sstable_1720785462309/summary.db b/src/tests/fixtures/data/buckets/bucket1201eb6b-8903-4557-a5bd-d87cb725f1d8/sstable_1720785462309/summary.db index 51321bf..a245fe0 100644 Binary files a/src/tests/fixtures/data/buckets/bucket1201eb6b-8903-4557-a5bd-d87cb725f1d8/sstable_1720785462309/summary.db and b/src/tests/fixtures/data/buckets/bucket1201eb6b-8903-4557-a5bd-d87cb725f1d8/sstable_1720785462309/summary.db differ diff --git a/v2/Cargo.toml b/v2/Cargo.toml new file mode 100644 index 0000000..2e5a25e --- /dev/null +++ b/v2/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "v2" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +byteorder = "1.5.0" +lz4_flex = "0.11.3" +nanoid = "0.4.0" +seahash = "4.1.0" +tempfile = "3.10.1" +test-log = "0.2.16" +thiserror = "1.0.63" diff --git a/v2/src/bloom/bit_array.rs b/v2/src/bloom/bit_array.rs new file mode 100644 index 0000000..04f9604 --- /dev/null +++ b/v2/src/bloom/bit_array.rs @@ -0,0 +1,107 @@ +/// Gets a bit from the byte +fn get_bit(byte: u8, idx: usize) -> bool { + let bit_mask = 0b1000_0000_u8; + let bit_mask = bit_mask >> idx; + + let masked = byte & bit_mask; + masked > 0 +} + +/// Sets a bit in the byte +fn set_bit(byte: u8, idx: usize, value: bool) -> u8 { + let bit_mask = 0b1000_0000_u8; + let bit_mask = bit_mask >> idx; + + if value { + byte | bit_mask + } else { + byte & !bit_mask + } +} + +/// Fixed-size bit array +#[derive(Debug, Eq, PartialEq)] +pub struct BitArray(Box<[u8]>); + +impl BitArray { + #[must_use] + pub fn with_capacity(bytes: usize) -> Self { + let vec = vec![0; bytes]; + Self(vec.into_boxed_slice()) + } + + #[must_use] + pub fn from_bytes(bytes: Box<[u8]>) -> Self { + Self(bytes) + } + + /// Size in bytes + #[must_use] + pub fn len(&self) -> usize { + self.0.len() + } + + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[must_use] + pub fn bytes(&self) -> &[u8] { + &self.0 + } + + /// Sets the i-th bit + pub fn set(&mut self, idx: usize, val: bool) { + let byte_idx = idx / 8; + let byte = self.0.get_mut(byte_idx).expect("should be in bounds"); + + let bit_idx = idx % 8; + *byte = set_bit(*byte, bit_idx, val); + } + + /// Gets the i-th bit + #[must_use] + pub fn get(&self, idx: usize) -> bool { + let byte_idx = idx / 8; + let byte = self.0.get(byte_idx).expect("should be in bounds"); + + let bit_idx = idx % 8; + get_bit(*byte, bit_idx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use test_log::test; + + #[test] + fn bit_set_true() { + assert_eq!(0b0000_0010, set_bit(0, 6, true)); + assert_eq!(0b1000_0000, set_bit(0, 0, true)); + assert_eq!(0b0100_0000, set_bit(0, 1, true)); + assert_eq!(0b0100_0110, set_bit(0b0000_0110, 1, true)); + } + + #[test] + fn bit_set_false() { + assert_eq!(0b1111_1101, set_bit(0xFF, 6, false)); + assert_eq!(0b0111_1111, set_bit(0xFF, 0, false)); + assert_eq!(0b1011_1111, set_bit(0xFF, 1, false)); + + assert_eq!(0b0000_0110, set_bit(0b0100_0110, 1, false)); + } + + #[test] + fn bit_set_get() { + assert_eq!(0b1111_1101, set_bit(0xFF, 6, false)); + assert_eq!(0b0111_1111, set_bit(0xFF, 0, false)); + assert_eq!(0b1011_1111, set_bit(0xFF, 1, false)); + + assert!(!get_bit(0b0100_0110, 0)); + assert!(get_bit(0b0100_0110, 1)); + assert!(get_bit(0b0100_0110, 6)); + assert!(!get_bit(0b0100_0110, 7)); + } +} \ No newline at end of file diff --git a/v2/src/bloom/mod.rs b/v2/src/bloom/mod.rs new file mode 100644 index 0000000..45cb773 --- /dev/null +++ b/v2/src/bloom/mod.rs @@ -0,0 +1,266 @@ +mod bit_array; + +use crate::serde::{Deserializable, Serializable}; +use crate::{DeserializeError, SerializeError}; +use bit_array::BitArray; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use seahash::SeaHasher; +use std::hash::Hasher; +use std::io::{Read, Write}; + +pub const BLOOM_HEADER_MAGIC: &[u8] = &[b'B', b'L', b'0', b'0', b'1']; + +pub type CompositeHash = (u64, u64); + +#[derive(Debug, Eq, PartialEq)] +pub struct BloomFilter { + inner: BitArray, + + no_of_bits: usize, + + no_of_hash_func: usize, +} + +impl Serializable for BloomFilter { + fn serialize(&self, writer: &mut W) -> Result<(), SerializeError> { + writer.write_all(BLOOM_HEADER_MAGIC)?; + + // Filter type (unsed for now) + writer.write_u8(0)?; + + writer.write_u64::(self.no_of_bits as u64)?; + writer.write_u64::(self.no_of_hash_func as u64)?; + writer.write_all(self.inner.bytes())?; + + Ok(()) + } +} + +impl Deserializable for BloomFilter { + fn deserialize(reader: &mut R) -> Result { + let mut magic = [0u8; BLOOM_HEADER_MAGIC.len()]; + reader.read_exact(&mut magic)?; + + if magic != BLOOM_HEADER_MAGIC { + return Err(DeserializeError::InvalidHeader("BloomFilter".to_string())); + } + + // Filter type not used for now + let _ = reader.read_u8()?; + + let no_of_bits = reader.read_u64::()? as usize; + let no_of_hash_func = reader.read_u64::()? as usize; + + let mut bytes = vec![0; no_of_bits / 8]; + reader.read_exact(&mut bytes)?; + + Ok(Self::from( + no_of_bits, + no_of_hash_func, + bytes.into_boxed_slice(), + )) + } +} + +impl BloomFilter { + fn from(no_of_bits: usize, no_of_hash_func: usize, bytes: Box<[u8]>) -> Self { + Self { + inner: BitArray::from_bytes(bytes), + no_of_bits, + no_of_hash_func, + } + } + + #[allow(dead_code)] + pub fn len(&self) -> usize { + self.inner.len() + } + + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub(crate) fn calculate_no_of_bits(n: usize, fp_rate: f32) -> usize { + use std::f32::consts::LN_2; + + let n = n as f32; + let ln2_squared = LN_2.powi(2); + + let m = -(n * fp_rate.ln() / ln2_squared); + ((m / 8.0).ceil() * 8.0) as usize + } + + // TODO: Compare this with get_no_hash_func_heuristic + // /// Calculates number of hash fuctions to be used by [`BloomFilter`] + // fn calculate_no_of_hash_func(no_of_bits: u32, no_of_elements: u32) -> u32 { + // let no_hash_func = (no_of_bits as f64 / no_of_elements as f64) * (2_f64.ln()).ceil(); + // no_hash_func as u32 + // } + + /// Heuristically get the somewhat-optimal k value for a given desired FPR + fn get_no_hash_func_heuristic(fp_rate: f32) -> usize { + match fp_rate { + _ if fp_rate > 0.4 => 1, + _ if fp_rate > 0.2 => 2, + _ if fp_rate > 0.1 => 3, + _ if fp_rate > 0.05 => 4, + _ if fp_rate > 0.03 => 5, + _ if fp_rate > 0.02 => 5, + _ if fp_rate > 0.01 => 7, + _ if fp_rate > 0.001 => 10, + _ if fp_rate > 0.000_1 => 13, + _ if fp_rate > 0.000_01 => 17, + _ => 20, + } + } + + #[must_use] + pub fn with_fp_rate(item_count: usize, fp_rate: f32) -> Self { + // NOTE: Some sensible minimum + let fp_rate = fp_rate.max(0.000_001); + + let no_of_hash_func = Self::get_no_hash_func_heuristic(fp_rate); + let no_of_bits = Self::calculate_no_of_bits(item_count, fp_rate); + + Self { + inner: BitArray::with_capacity(no_of_bits / 8), + no_of_bits, + no_of_hash_func, + } + } + + /// Adds the key to the filter + pub fn set_with_hash(&mut self, (mut h1, mut h2): CompositeHash) { + for i in 0..(self.no_of_hash_func as u64) { + let idx = h1 % (self.no_of_bits as u64); + + self.enable_bit(idx as usize); + + h1 = h1.wrapping_add(h2); + h2 = h2.wrapping_add(i); + } + } + + pub fn contains_hash(&self, hash: CompositeHash) -> bool { + let (mut h1, mut h2) = hash; + + for i in 0..(self.no_of_hash_func as u64) { + let idx = h1 % (self.no_of_bits as u64); + + if !self.inner.get(idx as usize) { + return false; + } + + h1 = h1.wrapping_add(h2); + h2 = h2.wrapping_add(i); + } + true + } + + /// Sets the bit at the given index to `true` + fn enable_bit(&mut self, idx: usize) { + self.inner.set(idx, true); + } + + pub fn contains(&self, key: &[u8]) -> bool { + self.contains_hash(Self::get_hash(key)) + } + + #[must_use] + pub fn get_hash(key: &[u8]) -> CompositeHash { + let mut hasher = SeaHasher::default(); + hasher.write(key); + let h1 = hasher.finish(); + + hasher.write(key); + let h2 = hasher.finish(); + + (h1, h2) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs::File; + use test_log::test; + + #[test] + fn bloom_serde_round_trip() -> crate::Result<()> { + let dir = tempfile::tempdir()?; + + let path = dir.path().join("bf"); + let mut file = File::create(&path)?; + + let mut filter = BloomFilter::with_fp_rate(10, 0.0001); + + for key in [ + b"item0", b"item1", b"item2", b"item3", b"item4", b"item5", b"item6", b"item7", + b"item8", b"item9", + ] { + filter.set_with_hash(BloomFilter::get_hash(key)); + } + + filter.serialize(&mut file)?; + file.sync_all()?; + drop(file); + + let mut file = File::open(&path)?; + let filter_copy = BloomFilter::deserialize(&mut file)?; + + assert_eq!(filter, filter_copy); + + Ok(()) + } + + #[test] + fn bloom_calculate_no_of_bits() { + assert_eq!(9_592, BloomFilter::calculate_no_of_bits(1_000, 0.01)); + assert_eq!(4_800, BloomFilter::calculate_no_of_bits(1_000, 0.1)); + assert_eq!(4_792_536, BloomFilter::calculate_no_of_bits(1_000_000, 0.1)); + } + + #[test] + fn bloom_basic() { + let mut filter = BloomFilter::with_fp_rate(10, 0.0001); + + for key in [ + b"item0", b"item1", b"item2", b"item3", b"item4", b"item5", b"item6", b"item7", + b"item8", b"item9", + ] { + assert!(!filter.contains(key)); + filter.set_with_hash(BloomFilter::get_hash(key)); + assert!(filter.contains(key)); + + assert!(!filter.contains(b"asdasdasdasdasdasdasd")); + } + } + + #[test] + fn bloom_fpr() { + let item_count = 1_000_000; + let fpr = 0.01; + + let mut filter = BloomFilter::with_fp_rate(item_count, fpr); + + for key in (0..item_count).map(|_| nanoid::nanoid!()) { + let key = key.as_bytes(); + + filter.set_with_hash(BloomFilter::get_hash(key)); + assert!(filter.contains(key)); + } + + let mut false_positives = 0; + + for key in (0..item_count).map(|_| nanoid::nanoid!()) { + let key = key.as_bytes(); + + if filter.contains(key) { + false_positives += 1; + } + } + + assert!((10_000 - false_positives) < 200); + } +} diff --git a/v2/src/error.rs b/v2/src/error.rs new file mode 100644 index 0000000..a38419d --- /dev/null +++ b/v2/src/error.rs @@ -0,0 +1,24 @@ +use crate::serde::{DeserializeError, SerializeError}; +use lz4_flex::block::DecompressError; +use std::io::Error as IoError; +use thiserror::Error; + +#[derive(Error, Debug)] +#[non_exhaustive] +pub enum Error { + #[error("IO error")] + Io(#[from] IoError), + + #[error("SerializeError({0})")] + Serialize(#[from] SerializeError), + + #[error("DeserializeError({0})")] + Deserialize(#[from] DeserializeError), + + #[error("DecompressError({0})")] + Decompress(DecompressError), +} + + +/// Tree result +pub type Result = std::result::Result; diff --git a/v2/src/lib.rs b/v2/src/lib.rs new file mode 100644 index 0000000..850c477 --- /dev/null +++ b/v2/src/lib.rs @@ -0,0 +1,8 @@ +mod bloom; +mod error; +mod serde; + +pub use { + error::{Error, Result}, + serde::{DeserializeError, SerializeError}, +}; diff --git a/v2/src/serde.rs b/v2/src/serde.rs new file mode 100644 index 0000000..1cd94dd --- /dev/null +++ b/v2/src/serde.rs @@ -0,0 +1,69 @@ +use std::io::{Read, Write, Error as IoError}; +use thiserror::Error; +/// Error during serialization +#[derive(Error, Debug)] +pub enum SerializeError { + #[error("IO Error: {0}")] + Io(IoError), +} + +impl From for SerializeError { + fn from(value: std::io::Error) -> Self { + Self::Io(value) + } +} + + +#[derive(Debug)] +pub struct InvalidTypeTuple(String, u8); + +impl std::fmt::Display for InvalidTypeTuple { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "({}, {})", self.0, self.1) + } +} + +/// Error during deserialization +#[derive(Error, Debug)] +pub enum DeserializeError { + #[error("IO Error: {0}")] + Io(IoError), + + #[error("UTF8 Error: {0}")] + Utf8(std::str::Utf8Error), + + #[error("Invalid Tag: {0}")] + InvalidTag(InvalidTypeTuple), + + #[error("InvalidTrailer")] + InvalidTrailer, + + #[error("InvalidHeader: {0}")] + InvalidHeader(String), +} + +impl From for DeserializeError { + fn from(value: std::io::Error) -> Self { + Self::Io(value) + } +} + +impl From for DeserializeError { + fn from(value: std::str::Utf8Error) -> Self { + Self::Utf8(value) + } +} + +/// Trait to serialize structs +pub trait Serializable { + /// Serialize to bytes + fn serialize(&self, writer: &mut W) -> Result<(), SerializeError>; +} + +/// Trait to deserialize structs +pub trait Deserializable { + /// Deserialize from bytes + fn deserialize(reader: &mut R) -> Result + where + Self: Sized; +}