Skip to content

Commit

Permalink
reorg code, move constants and more rocksdb-specifics to storage::roc…
Browse files Browse the repository at this point in the history
…ksdb
  • Loading branch information
luizirber committed Jul 14, 2024
1 parent 4231cf2 commit 6684126
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 84 deletions.
51 changes: 10 additions & 41 deletions src/core/src/index/revindex/disk_revindex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,23 @@ use std::sync::Arc;
use byteorder::{LittleEndian, WriteBytesExt};
use log::{info, trace};
use rayon::prelude::*;
use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options};
use rocksdb::MergeOperands;

use crate::collection::{Collection, CollectionSet};
use crate::encodings::{Color, Idx};
use crate::index::revindex::{
self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps, DB,
HASHES, MANIFEST, METADATA, STORAGE_SPEC, VERSION,
self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps,
MANIFEST, STORAGE_SPEC, VERSION,
};
use crate::index::{calculate_gather_stats, GatherResult, SigCounter};
use crate::manifest::Manifest;
use crate::prelude::*;
use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree};
use crate::sketch::Sketch;
use crate::storage::{rocksdb::STORAGE, InnerStorage, Storage};
use crate::storage::{
rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA, STORAGE},

Check warning on line 23 in src/core/src/index/revindex/disk_revindex.rs

View workflow job for this annotation

GitHub Actions / minimum_rust_version

unused import: `STORAGE`
InnerStorage, Storage,
};
use crate::Result;

const DB_VERSION: u8 = 1;
Expand All @@ -38,7 +41,7 @@ pub struct RevIndex {
path: PathBuf,
}

fn merge_datasets(
pub(crate) fn merge_datasets(
_: &[u8],
existing_val: Option<&[u8]>,
operands: &MergeOperands,
Expand All @@ -65,7 +68,7 @@ pub fn repair(path: &Path) {

impl RevIndex {
pub fn create(path: &Path, collection: CollectionSet) -> Result<module::RevIndex> {
let mut opts = module::RevIndex::db_options();
let mut opts = db_options();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.prepare_for_bulk_load();
Expand Down Expand Up @@ -106,7 +109,7 @@ impl RevIndex {
read_only: bool,
storage_spec: Option<&str>,
) -> Result<module::RevIndex> {
let mut opts = module::RevIndex::db_options();
let mut opts = db_options();
if !read_only {
opts.prepare_for_bulk_load();
}
Expand Down Expand Up @@ -503,37 +506,3 @@ impl RevIndexOps for RevIndex {
*/
}
}

/// Build the column family descriptors used when opening the database.
///
/// Returns descriptors for the `HASHES`, `METADATA` and `STORAGE` column
/// families, in that order. Every family shares the same base options; only
/// `HASHES` additionally registers the associative `"datasets operator"` merge
/// operator (backed by `merge_datasets`) and a minimum number of write buffers
/// to merge.
pub(crate) fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
    // Options common to every column family.
    let base_options = || {
        let mut cfopts = Options::default();
        cfopts.set_max_write_buffer_number(16);

        // Updated default from
        // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options
        cfopts.set_level_compaction_dynamic_level_bytes(true);

        cfopts
    };

    // HASHES is the only family that needs the merge operator.
    let mut hashes_opts = base_options();
    hashes_opts.set_merge_operator_associative("datasets operator", merge_datasets);
    hashes_opts.set_min_write_buffer_number_to_merge(10);

    // The previous version also configured a fourth `Options` value that was
    // never attached to any descriptor; it was dead code and has been removed.
    vec![
        ColumnFamilyDescriptor::new(HASHES, hashes_opts),
        ColumnFamilyDescriptor::new(METADATA, base_options()),
        ColumnFamilyDescriptor::new(STORAGE, base_options()),
    ]
}
41 changes: 6 additions & 35 deletions src/core/src/index/revindex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,20 @@ use crate::prelude::*;
use crate::signature::Signature;
use crate::sketch::minhash::KmerMinHash;
use crate::sketch::Sketch;
use crate::storage::rocksdb::{db_options, COLORS, DB};
use crate::HashIntoType;
use crate::Result;

pub type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;
// DB metadata saved in the METADATA column family
const MANIFEST: &str = "manifest";
const STORAGE_SPEC: &str = "storage_spec";
const VERSION: &str = "version";

type QueryColors = HashMap<Color, Datasets>;
type HashToColorT = HashMap<HashIntoType, Color, BuildNoHashHasher<HashIntoType>>;
#[derive(Serialize, Deserialize)]
pub struct HashToColor(HashToColorT);

// Column families
const HASHES: &str = "hashes";
const COLORS: &str = "colors";
const METADATA: &str = "metadata";

// DB metadata saved in the METADATA column family
const MANIFEST: &str = "manifest";
const STORAGE_SPEC: &str = "storage_spec";
const VERSION: &str = "version";

#[enum_dispatch(RevIndexOps)]
pub enum RevIndex {
//Color(color_revindex::ColorRevIndex),
Expand Down Expand Up @@ -186,7 +180,7 @@ impl RevIndex {
}

pub fn open<P: AsRef<Path>>(index: P, read_only: bool, spec: Option<&str>) -> Result<Self> {
let opts = Self::db_options();
let opts = db_options();
let cfs = DB::list_cf(&opts, index.as_ref()).unwrap();

if cfs.into_iter().any(|c| c == COLORS) {
Expand All @@ -197,29 +191,6 @@ impl RevIndex {
disk_revindex::RevIndex::open(index, read_only, spec)
}
}

/// Build the database-wide RocksDB options shared by every open/create path.
///
/// Caps the number of open files at 500, applies the "updated defaults" from
/// the RocksDB tuning guide (bytes-per-sync and block-based table settings),
/// and sets the parallelism to the current rayon thread count.
pub(crate) fn db_options() -> rocksdb::Options {
    let mut opts = rocksdb::Options::default();
    opts.set_max_open_files(500);

    // Updated defaults from
    // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options
    opts.set_bytes_per_sync(1048576);
    let mut block_opts = rocksdb::BlockBasedOptions::default();
    block_opts.set_block_size(16 * 1024);
    block_opts.set_cache_index_and_filter_blocks(true);
    block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
    block_opts.set_format_version(6);
    opts.set_block_based_table_factory(&block_opts);
    // End of updated defaults

    // `increase_parallelism` takes an `i32`; clamp first so the conversion
    // from `usize` can never wrap around to a negative value.
    let threads = rayon::current_num_threads().min(i32::MAX as usize) as i32;
    opts.increase_parallelism(threads);
    //opts.max_background_jobs = 6;
    // opts.optimize_level_style_compaction();
    // opts.optimize_universal_style_compaction();

    opts
}
}

pub fn prepare_query(search_sig: Signature, selection: &Selection) -> Option<KmerMinHash> {
Expand Down
11 changes: 9 additions & 2 deletions src/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::{Arc, RwLock};

use camino::Utf8Path as Path;
use camino::Utf8PathBuf as PathBuf;
use cfg_if::cfg_if;
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -160,7 +161,14 @@ impl InnerStorage {
x if x.starts_with("memory") => InnerStorage::new(MemStorage::new()),
x if x.starts_with("rocksdb") => {
let path = x.split("://").last().expect("not a valid path");

Check warning on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / Check

unused variable: `path`

Check failure on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / Lints (beta)

unused variable: `path`

Check failure on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / Lints (stable)

unused variable: `path`

Check warning on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / test (macos)

unused variable: `path`

Check warning on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / Run tests under wasm32-wasi

unused variable: `path`

Check warning on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / Run tests under wasm32-wasi

unused variable: `path`

Check warning on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / test (stable)

unused variable: `path`

Check warning on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / test (beta)

unused variable: `path`

Check warning on line 163 in src/core/src/storage/mod.rs

View workflow job for this annotation

GitHub Actions / test (windows)

unused variable: `path`

Check warning on line 163 in src/core/src/storage/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage/mod.rs#L163

Added line #L163 was not covered by tests
InnerStorage::new(RocksDBStorage::from_path(path))

cfg_if! {
if #[cfg(feature = "branchwater")] {
InnerStorage::new(RocksDBStorage::from_path(path))

Check warning on line 167 in src/core/src/storage/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage/mod.rs#L167

Added line #L167 was not covered by tests
} else {
todo!("Must enable branchwater feature")
}
}
}
x if x.starts_with("zip") => {
let path = x.split("://").last().expect("not a valid path");
Expand Down Expand Up @@ -656,4 +664,3 @@ impl Storage for MemStorage {
"memory://".into()
}
}

80 changes: 74 additions & 6 deletions src/core/src/storage/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,42 @@
use std::sync::Arc;

use rocksdb::{ColumnFamilyDescriptor, Options};

use crate::storage::{Storage, StorageArgs, StorageError};
use crate::{Error, Result};

Check warning on line 6 in src/core/src/storage/rocksdb.rs

View workflow job for this annotation

GitHub Actions / minimum_rust_version

unused import: `Error`

// Column families
pub(crate) const HASHES: &str = "hashes";
pub(crate) const COLORS: &str = "colors";
pub(crate) const METADATA: &str = "metadata";

// Column family for using rocksdb as a Storage
pub(crate) const STORAGE: &str = "storage";

pub type DB = rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>;

/// Store data in RocksDB
#[derive(Debug, Clone)]
pub struct RocksDBStorage {
db: Arc<crate::index::revindex::DB>,
db: Arc<DB>,
}

impl RocksDBStorage {
pub fn from_path(path: &str) -> Self {
let mut opts = crate::index::revindex::RevIndex::db_options();
let mut opts = db_options();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
opts.prepare_for_bulk_load();

Check warning on line 29 in src/core/src/storage/rocksdb.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage/rocksdb.rs#L25-L29

Added lines #L25 - L29 were not covered by tests

// prepare column family descriptors
let cfs = crate::index::revindex::disk_revindex::cf_descriptors();
let cfs = cf_descriptors();

Check warning on line 32 in src/core/src/storage/rocksdb.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage/rocksdb.rs#L32

Added line #L32 was not covered by tests

let db =
Arc::new(crate::index::revindex::DB::open_cf_descriptors(&opts, path, cfs).unwrap());
let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap());

Self { db }
}

pub fn from_db(db: Arc<crate::index::revindex::DB>) -> Self {
pub fn from_db(db: Arc<DB>) -> Self {

Check warning on line 39 in src/core/src/storage/rocksdb.rs

View check run for this annotation

Codecov / codecov/patch

src/core/src/storage/rocksdb.rs#L39

Added line #L39 was not covered by tests
Self { db: db.clone() }
}
}
Expand All @@ -55,3 +63,63 @@ impl Storage for RocksDBStorage {
"rocksdb://".into()
}
}

/// Build the column family descriptors used when opening the database.
///
/// Returns descriptors for the `HASHES`, `METADATA` and `STORAGE` column
/// families, in that order. Every family shares the same base options; only
/// `HASHES` additionally registers the associative `"datasets operator"` merge
/// operator (backed by `disk_revindex::merge_datasets`) and a minimum number
/// of write buffers to merge.
pub(crate) fn cf_descriptors() -> Vec<ColumnFamilyDescriptor> {
    // Options common to every column family.
    let base_options = || {
        let mut cfopts = Options::default();
        cfopts.set_max_write_buffer_number(16);

        // Updated default from
        // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options
        cfopts.set_level_compaction_dynamic_level_bytes(true);

        cfopts
    };

    // HASHES is the only family that needs the merge operator.
    let mut hashes_opts = base_options();
    hashes_opts.set_merge_operator_associative(
        "datasets operator",
        crate::index::revindex::disk_revindex::merge_datasets,
    );
    hashes_opts.set_min_write_buffer_number_to_merge(10);

    // The previous version also configured a fourth `Options` value that was
    // never attached to any descriptor; it was dead code and has been removed.
    vec![
        ColumnFamilyDescriptor::new(HASHES, hashes_opts),
        ColumnFamilyDescriptor::new(METADATA, base_options()),
        ColumnFamilyDescriptor::new(STORAGE, base_options()),
    ]
}

/// Build the database-wide RocksDB options shared by every open/create path.
///
/// Caps the number of open files at 500, applies the "updated defaults" from
/// the RocksDB tuning guide (bytes-per-sync and block-based table settings),
/// and sets the parallelism to the current rayon thread count.
pub(crate) fn db_options() -> rocksdb::Options {
    let mut opts = rocksdb::Options::default();
    opts.set_max_open_files(500);

    // Updated defaults from
    // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options
    opts.set_bytes_per_sync(1048576);
    let mut block_opts = rocksdb::BlockBasedOptions::default();
    block_opts.set_block_size(16 * 1024);
    block_opts.set_cache_index_and_filter_blocks(true);
    block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
    block_opts.set_format_version(6);
    opts.set_block_based_table_factory(&block_opts);
    // End of updated defaults

    // `increase_parallelism` takes an `i32`; clamp first so the conversion
    // from `usize` can never wrap around to a negative value.
    let threads = rayon::current_num_threads().min(i32::MAX as usize) as i32;
    opts.increase_parallelism(threads);
    //opts.max_background_jobs = 6;
    // opts.optimize_level_style_compaction();
    // opts.optimize_universal_style_compaction();

    opts
}

0 comments on commit 6684126

Please sign in to comment.