diff --git a/src/core/src/index/revindex/disk_revindex.rs b/src/core/src/index/revindex/disk_revindex.rs index 21dc3b11c..3ac59c986 100644 --- a/src/core/src/index/revindex/disk_revindex.rs +++ b/src/core/src/index/revindex/disk_revindex.rs @@ -6,20 +6,23 @@ use std::sync::Arc; use byteorder::{LittleEndian, WriteBytesExt}; use log::{info, trace}; use rayon::prelude::*; -use rocksdb::{ColumnFamilyDescriptor, MergeOperands, Options}; +use rocksdb::MergeOperands; use crate::collection::{Collection, CollectionSet}; use crate::encodings::{Color, Idx}; use crate::index::revindex::{ - self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps, DB, - HASHES, MANIFEST, METADATA, STORAGE_SPEC, VERSION, + self as module, stats_for_cf, Datasets, DbStats, HashToColor, QueryColors, RevIndexOps, + MANIFEST, STORAGE_SPEC, VERSION, }; use crate::index::{calculate_gather_stats, GatherResult, SigCounter}; use crate::manifest::Manifest; use crate::prelude::*; use crate::sketch::minhash::{KmerMinHash, KmerMinHashBTree}; use crate::sketch::Sketch; -use crate::storage::{rocksdb::STORAGE, InnerStorage, Storage}; +use crate::storage::{ + rocksdb::{cf_descriptors, db_options, DB, HASHES, METADATA, STORAGE}, + InnerStorage, Storage, +}; use crate::Result; const DB_VERSION: u8 = 1; @@ -38,7 +41,7 @@ pub struct RevIndex { path: PathBuf, } -fn merge_datasets( +pub(crate) fn merge_datasets( _: &[u8], existing_val: Option<&[u8]>, operands: &MergeOperands, @@ -65,7 +68,7 @@ pub fn repair(path: &Path) { impl RevIndex { pub fn create(path: &Path, collection: CollectionSet) -> Result { - let mut opts = module::RevIndex::db_options(); + let mut opts = db_options(); opts.create_if_missing(true); opts.create_missing_column_families(true); opts.prepare_for_bulk_load(); @@ -106,7 +109,7 @@ impl RevIndex { read_only: bool, storage_spec: Option<&str>, ) -> Result { - let mut opts = module::RevIndex::db_options(); + let mut opts = db_options(); if !read_only { opts.prepare_for_bulk_load(); } @@ -503,37 +506,3 @@ impl RevIndexOps for RevIndex { */ } } - -pub(crate) fn cf_descriptors() -> Vec { - let mut cfopts = Options::default(); - cfopts.set_max_write_buffer_number(16); - cfopts.set_merge_operator_associative("datasets operator", merge_datasets); - cfopts.set_min_write_buffer_number_to_merge(10); - - // Updated default from - // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options - cfopts.set_level_compaction_dynamic_level_bytes(true); - - let cf_hashes = ColumnFamilyDescriptor::new(HASHES, cfopts); - - let mut cfopts = Options::default(); - cfopts.set_max_write_buffer_number(16); - // Updated default - cfopts.set_level_compaction_dynamic_level_bytes(true); - - let cf_metadata = ColumnFamilyDescriptor::new(METADATA, cfopts); - - let mut cfopts = Options::default(); - cfopts.set_max_write_buffer_number(16); - // Updated default - cfopts.set_level_compaction_dynamic_level_bytes(true); - - let cf_storage = ColumnFamilyDescriptor::new(STORAGE, cfopts); - - let mut cfopts = Options::default(); - cfopts.set_max_write_buffer_number(16); - // Updated default - cfopts.set_level_compaction_dynamic_level_bytes(true); - - vec![cf_hashes, cf_metadata, cf_storage] -} diff --git a/src/core/src/index/revindex/mod.rs b/src/core/src/index/revindex/mod.rs index cfa2a4de6..88b7a6cbc 100644 --- a/src/core/src/index/revindex/mod.rs +++ b/src/core/src/index/revindex/mod.rs @@ -20,26 +20,20 @@ use crate::prelude::*; use crate::signature::Signature; use crate::sketch::minhash::KmerMinHash; use crate::sketch::Sketch; +use crate::storage::rocksdb::{db_options, COLORS, DB}; use crate::HashIntoType; use crate::Result; -pub type DB = rocksdb::DBWithThreadMode; +// DB metadata saved in the METADATA column family +const MANIFEST: &str = "manifest"; +const STORAGE_SPEC: &str = "storage_spec"; +const VERSION: &str = "version"; type QueryColors = HashMap; type HashToColorT = HashMap>; #[derive(Serialize, Deserialize)] pub struct HashToColor(HashToColorT); -// Column families -const HASHES: &str = "hashes"; -const COLORS: &str = "colors"; -const METADATA: &str = "metadata"; - -// DB metadata saved in the METADATA column family -const MANIFEST: &str = "manifest"; -const STORAGE_SPEC: &str = "storage_spec"; -const VERSION: &str = "version"; - #[enum_dispatch(RevIndexOps)] pub enum RevIndex { //Color(color_revindex::ColorRevIndex), @@ -186,7 +180,7 @@ impl RevIndex { } pub fn open>(index: P, read_only: bool, spec: Option<&str>) -> Result { - let opts = Self::db_options(); + let opts = db_options(); let cfs = DB::list_cf(&opts, index.as_ref()).unwrap(); if cfs.into_iter().any(|c| c == COLORS) { @@ -197,29 +191,6 @@ impl RevIndex { disk_revindex::RevIndex::open(index, read_only, spec) } } - - pub(crate) fn db_options() -> rocksdb::Options { - let mut opts = rocksdb::Options::default(); - opts.set_max_open_files(500); - - // Updated defaults from - // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options - opts.set_bytes_per_sync(1048576); - let mut block_opts = rocksdb::BlockBasedOptions::default(); - block_opts.set_block_size(16 * 1024); - block_opts.set_cache_index_and_filter_blocks(true); - block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); - block_opts.set_format_version(6); - opts.set_block_based_table_factory(&block_opts); - // End of updated defaults - - opts.increase_parallelism(rayon::current_num_threads() as i32); - //opts.max_background_jobs = 6; - // opts.optimize_level_style_compaction(); - // opts.optimize_universal_style_compaction(); - - opts - } } pub fn prepare_query(search_sig: Signature, selection: &Selection) -> Option { diff --git a/src/core/src/storage/mod.rs b/src/core/src/storage/mod.rs index 6efefbee8..ef057f8c8 100644 --- a/src/core/src/storage/mod.rs +++ b/src/core/src/storage/mod.rs @@ -11,6 +11,7 @@ use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use thiserror::Error; use typed_builder::TypedBuilder; +use cfg_if::cfg_if; use crate::errors::ReadDataError; use crate::prelude::*; @@ -160,7 +161,14 @@ impl InnerStorage { x if x.starts_with("memory") => InnerStorage::new(MemStorage::new()), x if x.starts_with("rocksdb") => { let path = x.split("://").last().expect("not a valid path"); - InnerStorage::new(RocksDBStorage::from_path(path)) + + cfg_if! { + if #[cfg(feature = "branchwater")] { + InnerStorage::new(RocksDBStorage::from_path(path)) + } else { + todo!("Must enable branchwater feature") + } + } } x if x.starts_with("zip") => { let path = x.split("://").last().expect("not a valid path"); diff --git a/src/core/src/storage/rocksdb.rs b/src/core/src/storage/rocksdb.rs index 4fc3a4023..7da002a9c 100644 --- a/src/core/src/storage/rocksdb.rs +++ b/src/core/src/storage/rocksdb.rs @@ -1,34 +1,42 @@ use std::sync::Arc; +use rocksdb::{ColumnFamilyDescriptor, Options}; + use crate::storage::{Storage, StorageArgs, StorageError}; use crate::{Error, Result}; +// Column families +pub(crate) const HASHES: &str = "hashes"; +pub(crate) const COLORS: &str = "colors"; +pub(crate) const METADATA: &str = "metadata"; + // Column family for using rocksdb as a Storage pub(crate) const STORAGE: &str = "storage"; +pub type DB = rocksdb::DBWithThreadMode; + /// Store data in RocksDB #[derive(Debug, Clone)] pub struct RocksDBStorage { - db: Arc, + db: Arc, } impl RocksDBStorage { pub fn from_path(path: &str) -> Self { - let mut opts = crate::index::revindex::RevIndex::db_options(); + let mut opts = db_options(); opts.create_if_missing(true); opts.create_missing_column_families(true); opts.prepare_for_bulk_load(); // prepare column family descriptors - let cfs = crate::index::revindex::disk_revindex::cf_descriptors(); + let cfs = cf_descriptors(); - let db = - Arc::new(crate::index::revindex::DB::open_cf_descriptors(&opts, path, cfs).unwrap()); + let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap()); Self { db } } - pub fn from_db(db: Arc) -> Self { + pub fn from_db(db: Arc) -> Self { Self { db: db.clone() } } } @@ -55,3 +63,63 @@ impl Storage for RocksDBStorage { "rocksdb://".into() } } + +pub(crate) fn cf_descriptors() -> Vec { + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + cfopts.set_merge_operator_associative( + "datasets operator", + crate::index::revindex::disk_revindex::merge_datasets, + ); + cfopts.set_min_write_buffer_number_to_merge(10); + + // Updated default from + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + cfopts.set_level_compaction_dynamic_level_bytes(true); + + let cf_hashes = ColumnFamilyDescriptor::new(HASHES, cfopts); + + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + // Updated default + cfopts.set_level_compaction_dynamic_level_bytes(true); + + let cf_metadata = ColumnFamilyDescriptor::new(METADATA, cfopts); + + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + // Updated default + cfopts.set_level_compaction_dynamic_level_bytes(true); + + let cf_storage = ColumnFamilyDescriptor::new(STORAGE, cfopts); + + let mut cfopts = Options::default(); + cfopts.set_max_write_buffer_number(16); + // Updated default + cfopts.set_level_compaction_dynamic_level_bytes(true); + + vec![cf_hashes, cf_metadata, cf_storage] +} + +pub(crate) fn db_options() -> rocksdb::Options { + let mut opts = rocksdb::Options::default(); + opts.set_max_open_files(500); + + // Updated defaults from + // https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + opts.set_bytes_per_sync(1048576); + let mut block_opts = rocksdb::BlockBasedOptions::default(); + block_opts.set_block_size(16 * 1024); + block_opts.set_cache_index_and_filter_blocks(true); + block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true); + block_opts.set_format_version(6); + opts.set_block_based_table_factory(&block_opts); + // End of updated defaults + + opts.increase_parallelism(rayon::current_num_threads() as i32); + //opts.max_background_jobs = 6; + // opts.optimize_level_style_compaction(); + // opts.optimize_universal_style_compaction(); + + opts +}