Skip to content

Commit

Permalink
Merge pull request #67 from Gifted-s/ft/file_sync
Browse files Browse the repository at this point in the history
[Feat] Data Block, Compression, LSM Entry
  • Loading branch information
Gifted-s authored Aug 3, 2024
2 parents ca54672 + ae97088 commit f696277
Show file tree
Hide file tree
Showing 14 changed files with 500 additions and 57 deletions.
1 change: 1 addition & 0 deletions v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"

[dependencies]
byteorder = "1.5.0"
crc32fast = "1.4.2"
crossbeam-skiplist = "0.1.3"
guardian = "1.1.0"
lz4_flex = "0.11.3"
Expand Down
30 changes: 30 additions & 0 deletions v2/src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/// Identifies a compression algorithm.
///
/// Each variant maps to a one-byte tag through the `u8` conversions below.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
// The `Type` suffix repeats the module name on purpose.
#[allow(clippy::module_name_repetitions)]
pub enum CompressionType {
    /// LZ4 compression (tag byte `1`, displayed as `lz4`).
    Lz4,
}

impl From<CompressionType> for u8 {
    /// Returns the one-byte tag of the given compression type.
    fn from(kind: CompressionType) -> Self {
        match kind {
            CompressionType::Lz4 => 1,
        }
    }
}

impl TryFrom<u8> for CompressionType {
    type Error = ();

    /// Maps a tag byte back to its compression type.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` for every byte other than `1`.
    fn try_from(tag: u8) -> Result<Self, Self::Error> {
        if tag == 1 {
            Ok(Self::Lz4)
        } else {
            Err(())
        }
    }
}

impl std::fmt::Display for CompressionType {
    /// Writes the lowercase name of the algorithm.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Lz4 => "lz4",
        };
        f.write_str(name)
    }
}
23 changes: 23 additions & 0 deletions v2/src/either.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/// A value that is one of two alternatives: [`Either::Left`] or [`Either::Right`].
#[derive(Clone, Debug)]
pub enum Either<L, R> {
    /// The left alternative.
    Left(L),
    /// The right alternative.
    Right(R),
}

use Either::{Left, Right};

impl<L, R> Either<L, R> {
    /// Consumes `self` and returns the wrapped left value.
    ///
    /// # Panics
    ///
    /// Panics if `self` is [`Either::Right`].
    pub fn left(self) -> L {
        match self {
            Left(value) => value,
            // The message names the accessor that was called (`left`) and the
            // variant that was actually stored (`Right`).
            Right(_) => panic!("Accessed Left on Right value"),
        }
    }

    /// Consumes `self` and returns the wrapped right value.
    ///
    /// # Panics
    ///
    /// Panics if `self` is [`Either::Left`].
    pub fn right(self) -> R {
        match self {
            Right(value) => value,
            Left(_) => panic!("Accessed Right on Left value"),
        }
    }
}
5 changes: 5 additions & 0 deletions v2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ pub enum Error {
Decompress(DecompressError),
}

impl From<DecompressError> for Error {
fn from(value: DecompressError) -> Self {
Self::Decompress(value)
}
}

/// Tree result
pub type Result<T> = std::result::Result<T, Error>;
58 changes: 58 additions & 0 deletions v2/src/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::{fs::File, io::Write, path::Path};

// NOTE(review): the four names below look like on-disk file/folder names
// inside a tree directory; their use is not visible here — confirm against callers.

/// Name `"version"` — presumably the marker file identifying an LSM-tree folder (TODO confirm).
pub const LSM_MARKER: &str = "version";
/// Name `"config"` — presumably the persisted configuration file (TODO confirm).
pub const CONFIG_FILE: &str = "config";
/// Name `"segments"` — presumably the folder holding segment files (TODO confirm).
pub const SEGMENTS_FOLDER: &str = "segments";
/// Name `"levels"` — presumably the levels manifest file (TODO confirm).
pub const LEVELS_MANIFEST_FILE: &str = "levels";

/// Replaces the file at `path` with `content` by writing a temporary file in
/// the same folder and renaming it over `path`.
///
/// The temporary file is fsynced before the rename so that a crash cannot
/// leave `path` pointing at a file whose data never reached the disk. On
/// non-Windows targets the renamed file and its parent folder are fsynced
/// afterwards so the rename itself is durable as well.
///
/// # Errors
///
/// Returns an I/O error if `path` has no parent folder, or if creating,
/// writing, syncing or renaming the temporary file fails.
pub fn rewrite_atomic<P: AsRef<Path>>(path: P, content: &[u8]) -> std::io::Result<()> {
    let path = path.as_ref();
    // Report a missing parent as an error instead of panicking.
    let folder = path.parent().ok_or_else(|| {
        std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "path should have a parent",
        )
    })?;

    let mut temp_file = tempfile::NamedTempFile::new_in(folder)?;
    temp_file.write_all(content)?;
    // Persist the data before the rename makes it visible under `path`.
    temp_file.as_file().sync_all()?;
    temp_file.persist(path)?;

    #[cfg(not(target_os = "windows"))]
    {
        let file = File::open(path)?;
        file.sync_all()?;

        // A bare file name has an empty parent; that means the current directory.
        let folder = if folder.as_os_str().is_empty() {
            Path::new(".")
        } else {
            folder
        };
        // Sync the folder so the new directory entry survives a crash.
        File::open(folder)?.sync_all()?;
    }

    Ok(())
}

/// Opens the directory at `path` and calls `sync_all` on it.
///
/// Debug builds additionally assert that `path` refers to a directory.
///
/// # Errors
///
/// Returns the I/O error from opening or syncing the directory.
#[cfg(not(target_os = "windows"))]
pub fn fsync_directory<P: AsRef<Path>>(path: P) -> std::io::Result<()> {
    File::open(path).and_then(|dir| {
        debug_assert!(dir.metadata()?.is_dir());
        dir.sync_all()
    })
}

/// Windows variant of `fsync_directory`: does nothing and always succeeds.
#[cfg(target_os = "windows")]
pub fn fsync_directory<P: AsRef<Path>>(path: P) -> std::io::Result<()> {
    // Cannot fsync directory on Windows
    let _ = path; // intentionally unused on this platform
    Ok(())
}

#[cfg(test)]
mod tests {
    use super::rewrite_atomic;
    use std::fs::File;
    use std::io::Write;
    use test_log::test;

    /// Overwriting an existing file through `rewrite_atomic` must leave
    /// exactly the new content behind.
    #[test]
    fn atomic_write() -> std::io::Result<()> {
        let tmp_dir = tempfile::tempdir()?;
        let target = tmp_dir.path().join("test.txt");

        // Seed the target with content longer than the replacement.
        let mut seed = File::create(&target)?;
        seed.write_all(b"abcdefghijklmnop")?;

        rewrite_atomic(&target, b"newcontent")?;

        let after = std::fs::read_to_string(&target)?;
        assert_eq!("newcontent", after);
        Ok(())
    }
}
8 changes: 6 additions & 2 deletions v2/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
mod bloom;
mod error;
mod serde;
mod value;
mod lsm_entry;
mod range;
mod memtable;
mod stop_signal;
mod time;
mod version;
mod file;
mod either;
mod sst;
mod compression;

pub use {
error::{Error, Result},
serde::{DeserializeError, SerializeError},
value::{SeqNo, UserKey, UserValue, Value, ValueType},
lsm_entry::{SeqNo, UserKey, UserValue, LSMEntry, ValueType},
};
34 changes: 17 additions & 17 deletions v2/src/value.rs → v2/src/lsm_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl Ord for ParsedInternalKey {
/// NOTE: This represents values stored in
/// LSM-tree not in Value log
#[derive(Clone, PartialEq, Eq)]
pub struct Value {
pub struct LSMEntry {
/// User-defined key - an arbitrary byte array
///
/// Supports up to 2^16 bytes for performance considerations
Expand All @@ -123,7 +123,7 @@ pub struct Value {
pub value_type: ValueType,
}

impl std::fmt::Debug for Value {
impl std::fmt::Debug for LSMEntry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
Expand All @@ -139,8 +139,8 @@ impl std::fmt::Debug for Value {
}
}

impl From<(ParsedInternalKey, ValueOffset)> for Value {
fn from(val: (ParsedInternalKey, ValueOffset)) -> Value {
impl From<(ParsedInternalKey, ValueOffset)> for LSMEntry {
fn from(val: (ParsedInternalKey, ValueOffset)) -> LSMEntry {
let key = val.0;

Self {
Expand All @@ -152,7 +152,7 @@ impl From<(ParsedInternalKey, ValueOffset)> for Value {
}
}

impl PartialOrd for Value {
impl PartialOrd for LSMEntry {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
Expand All @@ -161,13 +161,13 @@ impl PartialOrd for Value {
// Order by user key, THEN by sequence number
// This is important to ensure the most recent keys are
// used, otherwise queries will behave unexpectedly
impl Ord for Value {
impl Ord for LSMEntry {
    /// Compares by `key` first; entries with equal keys are ordered by
    /// `seqno` in reverse, so the higher sequence number sorts first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.key
            .cmp(&other.key)
            .then_with(|| Reverse(self.seqno).cmp(&Reverse(other.seqno)))
    }
}

impl Value {
impl LSMEntry {
/// Creates a new [`LSMEntry`].
///
/// # Panics
Expand Down Expand Up @@ -217,8 +217,8 @@ impl Value {
}
}

impl From<Value> for ParsedInternalKey {
fn from(val: Value) -> Self {
impl From<LSMEntry> for ParsedInternalKey {
fn from(val: LSMEntry) -> Self {
Self {
user_key: val.key,
seqno: val.seqno,
Expand All @@ -228,7 +228,7 @@ impl From<Value> for ParsedInternalKey {
}


impl Serializable for Value {
impl Serializable for LSMEntry {
fn serialize<W: Write>(&self, writer: &mut W) -> Result<(), SerializeError> {
writer.write_u64::<BigEndian>(self.seqno)?;
writer.write_u8(u8::from(self.value_type))?;
Expand All @@ -246,7 +246,7 @@ impl Serializable for Value {
}
}

impl Deserializable for Value {
impl Deserializable for LSMEntry {
fn deserialize<R: Read>(reader: &mut R) -> Result<Self, DeserializeError> {
let seqno = reader.read_u64::<BigEndian>()?;
let value_type = reader.read_u8()?.into();
Expand All @@ -272,7 +272,7 @@ mod tests {
#[test]
fn value_raw() -> crate::Result<()> {
// Create an empty Value instance
let value = Value::new(vec![1, 2, 3], 10, 1, ValueType::Value);
let value = LSMEntry::new(vec![1, 2, 3], 10, 1, ValueType::Value);

#[rustfmt::skip]
let bytes = &[
Expand All @@ -290,7 +290,7 @@ mod tests {
];

// Deserialize the empty Value
let deserialized = Value::deserialize(&mut Cursor::new(bytes))?;
let deserialized = LSMEntry::deserialize(&mut Cursor::new(bytes))?;

// Check if deserialized Value is equivalent to the original empty Value
assert_eq!(value, deserialized);
Expand All @@ -301,14 +301,14 @@ mod tests {
#[test]
fn value_zero_value_offset() -> crate::Result<()> {
// Create an empty Value instance
let value = Value::new(vec![1, 2, 3], 0, 42, ValueType::Value);
let value = LSMEntry::new(vec![1, 2, 3], 0, 42, ValueType::Value);

// Serialize the empty Value
let mut serialized = Vec::new();
value.serialize(&mut serialized)?;

// Deserialize the empty Value
let deserialized = Value::deserialize(&mut &serialized[..])?;
let deserialized = LSMEntry::deserialize(&mut &serialized[..])?;

// Check if deserialized Value is equivalent to the original empty Value
assert_eq!(value, deserialized);
Expand All @@ -319,7 +319,7 @@ mod tests {
#[test]
fn value_with_value() -> crate::Result<()> {
// Create an empty Value instance
let value = Value::new(
let value = LSMEntry::new(
vec![1, 2, 3],
10,
42,
Expand All @@ -331,7 +331,7 @@ mod tests {
value.serialize(&mut serialized)?;

// Deserialize the empty Value
let deserialized = Value::deserialize(&mut &serialized[..])?;
let deserialized = LSMEntry::deserialize(&mut &serialized[..])?;

// Check if deserialized Value is equivalent to the original empty Value
assert_eq!(value, deserialized);
Expand Down
Loading

0 comments on commit f696277

Please sign in to comment.