diff --git a/src/core/src/index/revindex/disk_revindex.rs b/src/core/src/index/revindex/disk_revindex.rs
index 520072170..9f324189f 100644
--- a/src/core/src/index/revindex/disk_revindex.rs
+++ b/src/core/src/index/revindex/disk_revindex.rs
@@ -1,5 +1,5 @@
 use std::hash::{BuildHasher, BuildHasherDefault, Hash, Hasher};
-use std::path::{Path, PathBuf};
+use std::path::Path;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 
@@ -38,7 +38,6 @@ fn compute_color(idxs: &Datasets) -> Color {
 pub struct RevIndex {
     db: Arc<DB>,
     collection: Arc<CollectionSet>,
-    path: PathBuf,
 }
 
 pub(crate) fn merge_datasets(
@@ -82,7 +81,6 @@ impl RevIndex {
         let index = Self {
             db,
             collection: Arc::new(collection),
-            path: path.into(),
         };
 
         index.collection.par_iter().for_each(|(dataset_id, _)| {
@@ -108,7 +106,7 @@ impl RevIndex {
         read_only: bool,
         storage_spec: Option<&str>,
     ) -> Result<module::RevIndex> {
-        let mut opts = db_options();
+        let opts = db_options();
 
         // prepare column family descriptors
         let cfs = cf_descriptors();
@@ -129,11 +127,7 @@ impl RevIndex {
             storage_spec,
         )?);
 
-        Ok(module::RevIndex::Plain(Self {
-            db,
-            collection,
-            path: path.as_ref().into(),
-        }))
+        Ok(module::RevIndex::Plain(Self { db, collection }))
     }
 
     fn load_collection_from_rocksdb(
@@ -471,11 +465,14 @@ impl RevIndexOps for RevIndex {
         let new_storage = RocksDBStorage::from_db(self.db.clone());
 
         // use manifest to copy from current storage to new one
-        self.collection().par_iter().for_each(|(_, record)| {
-            let path = record.internal_location().as_str();
-            let sig_data = self.collection.storage().load(path).unwrap();
-            new_storage.save(path, &sig_data);
-        });
+        self.collection()
+            .par_iter()
+            .try_for_each(|(_, record)| -> Result<()> {
+                let path = record.internal_location().as_str();
+                let sig_data = self.collection.storage().load(path).unwrap();
+                new_storage.save(path, &sig_data)?;
+                Ok(())
+            })?;
 
         // Replace storage for collection.
         // Using unchecked version because we just used the manifest
diff --git a/src/core/src/index/revindex/mod.rs b/src/core/src/index/revindex/mod.rs
index b46db02b3..d2a620716 100644
--- a/src/core/src/index/revindex/mod.rs
+++ b/src/core/src/index/revindex/mod.rs
@@ -426,6 +426,7 @@ mod test {
     use crate::collection::Collection;
     use crate::prelude::*;
     use crate::selection::Selection;
+    use crate::storage::{InnerStorage, RocksDBStorage};
     use crate::Result;
 
     use super::{prepare_query, RevIndex, RevIndexOps};
@@ -931,7 +932,7 @@ mod test {
             assert_eq!(matches_external, matches_internal);
         }
         let new_path = outdir.path().join("new_index_path");
-        std::fs::rename(output.as_path(), new_path.as_path());
+        std::fs::rename(output.as_path(), new_path.as_path())?;
 
         let index = RevIndex::open(new_path, false, None)?;
 
@@ -949,4 +950,75 @@ mod test {
 
         Ok(())
     }
+
+    #[test]
+    fn rocksdb_storage_from_path() -> Result<()> {
+        let basedir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+
+        let mut zip_collection = basedir.clone();
+        zip_collection.push("../../tests/test-data/track_abund/track_abund.zip");
+
+        let outdir = TempDir::new()?;
+
+        let zip_copy = PathBuf::from(
+            outdir
+                .path()
+                .join("sigs.zip")
+                .into_os_string()
+                .into_string()
+                .unwrap(),
+        );
+        std::fs::copy(zip_collection, zip_copy.as_path())?;
+
+        let selection = Selection::builder().ksize(31).scaled(10000).build();
+        let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?;
+        let output = outdir.path().join("index");
+
+        // Step 1: create an index
+        let index = RevIndex::create(output.as_path(), collection.try_into()?, false)?;
+
+        // Step 2: internalize the storage for the index
+        {
+            let mut index = index;
+            index
+                .internalize_storage()
+                .expect("Error internalizing storage");
+        }
+
+        // Step 3: load rocksdb storage from path
+        // should have the same content as zipfile
+
+        // Iter thru collection, make sure all records are present
+        let collection = Collection::from_zipfile(zip_copy.as_path())?.select(&selection)?;
+        assert_eq!(collection.len(), 2);
+        let col_storage = collection.storage();
+
+        let spec;
+        {
+            let rdb_storage = RocksDBStorage::from_path(output.as_os_str().to_str().unwrap());
+            spec = rdb_storage.spec();
+            collection.iter().for_each(|(_, r)| {
+                assert_eq!(
+                    rdb_storage.load(r.internal_location().as_str()).unwrap(),
+                    col_storage.load(r.internal_location().as_str()).unwrap()
+                );
+            });
+        }
+
+        // Step 4: verify rocksdb storage spec
+        assert_eq!(
+            spec,
+            format!("rocksdb://{}", output.as_os_str().to_str().unwrap())
+        );
+
+        let storage = InnerStorage::from_spec(spec)?;
+        collection.iter().for_each(|(_, r)| {
+            assert_eq!(
+                storage.load(r.internal_location().as_str()).unwrap(),
+                col_storage.load(r.internal_location().as_str()).unwrap()
+            );
+        });
+
+        Ok(())
+    }
 }
diff --git a/src/core/src/storage/rocksdb.rs b/src/core/src/storage/rocksdb.rs
index a849a6763..4145ab4c3 100644
--- a/src/core/src/storage/rocksdb.rs
+++ b/src/core/src/storage/rocksdb.rs
@@ -33,8 +33,6 @@ impl RocksDBStorage {
 
         let db = Arc::new(DB::open_cf_descriptors(&opts, path, cfs).unwrap());
 
-        // TODO: save storage_args
-
         Self { db }
     }
 
@@ -62,7 +60,7 @@ impl Storage for RocksDBStorage {
     }
 
     fn spec(&self) -> String {
-        "rocksdb://".into()
+        format!("rocksdb://{}", self.db.path().display())
     }
 }