diff --git a/src/mastiff_manygather.rs b/src/mastiff_manygather.rs index 03e054dc..c9fd2aad 100644 --- a/src/mastiff_manygather.rs +++ b/src/mastiff_manygather.rs @@ -34,6 +34,18 @@ pub fn mastiff_manygather( allow_failed_sigpaths, )?; + eprintln!( + "queries_file: {}\nquery_collection.len(): {}", + // "\nquery_collection.manifest().internal_locations(): {}", + queries_file, + query_collection.len(), + // query_collection.manifest().internal_locations().cloned().collect::>().join(", ") + ); + + query_collection.manifest().internal_locations().for_each(|location| + eprintln!("query collection, internal location: {}", location) + ); + // set up a multi-producer, single-consumer channel. let (send, recv) = std::sync::mpsc::sync_channel::(rayon::current_num_threads()); diff --git a/src/python/tests/test_gather.py b/src/python/tests/test_gather.py index 4ab4c6de..69399eb6 100644 --- a/src/python/tests/test_gather.py +++ b/src/python/tests/test_gather.py @@ -221,7 +221,7 @@ def test_bad_against(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no-exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err assert "WARNING: 1 search paths failed to load. See error messages above." in captured.err @@ -247,7 +247,7 @@ def test_bad_against_2(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "Sketch loading error: File is too short, less than five bytes" in captured.err + # assert "Sketch loading error: File is too short, less than five bytes" in captured.err assert "WARNING: could not load sketches from path" in captured.err assert "WARNING: 1 search paths failed to load. See error messages above." in captured.err @@ -307,7 +307,7 @@ def test_against_multisigfile(runtmp, zip_against): print(df) else: print(df) - assert len(df) == 1 + assert len(df) == 3 # @CTB this is a bug :(. It should load multiple sketches properly! diff --git a/src/python/tests/test_index.py b/src/python/tests/test_index.py index f490d2aa..fc9a5a54 100644 --- a/src/python/tests/test_index.py +++ b/src/python/tests/test_index.py @@ -123,6 +123,32 @@ def test_index_manifest(runtmp, capfd): assert 'index is done' in runtmp.last_result.err +def test_index_manifest_zip_files(runtmp, capfd): + # test index with text file of multiple zip files + sig2 = get_test_data('2.fa.sig.gz') + sig47 = get_test_data('47.fa.sig.gz') + sig63 = get_test_data('63.fa.sig.gz') + sigs = [sig2, sig47, sig63] + manifests = [] + for sig in sigs: + sig_mf = runtmp.output(os.path.basename(sig) + ".mf.zip") + runtmp.sourmash("sig", "cat", sig, "-o", sig_mf) + manifests.append(sig_mf) + + # assert False + manifests_zips_list = runtmp.output('manifest_zips.txt') + make_file_list(manifests_zips_list, manifests) + + output = runtmp.output('out.db') + runtmp.sourmash('scripts', 'index', manifests_zips_list, + '-o', output) + + captured = capfd.readouterr() + print(captured.err) + print(runtmp.last_result.err) + assert 'index is done' in runtmp.last_result.err + + def test_index_bad_siglist_2(runtmp, capfd): # test with a bad siglist (containing a missing file) against_list = runtmp.output('against.txt') @@ -140,7 +166,7 @@ def test_index_bad_siglist_2(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no-exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err def test_index_empty_siglist(runtmp, capfd): diff --git a/src/python/tests/test_multigather.py b/src/python/tests/test_multigather.py index 1316d52a..15c2becf 100644 --- a/src/python/tests/test_multigather.py +++ b/src/python/tests/test_multigather.py @@ -396,7 +396,7 @@ def test_missing_query(runtmp, capfd, indexed): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no-exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err assert "WARNING: 1 query paths failed to load. See error messages above." @@ -492,7 +492,7 @@ def test_bad_against(runtmp, capfd): against_list = runtmp.output('against.txt') sig2 = get_test_data('2.fa.sig.gz') - make_file_list(against_list, [sig2, "no exist"]) + make_file_list(against_list, [sig2, "no-exist"]) # should succeed, but with error output. runtmp.sourmash('scripts', 'fastmultigather', query_list, against_list, @@ -501,7 +501,7 @@ def test_bad_against(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err assert "WARNING: 1 search paths failed to load. See error messages above." in captured.err @@ -548,7 +548,7 @@ def test_empty_against(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "Sketch loading error: No such file or directory" in captured.err + assert "WARNING: path '' does not exist in 'against.txt'" in captured.err assert "No search signatures loaded, exiting." in captured.err diff --git a/src/python/tests/test_multisearch.py b/src/python/tests/test_multisearch.py index 611b0f81..417b778f 100644 --- a/src/python/tests/test_multisearch.py +++ b/src/python/tests/test_multisearch.py @@ -312,7 +312,7 @@ def test_bad_query(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no-exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err assert "WARNING: 1 query paths failed to load. See error messages above." in captured.err @@ -416,7 +416,7 @@ def test_bad_against(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no-exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err assert "WARNING: 1 search paths failed to load. See error messages above." in captured.err diff --git a/src/python/tests/test_pairwise.py b/src/python/tests/test_pairwise.py index 3869b3d4..e4dcb639 100644 --- a/src/python/tests/test_pairwise.py +++ b/src/python/tests/test_pairwise.py @@ -220,7 +220,7 @@ def test_bad_query(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no-exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err assert "WARNING: 1 analysis paths failed to load. See error messages above." in captured.err diff --git a/src/python/tests/test_search.py b/src/python/tests/test_search.py index 8e08e714..fc443643 100644 --- a/src/python/tests/test_search.py +++ b/src/python/tests/test_search.py @@ -350,7 +350,7 @@ def test_bad_query_2(runtmp, capfd, indexed): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no-exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err assert "WARNING: 1 query paths failed to load. See error messages above." in captured.err @@ -449,7 +449,7 @@ def test_bad_against(runtmp, capfd): captured = capfd.readouterr() print(captured.err) - assert "WARNING: could not load sketches from path 'no-exist'" in captured.err + assert "WARNING: path 'no-exist' does not exist" in captured.err assert "WARNING: 1 search paths failed to load. See error messages above." in captured.err diff --git a/src/utils.rs b/src/utils.rs index 2569c085..309aab83 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -20,6 +20,7 @@ use std::sync::atomic::AtomicUsize; use sourmash::ani_utils::{ani_ci_from_containment, ani_from_containment}; use sourmash::collection::Collection; use sourmash::manifest::{Manifest, Record}; +use sourmash::prelude::Storage; use sourmash::selection::Selection; use sourmash::signature::{Signature, SigsTrait}; use sourmash::sketch::minhash::KmerMinHash; @@ -542,6 +543,59 @@ pub fn collection_from_zipfile(sigpath: &Path, report_type: &ReportType) -> Resu } } +// Make a collection from anything except a pathlist, as this is called +// from collection_from_pathlist +fn collection_from_any_zipfile_or_signature_or_manifest( + sigpath: &Path, + report_type: &ReportType, + allow_failed: bool, +) -> Result, Option> { + // returns collection and number of failures + let mut last_error = None; + let collection = if sigpath.extension().map_or(false, |ext| ext == "zip") { + match collection_from_zipfile(&sigpath, &report_type) { + Ok(coll) => Some(coll), + Err(e) => { + last_error = Some(e); + None + } + } + } else { + None + }; + + let collection = + collection.or_else(|| match collection_from_manifest(&sigpath, &report_type) { + Ok(coll) => Some(coll), + Err(e) => { + last_error = Some(e); + None + } + }); + + let collection = + collection.or_else(|| match collection_from_signature(&sigpath, &report_type) { + Ok(coll) => Some(coll), + Err(e) => { + last_error = Some(e); + None + } + }); + + match collection { + Some(collection) => Ok(Some(collection)), + None => { + if let Some(e) = last_error { + eprintln!("WARNING: could not load sketches from path '{}'", sigpath); + Err(Some(e)) + } else { + // Should never get here + Err(anyhow!("Unable to load the collection for an unknown reason.").into()) + } + } + } +} + fn collection_from_manifest( sigpath: &Path, report_type: &ReportType, @@ -577,66 +631,139 @@ fn collection_from_manifest( fn collection_from_pathlist( sigpath: &Path, report_type: &ReportType, + allow_failed: bool, ) -> Result<(Collection, usize), anyhow::Error> { + // load sketches from paths in parallel. + let n_failed = AtomicUsize::new(0); + let file = File::open(sigpath).with_context(|| { + let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst); format!( "Failed to open {} pathlist file: '{}'", report_type, sigpath ) })?; - let reader = BufReader::new(file); + + let reader_new = BufReader::new(file); + let mut last_error: std::option::Option = None; // load list of paths - let lines: Vec<_> = reader + let path_bufs: Vec<_> = reader_new .lines() .filter_map(|line| match line { - Ok(path) => Some(path), - Err(_err) => None, + Ok(l) => { + let path_buf = PathBuf::from(l.clone()); + match path_buf.exists() { + true => Some(path_buf), + false => { + eprintln!("WARNING: path '{}' does not exist in '{}'", l, sigpath); + let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst); + None + } + } + } + Err(err) => { + let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst); + None + } }) .collect(); - // load sketches from paths in parallel. - let n_failed = AtomicUsize::new(0); - let records: Vec = lines + // Load all entries as collections + let collections = path_bufs .par_iter() - .filter_map(|path| match Signature::from_path(path) { - Ok(signatures) => { - let recs: Vec = signatures - .into_iter() - .flat_map(|v| Record::from_sig(&v, path)) - .collect(); - Some(recs) + .filter_map(|path| { + match collection_from_any_zipfile_or_signature_or_manifest( + &path, + &report_type, + allow_failed, + ) { + Ok(collection) => Some(collection), + Err(err) => { + // eprintln!("collection_from_any_zipfile_or_signature_or_manifest WARNING: could not load sketches from path '{}'", path); + let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst); + None + } } + }) + .flatten() + .collect::>(); + + // get all the signatures from each collection + let flattened_signatures = collections + .par_iter() + .filter_map(|collection| match collection_to_signatures(&collection) { + Ok(pathlist) => Some(pathlist), Err(err) => { - eprintln!("Sketch loading error: {}", err); - eprintln!("WARNING: could not load sketches from path '{}'", path); + eprintln!("WARNING: could not get signatures for collection "); let _ = n_failed.fetch_add(1, atomic::Ordering::SeqCst); None } }) .flatten() - .collect(); + .collect::>(); - if records.is_empty() { - eprintln!( - "No valid signatures found in {} pathlist '{}'", + // // Now load the path filenames as one big collection + + let collection = Collection::from_sigs(flattened_signatures).with_context(|| { + format!( + "Loaded {} signatures but failed to load as collection: '{}'", report_type, sigpath - ); - } + ) + }); - let manifest: Manifest = records.into(); - let collection = Collection::new( - manifest, - InnerStorage::new( - FSStorage::builder() - .fullpath("".into()) - .subdir("".into()) - .build(), - ), - ); let n_failed = n_failed.load(atomic::Ordering::SeqCst); - Ok((collection, n_failed)) + Ok((collection?, n_failed)) +} + +// Convert a collection into a list of paths that can then be read to create a collection +fn collection_to_signatures(collection: &Collection) -> Result, anyhow::Error> { + // For each record in the collection, get its path filename + let signatures: Vec = collection + .manifest() + .iter() + .filter_map(|record| match collection.sig_from_record(&record) { + Ok(sigstore) => { + eprintln!( + "\n:) Hello in collection_to_signatures, record.internal_location(): {}, record.filename(): {}", + record.internal_location(), record.filename() + ); + eprintln!("record.md5sum(): {}", record.md5()); + let sig = Signature::from(sigstore); + eprintln!( + ":) Hello in collection_to_signatures, created signature, signature filename: {}", + sig.filename() + ); + eprintln!("sig.md5sum(): {}", sig.md5sum()); + Some(sig) + } + Err(_err) => { + eprintln!( + "\n:( Hello in collection_to_signatures Error (None), record.internal_location(): {}, record.filename(): {}", + record.internal_location(), record.filename() + ); + None + }, + // record.filename() + }) + // .map(|record| record.filename()); + // internal_location should create Vec> + .collect::>(); + + eprintln!("signatures, {}", signatures.len()); + + // Collection::from_sigs(signatures); + + // let pathlist = &filenames + // .iter() + // .map(|filename| PathBuf::from(&filename)) + // .flatten(); + + // let random_strings =vec!["abc".to_string(), "def".to_string()]; + + // Ok(random_strings) + Ok(signatures) } fn collection_from_signature(sigpath: &Path, report_type: &ReportType) -> Result { @@ -705,14 +832,15 @@ pub fn load_collection( } }); - let collection = - collection.or_else(|| match collection_from_pathlist(&sigpath, &report_type) { + let collection = collection.or_else(|| { + match collection_from_pathlist(&sigpath, &report_type, allow_failed) { Ok((coll, n_failed)) => Some((coll, n_failed)), Err(e) => { last_error = Some(e); None } - }); + } + }); match collection { Some((coll, n_failed)) => {