diff --git a/Cargo.lock b/Cargo.lock index 30ce9d9a..e6b2455f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,6 +392,26 @@ version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e93abca9e28e0a1b9877922aacb20576e05d4679ffa78c3d6dc22a26a216659" +[[package]] +name = "bzip2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" +dependencies = [ + "bzip2-sys", + "libc", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.13+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225bff33b2141874fe80d71e07d6eec4f85c5c216453dd96388240f96e1acc14" +dependencies = [ + "cc", + "pkg-config", +] + [[package]] name = "cast" version = "0.3.0" @@ -405,6 +425,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac9fe6cdbb24b6ade63616c0a0688e45bb56732262c158df3c0c4bea4ca47cb7" dependencies = [ "find-msvc-tools", + "jobserver", + "libc", "shlex", ] @@ -1085,7 +1107,7 @@ dependencies = [ "parallel-processor", "parking_lot", "rayon", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "typenum", ] @@ -1115,7 +1137,7 @@ dependencies = [ "parking_lot", "rayon", "rustc-hash", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "typenum", ] @@ -1131,7 +1153,7 @@ dependencies = [ "ggcat_minimizer_bucketing", "ggcat_structs", "parallel-processor", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "typenum", ] @@ -1161,7 +1183,7 @@ dependencies = [ "parallel-processor", "parking_lot", "rayon", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "traitgraph-algo", "typenum", ] @@ -1211,6 +1233,7 @@ dependencies = [ "serde", "serde_json", "siphasher", + "zstd", ] [[package]] @@ -1245,7 +1268,7 @@ dependencies = [ "rustc-hash", "serde", "siphasher", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "typenum", ] @@ -1280,7 +1303,7 @@ dependencies = [ "parallel-processor", "parking_lot", "rayon", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "typenum", ] @@ -1308,6 +1331,7 @@ dependencies = [ "bitbound", "bstr", "byteorder", + "bzip2", "dynamic-dispatch", "flate2", "ggcat-logging", @@ -1321,8 +1345,10 @@ dependencies = [ "rand 0.8.5", "rustc-hash", "serde", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "typenum", + "xz2", + "zstd", ] [[package]] @@ -1340,7 +1366,7 @@ dependencies = [ "parallel-processor", "parking_lot", "replace_with", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "tokio", "typenum", ] @@ -1363,7 +1389,7 @@ dependencies = [ "parking_lot", "replace_with", "rustc-hash", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "tokio", "typenum", ] @@ -1392,8 +1418,9 @@ dependencies = [ "parallel-processor", "parking_lot", "rayon", - "streaming-libdeflate-rs 0.2.3", + "streaming-libdeflate-rs", "typenum", + "zstd", ] [[package]] @@ -1620,6 +1647,16 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jobserver" +version = "0.1.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +dependencies = [ + "getrandom 0.3.4", + "libc", +] + [[package]] name = "js-sys" version = "0.3.81" @@ -1712,6 +1749,17 @@ dependencies = [ "libc", ] +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "make-cmd" version = "0.1.0" @@ -2074,6 +2122,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "plotters" version = "0.3.7" @@ -2588,18 +2642,6 @@ dependencies = [ "rand 0.8.5", ] -[[package]] -name = "streaming-libdeflate-rs" -version = "0.2.2" -dependencies = [ - "crc32fast", - "filebuffer", - "nightly-quirks", - "rayon", - "static_assertions", - "structopt", -] - [[package]] name = "streaming-libdeflate-rs" version = "0.2.3" @@ -3334,6 +3376,15 @@ dependencies = [ "tap", ] +[[package]] +name = "xz2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + [[package]] name = "zerocopy" version = "0.8.27" @@ -3353,3 +3404,31 @@ dependencies = [ "quote 1.0.41", "syn 2.0.108", ] + +[[package]] +name = "zstd" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91ee311a569c327171651566e07972200e76fcfe2242a4fa446149a3881c08a" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "7.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f49c4d5f0abb602a93fb8736af2a4f4dd9512e36f7f570d66e65ff867ed3b9d" +dependencies = [ + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "2.0.16+zstd.1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/README.md b/README.md index d61e1e68..04b09611 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,12 @@ conda install -c conda-forge -c bioconda ggcat ## Tool usage +Top-level commands: + +``` +ggcat --help +``` + ### Build a new graph To build a new graph with a specified k of some input files, run: @@ -31,6 +37,13 @@ Or if you have a file with a list of input files: ggcat build -k -j -l -o ``` +Output compression is inferred from the output extension: +- `.lz4` (default) +- `.gz` +- `.zst` / `.zstd` + +Use `--output-compression-level ` to tune the final output compression level. + #### Building a colored graph To build a colored graph, add the `-c` flag to the above commands @@ -54,6 +67,28 @@ Then the graph can be built with the command: ggcat build -k -j -c -d color_mapping.in -o ``` +### Direct colored FASTA export (sorted by color bitsets) + +GGCAT can export a sorted colored FASTA directly from input reads/genomes in one command (without first writing/re-reading a graph FASTA): + +``` +ggcat build-colored-fasta -k -j -c -o +``` + +It can also post-process an existing colored graph FASTA: + +``` +ggcat dump-colored-fasta -o +``` + +The output headers contain explicit bitsets: + +``` +>0 BS:Z:00101 +``` + +Both commands support `.zst/.zstd` output and `--output-compression-level`. + #### Building links To build links between maximal unitigs in BCALM2 like format, use the `-e` flag @@ -94,6 +129,8 @@ Options: Maximum suggested memory usage (GB) The tool will try use only up to this GB of memory to store temporary files without writing to disk. This usage does not include the needed memory for the processing steps. GGCAT can allocate extra memory for files if the current memory is not enough to complete the current operation [default: 2] -p, --prefer-memory Use all the given memory before writing to disk + --output-compression-level + Compression level for final output files (applies to .lz4/.gz/.zst/.zstd) [default: 2] -h, --help Print help @@ -173,6 +210,8 @@ Options: Maximum suggested memory usage (GB) The tool will try use only up to this GB of memory to store temporary files without writing to disk. This usage does not include the needed memory for the processing steps. GGCAT can allocate extra memory for files if the current memory is not enough to complete the current operation [default: 2] -p, --prefer-memory Use all the given memory before writing to disk + --output-compression-level + Compression level for final output files (applies to .lz4/.gz/.zst/.zstd) [default: 2] -h, --help Print help diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index aa795f7b..2a2d3ea0 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -10,6 +10,7 @@ use config::{KEEP_FILES, MEMORY_THRESHOLD_CLEAR_START_OFFSET, MINIMUM_GLOBAL_MEM pub use ggcat_logging::MessageLevel; use ggcat_logging::{UnrecoverableErrorLogging, info, warn}; use io::concurrent::structured_sequences::StructuredSequenceBackendWrapper; +use io::concurrent::structured_sequences::color_records::ColorRecordsWriterWrapper; use io::concurrent::structured_sequences::fasta::FastaWriterWrapper; use io::concurrent::structured_sequences::gfa::{GFAWriterWrapperV1, GFAWriterWrapperV2}; use io::sequences_stream::GenericSequencesStream; @@ -321,6 +322,85 @@ impl GGCATInstance { Ok(output_file) } + /// Builds a colored graph writing raw color-records as output. + /// This is intended as an internal fast path for post-processing into + /// custom colored FASTA exports. + pub fn build_graph_color_records( + &self, + input_streams: Vec, + output_file: PathBuf, + color_names: Option<&[String]>, + kmer_length: usize, + threads_count: usize, + forward_only: bool, + minimizer_length: Option, + min_multiplicity: usize, + extra_elab: ExtraElaboration, + ) -> anyhow::Result { + let merging_hash_dispatch = utils::get_hash_static_id( + debug::DEBUG_HASH_TYPE.lock().clone(), + kmer_length, + forward_only, + ); + + let colors_hash = ColorBundleMultifileBuilding::dynamic_dispatch_id(); + let output_mode = ColorRecordsWriterWrapper::dynamic_dispatch_id(); + + let first_step = debug::DEBUG_ASSEMBLER_FIRST_STEP.lock().clone(); + let last_step = debug::DEBUG_ASSEMBLER_LAST_STEP.lock().clone(); + let temp_dir = if first_step == AssemblerPhase::default() { + create_tempdir(self.0.temp_dir.clone()) + } else { + Some( + self.0 + .temp_dir + .clone() + .expect("while testing successive phases the temp dir must be specified"), + ) + }; + + if let Some(temp_dir) = &temp_dir { + info!("Temporary directory: {}", temp_dir.display()); + } else { + warn!("No temporary directory specified, writing to cwd!"); + } + + let output_file = assembler::dynamic_dispatch::run_assembler( + (merging_hash_dispatch, colors_hash, output_mode), + kmer_length, + minimizer_length.unwrap_or(::utils::compute_best_m(kmer_length)), + first_step, + last_step, + input_streams, + color_names.unwrap_or(&[]), + output_file, + temp_dir.clone(), + threads_count, + min_multiplicity, + *debug::BUCKETS_COUNT_LOG_FORCE.lock(), + self.0.intermediate_compression_level, + extra_elab == ExtraElaboration::UnitigLinks, + match extra_elab { + ExtraElaboration::GreedyMatchtigs => Some(assembler::MatchtigMode::GreedyTigs), + ExtraElaboration::Eulertigs => Some(assembler::MatchtigMode::EulerTigs), + ExtraElaboration::Pathtigs => Some(assembler::MatchtigMode::PathTigs), + ExtraElaboration::FastSimplitigs => Some(assembler::MatchtigMode::FastSimpliTigs), + ExtraElaboration::FastEulertigs => Some(assembler::MatchtigMode::FastEulerTigs), + _ => None, + }, + debug::DEBUG_ONLY_BSTATS.load(Ordering::Relaxed), + forward_only, + )?; + + if last_step == AssemblerPhase::FinalStep && !KEEP_FILES.load(Ordering::Relaxed) { + remove_tempdir(temp_dir); + } else { + info!("Keeping temp dir at {:?}", temp_dir); + } + + Ok(output_file) + } + /// Queries a (optionally) colored graph with a specific set of sequences as queries pub fn query_graph( &self, diff --git a/crates/assembler/src/lib.rs b/crates/assembler/src/lib.rs index 8aa970be..a63e095a 100644 --- a/crates/assembler/src/lib.rs +++ b/crates/assembler/src/lib.rs @@ -29,6 +29,7 @@ use io::concurrent::structured_sequences::StructuredSequenceBackendWrapper; use io::concurrent::structured_sequences::StructuredSequenceWriter; use io::concurrent::structured_sequences::binary::StructSeqBinaryWriter; use io::concurrent::structured_sequences::binary::StructSeqBinaryWriterWrapper; +use io::concurrent::structured_sequences::color_records::ColorRecordsWriterWrapper; use io::concurrent::structured_sequences::fasta::FastaWriterWrapper; use io::concurrent::structured_sequences::gfa::GFAWriterWrapperV1; use io::concurrent::structured_sequences::gfa::GFAWriterWrapperV2; @@ -76,6 +77,7 @@ pub use assembler_pipeline::compute_matchtigs::MatchtigMode; colors::non_colored::NonColoredManager, ], OutputMode = [ FastaWriterWrapper, + ColorRecordsWriterWrapper, #[cfg(feature = "enable-gfa")] GFAWriterWrapperV1, #[cfg(feature = "enable-gfa")] GFAWriterWrapperV2 ])] diff --git a/crates/assembler_kmers_merge/src/lib.rs b/crates/assembler_kmers_merge/src/lib.rs index 845d306f..06cfdf04 100644 --- a/crates/assembler_kmers_merge/src/lib.rs +++ b/crates/assembler_kmers_merge/src/lib.rs @@ -16,6 +16,7 @@ use config::{ use hashes::HashFunctionFactory; use hashes::default::MNHFactory; use io::concurrent::structured_sequences::binary::StructSeqBinaryWriterWrapper; +use io::concurrent::structured_sequences::color_records::ColorRecordsWriterWrapper; use io::concurrent::structured_sequences::fasta::FastaWriterWrapper; use io::concurrent::structured_sequences::gfa::{GFAWriterWrapperV1, GFAWriterWrapperV2}; use io::concurrent::structured_sequences::{ @@ -153,6 +154,7 @@ impl< ], OM = [ StructSeqBinaryWriterWrapper, FastaWriterWrapper, + ColorRecordsWriterWrapper, #[cfg(feature = "enable-gfa")] GFAWriterWrapperV1, #[cfg(feature = "enable-gfa")] GFAWriterWrapperV2, ])] diff --git a/crates/assembler_pipeline/src/lib.rs b/crates/assembler_pipeline/src/lib.rs index 08335603..f87b0e56 100644 --- a/crates/assembler_pipeline/src/lib.rs +++ b/crates/assembler_pipeline/src/lib.rs @@ -4,11 +4,12 @@ use std::{ any::Any, path::{Path, PathBuf}, sync::Arc, + sync::atomic::Ordering, }; use ::dynamic_dispatch::dynamic_dispatch; use colors::colors_manager::{ColorsManager, color_types::PartialUnitigsColorStructure}; -use config::{SwapPriority, get_compression_level_info, get_memory_mode}; +use config::{OUTPUT_COMPRESSION_LEVEL, SwapPriority, get_compression_level_info, get_memory_mode}; use hashes::HashFunctionFactory; use io::{ concurrent::{ @@ -16,6 +17,7 @@ use io::{ IdentSequenceWriter, StructuredSequenceBackend, StructuredSequenceBackendInit, StructuredSequenceBackendWrapper, StructuredSequenceWriter, binary::StructSeqBinaryWriter, + color_records::ColorRecordsWriterWrapper, fasta::FastaWriterWrapper, gfa::{GFAWriterWrapperV1, GFAWriterWrapperV2}, }, @@ -73,10 +75,15 @@ pub fn get_final_output_writer< >( output_file: &Path, ) -> W { + let output_compression_level = OUTPUT_COMPRESSION_LEVEL.load(Ordering::Relaxed); + match output_file.extension() { Some(ext) => match ext.to_string_lossy().to_string().as_str() { - "lz4" => W::new_compressed_lz4(&output_file, 2), - "gz" => W::new_compressed_gzip(&output_file, 2), + "lz4" => W::new_compressed_lz4(&output_file, output_compression_level.min(16)), + "gz" => W::new_compressed_gzip(&output_file, output_compression_level.min(9)), + "zst" | "zstd" => { + W::new_compressed_zstd(&output_file, output_compression_level.min(22)) + } _ => W::new_plain(&output_file), }, None => W::new_plain(&output_file), @@ -99,6 +106,7 @@ pub fn get_final_output_writer< colors::non_colored::NonColoredManager, ], OutputMode = [ FastaWriterWrapper, + ColorRecordsWriterWrapper, #[cfg(feature = "enable-gfa")] GFAWriterWrapperV1, #[cfg(feature = "enable-gfa")] GFAWriterWrapperV2 ])] diff --git a/crates/cmdline/Cargo.toml b/crates/cmdline/Cargo.toml index 787aed82..03e4d049 100644 --- a/crates/cmdline/Cargo.toml +++ b/crates/cmdline/Cargo.toml @@ -15,6 +15,7 @@ byteorder = "1.5.0" itertools = "0.13.0" lazy_static = "1.5.0" lz4 = "1.25.0" +zstd = "0.13.2" rayon = "1.10.0" serde = "1.0.203" hashbrown = "0.14.5" diff --git a/crates/cmdline/src/main.rs b/crates/cmdline/src/main.rs index d888b2cd..81e27efc 100644 --- a/crates/cmdline/src/main.rs +++ b/crates/cmdline/src/main.rs @@ -11,13 +11,19 @@ use colors::DefaultColorsSerializer; use colors::colors_manager::ColorMapReader; use colors::storage::deserializer::ColorsDeserializer; use config::ColorIndexType; +use config::OUTPUT_COMPRESSION_LEVEL; use ggcat_api::{ExtraElaboration, GGCATConfig, GGCATInstance, GfaVersion}; use ggcat_logging::UnrecoverableErrorLogging; +use io::sequences_stream::GenericSequencesStream; +use io::sequences_stream::fasta::FastaFileSequencesStream; use io::sequences_stream::general::GeneralSequenceBlockData; use parallel_processor::memory_fs::MemoryFs; -use std::fs::File; +use rayon::slice::ParallelSliceMut; +use std::cmp::Ordering as CmpOrdering; +use std::collections::BinaryHeap; +use std::fs::{self, File}; use std::io::BufRead; -use std::io::{BufReader, BufWriter, Write}; +use std::io::{BufReader, BufWriter, Read, Write}; use std::panic; use std::path::{Path, PathBuf}; use std::process::exit; @@ -57,8 +63,12 @@ pub enum HashType { enum CliArgs { /// Build a new compacted graph Build(AssemblerArgs), + /// Build and directly export sorted colored FASTA with explicit BS:Z bitsets (single command, no dump/reparse step) + BuildColoredFasta(AssemblerArgs), /// Query a compacted graph Query(QueryArgs), + /// Dump graph sequences to FASTA with explicit BS:Z color bitsets, sorted by colors + DumpColoredFasta(DumpColoredFastaArgs), /// Dump all color names from a colormap DumpColors(DumpColorsArgs), Matches(MatchesArgs), @@ -134,6 +144,10 @@ struct CommonArgs { )] pub intermediate_compression_level: Option, + /// Compression level for final output files (applies to .lz4/.gz/.zst/.zstd) + #[arg(long = "output-compression-level", default_value = "2")] + pub output_compression_level: u32, + #[arg(hide = true, long = "only-bstats")] pub only_bstats: bool, } @@ -148,6 +162,7 @@ struct AssemblerArgs { #[arg(short = 'l', long = "input-lists")] pub input_lists: Vec, + /// Final output path. Compression is inferred from extension: .lz4, .gz, .zst, .zstd #[arg(short = 'o', long = "output-file", default_value = "output.fasta.lz4")] pub output_file: PathBuf, @@ -240,6 +255,35 @@ struct DumpColorsArgs { output_file: PathBuf, } +#[derive(Parser, Debug)] +struct DumpColoredFastaArgs { + /// The input graph + pub input_graph: PathBuf, + + /// The output FASTA file (.zst/.zstd supported) + #[arg( + short = 'o', + long = "output-file", + default_value = "output.colored.fasta" + )] + pub output_file: PathBuf, + + /// Directory for temporary files + #[arg(short = 't', long = "temp-dir", default_value = ".temp_files")] + pub temp_dir: PathBuf, + + /// Keep intermediate temporary files for debugging purposes + #[arg(long = "keep-temp-files", help_heading = "Advanced Options")] + pub keep_temp_files: bool, + + #[arg(short = 'j', long, default_value = "16")] + pub threads_count: usize, + + /// Compression level for output compression (.zst/.zstd) + #[arg(long = "output-compression-level", default_value = "2")] + pub output_compression_level: u32, +} + /// Format of the queries output #[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)] pub enum ColoredQueryOutputFormat { @@ -259,6 +303,7 @@ struct QueryArgs { #[arg(short, long)] pub colors: bool, + /// Output prefix/path. For colored output, compression is inferred from extension: .lz4, .gz, .zst, .zstd #[arg(short = 'o', long = "output-file-prefix", default_value = "output")] pub output_file_prefix: PathBuf, @@ -293,6 +338,7 @@ fn initialize(args: &CommonArgs, out_file: &PathBuf) -> &'static GGCATInstance { ggcat_api::debug::DEBUG_KEEP_FILES.store(args.keep_temp_files, Ordering::Relaxed); *ggcat_api::debug::BUCKETS_COUNT_LOG_FORCE.lock() = args.buckets_count_log; ggcat_api::debug::DEBUG_ONLY_BSTATS.store(args.only_bstats, Ordering::Relaxed); + OUTPUT_COMPRESSION_LEVEL.store(args.output_compression_level, Ordering::Relaxed); *ggcat_api::debug::DEBUG_HASH_TYPE.lock() = match args.hash_type { HashType::Auto => ggcat_api::HashType::Auto, HashType::SeqHash => ggcat_api::HashType::SeqHash, @@ -327,7 +373,26 @@ fn convert_assembler_step(step: AssemblerStartingStep) -> utils::assembler_phase } } +fn get_extra_elab(args: &AssemblerArgs) -> ExtraElaboration { + if args.generate_maximal_unitigs_links { + ExtraElaboration::UnitigLinks + } else if args.greedy_matchtigs { + ExtraElaboration::GreedyMatchtigs + } else if args.old_eulertigs { + ExtraElaboration::Eulertigs + } else if args.old_simplitigs { + ExtraElaboration::Pathtigs + } else if args.simplitigs { + ExtraElaboration::FastSimplitigs + } else if args.eulertigs { + ExtraElaboration::FastEulertigs + } else { + ExtraElaboration::None + } +} + fn run_assembler_from_args(instance: &GGCATInstance, args: AssemblerArgs) { + let extra_elab = get_extra_elab(&args); let mut inputs: Vec<_> = args.input.iter().cloned().map(|f| (f, None)).collect(); if (args.input_lists.len() > 0 || args.input.len() > 0) && args.colored_input_lists.len() > 0 { @@ -444,8 +509,10 @@ fn run_assembler_from_args(instance: &GGCATInstance, args: AssemblerArgs) { .map(|x| GeneralSequenceBlockData::FASTA(x)) .collect(); - *ggcat_api::debug::DEBUG_ASSEMBLER_FIRST_STEP.lock() = convert_assembler_step(args.step); - *ggcat_api::debug::DEBUG_ASSEMBLER_LAST_STEP.lock() = convert_assembler_step(args.last_step); + *ggcat_api::debug::DEBUG_ASSEMBLER_FIRST_STEP.lock() = + convert_assembler_step(args.step.clone()); + *ggcat_api::debug::DEBUG_ASSEMBLER_LAST_STEP.lock() = + convert_assembler_step(args.last_step.clone()); let output_file = instance .build_graph( @@ -458,21 +525,7 @@ fn run_assembler_from_args(instance: &GGCATInstance, args: AssemblerArgs) { args.common_args.minimizer_length, args.colors, args.min_multiplicity, - if args.generate_maximal_unitigs_links { - ExtraElaboration::UnitigLinks - } else if args.greedy_matchtigs { - ExtraElaboration::GreedyMatchtigs - } else if args.old_eulertigs { - ExtraElaboration::Eulertigs - } else if args.old_simplitigs { - ExtraElaboration::Pathtigs - } else if args.simplitigs { - ExtraElaboration::FastSimplitigs - } else if args.eulertigs { - ExtraElaboration::FastEulertigs - } else { - ExtraElaboration::None - }, + extra_elab, if args.gfa_output_v1 { Some(GfaVersion::V1) } else if args.gfa_output_v2 { @@ -488,6 +541,161 @@ fn run_assembler_from_args(instance: &GGCATInstance, args: AssemblerArgs) { println!("Final output saved to: {}", output_file.display()); } +fn run_build_colored_fasta_from_args(instance: &GGCATInstance, args: AssemblerArgs) { + let mut inputs: Vec<_> = args.input.iter().cloned().map(|f| (f, None)).collect(); + + if (args.input_lists.len() > 0 || args.input.len() > 0) && args.colored_input_lists.len() > 0 { + println!("Cannot specify both colored input lists and other files/lists"); + exit(1); + } + + if args.gfa_output_v1 || args.gfa_output_v2 { + println!("GFA output is not compatible with build-colored-fasta"); + exit(1); + } + + for list in args.input_lists.clone() { + for input in BufReader::new( + File::open(&list) + .log_unrecoverable_error_with_data( + "Error while opening input list file", + list.display(), + ) + .unwrap(), + ) + .lines() + { + if let Ok(input) = input { + if input.trim().is_empty() { + continue; + } + + let input = if >::as_ref(&input).is_relative() { + list.parent().unwrap().join(input) + } else { + PathBuf::from(input) + }; + + inputs.push((input, None)); + } + } + } + + let color_names: Vec<_> = if args.colored_input_lists.is_empty() { + inputs + .iter() + .map(|f| f.0.file_name().unwrap().to_string_lossy().to_string()) + .collect() + } else { + let mut colors = HashMap::default(); + let mut next_index = 0; + + for list in args.colored_input_lists.clone() { + for input in BufReader::new( + File::open(&list) + .map_err(|e| { + panic!( + "Error while opening colored input list file {}: {}", + list.display(), + e + ) + }) + .unwrap(), + ) + .lines() + { + if let Ok(input) = input { + if input.trim().is_empty() { + continue; + } + + let parts = input.split("\t").collect::>(); + if parts.len() < 2 { + println!("Invalid line in colored input list: {}", input); + exit(1); + } + + let color_name = parts[0..parts.len() - 1].join("\t"); + let file_name = parts.last().unwrap().to_string(); + + let index = *colors.entry(color_name).or_insert_with(|| { + let index = next_index; + next_index += 1; + index + }); + + println!( + "Add index with color {} => {}", + parts[0..parts.len() - 1].join("\t"), + index + ); + + let file_name = if >::as_ref(&file_name).is_relative() { + list.parent().unwrap().join(file_name) + } else { + PathBuf::from(file_name) + }; + + inputs.push((file_name, Some(index))); + } + } + } + let mut colors: Vec<_> = colors.into_iter().collect(); + colors.sort_by_key(|(_, i)| *i); + colors.into_iter().map(|(c, _)| c).collect() + }; + + if inputs.is_empty() { + println!("ERROR: No input files specified!"); + exit(1); + } + + let inputs: Vec<_> = inputs + .into_iter() + .map(GeneralSequenceBlockData::FASTA) + .collect(); + + let extra_elab = get_extra_elab(&args); + *ggcat_api::debug::DEBUG_ASSEMBLER_FIRST_STEP.lock() = + convert_assembler_step(args.step.clone()); + *ggcat_api::debug::DEBUG_ASSEMBLER_LAST_STEP.lock() = + convert_assembler_step(args.last_step.clone()); + + let raw_records_file = args.output_file.with_extension("records.tmp"); + let output_file = instance + .build_graph_color_records( + inputs, + raw_records_file.clone(), + Some(&color_names), + args.common_args.kmer_length, + args.common_args.threads_count, + args.common_args.forward_only, + args.common_args.minimizer_length, + args.min_multiplicity, + extra_elab, + ) + .unwrap(); + + let colormap_file = GGCATInstance::get_colormap_file(&output_file); + convert_color_records_to_bitset_fasta( + &output_file, + &colormap_file, + &args.output_file, + &args.common_args.temp_dir, + args.common_args.keep_temp_files, + args.common_args.threads_count, + args.common_args.output_compression_level, + ); + + if !args.common_args.keep_temp_files { + let _ = fs::remove_file(&output_file); + let _ = fs::remove_file(&colormap_file); + } + + ggcat_logging::stats::write_stats(&args.output_file.with_extension("elab.stats.json")); + println!("Final output saved to: {}", args.output_file.display()); +} + fn convert_querier_step(step: QuerierStartingStep) -> querier::QuerierStartingStep { match step { QuerierStartingStep::MinimizerBucketing => querier::QuerierStartingStep::MinimizerBucketing, @@ -525,6 +733,430 @@ fn run_querier_from_args(instance: &GGCATInstance, args: QueryArgs) -> PathBuf { .unwrap() } +#[derive(Eq, PartialEq)] +struct SortedRecord { + key: Vec, + seq: Vec, +} + +#[derive(Eq, PartialEq)] +struct MergeHeapItem { + record: SortedRecord, + chunk_index: usize, +} + +impl Ord for MergeHeapItem { + fn cmp(&self, other: &Self) -> CmpOrdering { + other + .record + .key + .cmp(&self.record.key) + .then_with(|| other.record.seq.cmp(&self.record.seq)) + } +} + +impl PartialOrd for MergeHeapItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +fn parse_color_subsets_from_ident(ident: &[u8], out: &mut Vec) { + out.clear(); + for token in ident.split(|b| *b == b' ') { + if token.len() < 4 || token[0] != b'C' || token[1] != b':' { + continue; + } + + let Some(color_sep) = token[2..].iter().position(|b| *b == b':') else { + continue; + }; + let color_hex = &token[2..(2 + color_sep)]; + let color_hex = std::str::from_utf8(color_hex).unwrap(); + let color_subset = ColorIndexType::from_str_radix(color_hex, 16).unwrap(); + out.push(color_subset); + } + + out.sort_unstable(); + out.dedup(); +} + +fn write_record(writer: &mut BufWriter, record: &SortedRecord) { + writer + .write_all(&(record.key.len() as u32).to_le_bytes()) + .unwrap(); + writer + .write_all(&(record.seq.len() as u32).to_le_bytes()) + .unwrap(); + writer.write_all(&record.key).unwrap(); + writer.write_all(&record.seq).unwrap(); +} + +fn read_record(reader: &mut BufReader) -> Option { + let mut len_buffer = [0u8; 4]; + match reader.read_exact(&mut len_buffer) { + Ok(_) => {} + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => return None, + Err(err) => panic!("Cannot read chunk key length: {err}"), + } + let key_len = u32::from_le_bytes(len_buffer) as usize; + + reader.read_exact(&mut len_buffer).unwrap(); + let seq_len = u32::from_le_bytes(len_buffer) as usize; + + let mut key = vec![0u8; key_len]; + reader.read_exact(&mut key).unwrap(); + + let mut seq = vec![0u8; seq_len]; + reader.read_exact(&mut seq).unwrap(); + + Some(SortedRecord { key, seq }) +} + +fn read_color_record(reader: &mut BufReader) -> Option<(Vec, Vec)> { + let mut len_buffer = [0u8; 4]; + match reader.read_exact(&mut len_buffer) { + Ok(_) => {} + Err(err) if err.kind() == std::io::ErrorKind::UnexpectedEof => return None, + Err(err) => panic!("Cannot read color-record subset length: {err}"), + } + let subset_count = u32::from_le_bytes(len_buffer) as usize; + + reader.read_exact(&mut len_buffer).unwrap(); + let seq_len = u32::from_le_bytes(len_buffer) as usize; + + let mut subsets = vec![0u32; subset_count]; + for subset in &mut subsets { + reader.read_exact(&mut len_buffer).unwrap(); + *subset = u32::from_le_bytes(len_buffer); + } + + let mut seq = vec![0u8; seq_len]; + reader.read_exact(&mut seq).unwrap(); + + Some((subsets, seq)) +} + +fn make_output_writer( + output_file: impl AsRef, + output_compression_level: u32, +) -> BufWriter> { + let output_file = output_file.as_ref(); + let file = File::create(output_file).unwrap(); + + match output_file.extension().and_then(|e| e.to_str()) { + Some("zst") | Some("zstd") => { + let level = output_compression_level.min(22) as i32; + let encoder = zstd::stream::write::Encoder::new(file, level) + .unwrap() + .auto_finish(); + BufWriter::with_capacity(8 * 1024 * 1024, Box::new(encoder)) + } + _ => BufWriter::with_capacity(8 * 1024 * 1024, Box::new(file)), + } +} + +fn flush_sorted_chunk( + records: &mut Vec, + chunk_files: &mut Vec, + chunk_dir: &Path, +) { + if records.is_empty() { + return; + } + + records.par_sort_unstable_by(|left, right| { + left.key + .cmp(&right.key) + .then_with(|| left.seq.cmp(&right.seq)) + }); + + let chunk_path = chunk_dir.join(format!("chunk_{:08}.bin", chunk_files.len())); + let mut chunk_writer = + BufWriter::with_capacity(8 * 1024 * 1024, File::create(&chunk_path).unwrap()); + for record in records.iter() { + write_record(&mut chunk_writer, record); + } + chunk_writer.flush().unwrap(); + chunk_files.push(chunk_path); + records.clear(); +} + +fn convert_color_records_to_bitset_fasta( + records_file: impl AsRef, + colormap_file: impl AsRef, + output_file: impl AsRef, + temp_dir: impl AsRef, + keep_temp_files: bool, + threads_count: usize, + output_compression_level: u32, +) { + const CHUNK_TARGET_BYTES: usize = 128 * 1024 * 1024; + + let _ = rayon::ThreadPoolBuilder::new() + .num_threads(threads_count) + .build_global(); + + let mut colors_deserializer = + ColorsDeserializer::::new(colormap_file, true).unwrap(); + let colors_count = colors_deserializer.colors_count(); + if colors_count == 0 { + panic!("Input graph has no colors"); + } + + let temp_chunks_dir = temp_dir.as_ref().join(format!( + "dump_colored_fasta_{}_{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + )); + fs::create_dir_all(&temp_chunks_dir).unwrap(); + + let mut subset_cache: HashMap> = HashMap::default(); + let mut bitset = vec![b'0'; colors_count]; + let mut records = Vec::new(); + let mut chunk_files = Vec::new(); + let mut current_chunk_bytes = 0usize; + let mut total_sequences = 0u64; + let mut skipped_sequences = 0u64; + + let mut reader = + BufReader::with_capacity(8 * 1024 * 1024, File::open(records_file.as_ref()).unwrap()); + while let Some((subsets, seq)) = read_color_record(&mut reader) { + total_sequences += 1; + bitset.fill(b'0'); + + if subsets.is_empty() { + if colors_count == 1 { + bitset[0] = b'1'; + } else { + skipped_sequences += 1; + continue; + } + } else { + for subset in subsets { + let subset_colors = subset_cache.entry(subset).or_insert_with(|| { + let mut colors = Vec::new(); + colors_deserializer.get_color_mappings(subset, &mut colors); + colors + }); + for color in subset_colors.iter().copied() { + bitset[color as usize] = b'1'; + } + } + } + + let record = SortedRecord { + key: bitset.clone(), + seq, + }; + current_chunk_bytes += record.key.len() + record.seq.len(); + records.push(record); + + if current_chunk_bytes >= CHUNK_TARGET_BYTES { + flush_sorted_chunk(&mut records, &mut chunk_files, &temp_chunks_dir); + current_chunk_bytes = 0; + } + } + + flush_sorted_chunk(&mut records, &mut chunk_files, &temp_chunks_dir); + + let mut output_writer = make_output_writer(output_file.as_ref(), output_compression_level); + + let mut chunk_readers: Vec<_> = chunk_files + .iter() + .map(|path| BufReader::with_capacity(8 * 1024 * 1024, File::open(path).unwrap())) + .collect(); + + let mut heap = BinaryHeap::new(); + for (chunk_index, reader) in chunk_readers.iter_mut().enumerate() { + if let Some(record) = read_record(reader) { + heap.push(MergeHeapItem { + record, + chunk_index, + }); + } + } + + let mut output_index = 0u64; + while let Some(item) = heap.pop() { + write!(output_writer, ">{} BS:Z:", output_index).unwrap(); + output_writer.write_all(&item.record.key).unwrap(); + output_writer.write_all(b"\n").unwrap(); + output_writer.write_all(&item.record.seq).unwrap(); + output_writer.write_all(b"\n").unwrap(); + output_index += 1; + + if let Some(record) = read_record(&mut chunk_readers[item.chunk_index]) { + heap.push(MergeHeapItem { + record, + chunk_index: item.chunk_index, + }); + } + } + output_writer.flush().unwrap(); + + if total_sequences == 0 { + panic!("Input graph contains no sequences"); + } + if output_index == 0 { + panic!("No colored records were produced"); + } + if skipped_sequences > 0 { + println!( + "Warning: skipped {} sequence(s) without color annotations", + skipped_sequences + ); + } + + if !keep_temp_files { + for chunk in chunk_files { + fs::remove_file(chunk).unwrap(); + } + fs::remove_dir(temp_chunks_dir).unwrap(); + } else { + println!("Temporary chunks kept at: {}", temp_chunks_dir.display()); + } +} + +fn run_dump_colored_fasta_from_args(args: DumpColoredFastaArgs) -> PathBuf { + const CHUNK_TARGET_BYTES: usize = 128 * 1024 * 1024; + let _ = rayon::ThreadPoolBuilder::new() + .num_threads(args.threads_count) + .build_global(); + + let colors_file = args.input_graph.with_extension("colors.dat"); + let mut colors_deserializer = + ColorsDeserializer::::new(colors_file, true).unwrap(); + let colors_count = colors_deserializer.colors_count(); + if colors_count == 0 { + panic!("Input graph has no colors"); + } + + let temp_chunks_dir = args.temp_dir.join(format!( + "dump_colored_fasta_{}_{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() + )); + fs::create_dir_all(&temp_chunks_dir).unwrap(); + + let mut subset_cache: HashMap> = HashMap::default(); + let mut subset_ids = Vec::new(); + let mut bitset = vec![b'0'; colors_count]; + + let mut records = Vec::new(); + let mut chunk_files = Vec::new(); + let mut current_chunk_bytes = 0usize; + let mut total_sequences = 0u64; + let mut skipped_sequences = 0u64; + + FastaFileSequencesStream::new().read_block( + &(args.input_graph.clone(), None), + true, + None, + |seq, _| { + total_sequences += 1; + parse_color_subsets_from_ident(seq.ident_data, &mut subset_ids); + if subset_ids.is_empty() { + skipped_sequences += 1; + return; + } + + bitset.fill(b'0'); + for subset in subset_ids.iter().copied() { + let subset_colors = subset_cache.entry(subset).or_insert_with(|| { + let mut colors = Vec::new(); + colors_deserializer.get_color_mappings(subset, &mut colors); + colors + }); + for color in subset_colors.iter().copied() { + bitset[color as usize] = b'1'; + } + } + + let record = SortedRecord { + key: bitset.clone(), + seq: seq.seq.to_vec(), + }; + current_chunk_bytes += record.key.len() + record.seq.len(); + records.push(record); + + if current_chunk_bytes >= CHUNK_TARGET_BYTES { + flush_sorted_chunk(&mut records, &mut chunk_files, &temp_chunks_dir); + current_chunk_bytes = 0; + } + }, + ); + + flush_sorted_chunk(&mut records, &mut chunk_files, &temp_chunks_dir); + + let mut output_writer = make_output_writer(&args.output_file, args.output_compression_level); + + let mut chunk_readers: Vec<_> = chunk_files + .iter() + .map(|path| BufReader::with_capacity(8 * 1024 * 1024, File::open(path).unwrap())) + .collect(); + + let mut heap = BinaryHeap::new(); + for (chunk_index, reader) in chunk_readers.iter_mut().enumerate() { + if let Some(record) = read_record(reader) { + heap.push(MergeHeapItem { + record, + chunk_index, + }); + } + } + + let mut output_index = 0u64; + while let Some(item) = heap.pop() { + write!(output_writer, ">{} BS:Z:", output_index).unwrap(); + output_writer.write_all(&item.record.key).unwrap(); + output_writer.write_all(b"\n").unwrap(); + output_writer.write_all(&item.record.seq).unwrap(); + output_writer.write_all(b"\n").unwrap(); + output_index += 1; + + if let Some(record) = read_record(&mut chunk_readers[item.chunk_index]) { + heap.push(MergeHeapItem { + record, + chunk_index: item.chunk_index, + }); + } + } + output_writer.flush().unwrap(); + + if total_sequences == 0 { + panic!("Input graph contains no sequences"); + } + if output_index == 0 { + panic!( + "Input graph does not contain color annotations in FASTA headers (no C:: tags found)" + ); + } + if skipped_sequences > 0 { + println!( + "Warning: skipped {} sequence(s) without color annotations", + skipped_sequences + ); + } + + if !args.keep_temp_files { + for chunk in chunk_files { + fs::remove_file(chunk).unwrap(); + } + fs::remove_dir(temp_chunks_dir).unwrap(); + } else { + println!("Temporary chunks kept at: {}", temp_chunks_dir.display()); + } + + args.output_file +} + instrumenter::global_setup_instrumenter!(); fn main() { @@ -553,6 +1185,16 @@ fn main() { run_assembler_from_args(&instance, args); } + CliArgs::BuildColoredFasta(args) => { + let _guard = instrumenter::initialize_tracing( + args.output_file.with_extension("tracing.json"), + &["ix86arch::INSTRUCTION_RETIRED", "ix86arch::LLC_MISSES"], + ); + + let instance = initialize(&args.common_args, &args.output_file); + + run_build_colored_fasta_from_args(&instance, args); + } CliArgs::Matches(args) => { let colors_file = args.input_file.with_extension("colors.dat"); let mut colors_deserializer = @@ -590,6 +1232,11 @@ fn main() { let output_file_name = run_querier_from_args(&instance, args); println!("Final output saved to: {}", output_file_name.display()); } + CliArgs::DumpColoredFasta(args) => { + let output_file_name = run_dump_colored_fasta_from_args(args); + println!("Final output saved to: {}", output_file_name.display()); + return; // Skip final memory deallocation + } CliArgs::DumpColors(args) => { let output_file_name = args.output_file.with_extension("jsonl"); diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index e9ec9406..ee88ae36 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -138,6 +138,7 @@ impl SwapPriority { pub static KEEP_FILES: AtomicBool = AtomicBool::new(false); pub static INTERMEDIATE_COMPRESSION_LEVEL_SLOW: AtomicU32 = AtomicU32::new(3); pub static INTERMEDIATE_COMPRESSION_LEVEL_FAST: AtomicU32 = AtomicU32::new(0); +pub static OUTPUT_COMPRESSION_LEVEL: AtomicU32 = AtomicU32::new(2); pub static PREFER_MEMORY: AtomicBool = AtomicBool::new(false); pub fn get_memory_mode(swap_priority: usize) -> MemoryFileMode { diff --git a/crates/io/Cargo.toml b/crates/io/Cargo.toml index 51bbac84..25e13f2e 100644 --- a/crates/io/Cargo.toml +++ b/crates/io/Cargo.toml @@ -26,6 +26,9 @@ hashes = { package = "ggcat_hashes", path = "../hashes" } parking_lot = "0.12.3" byteorder = "1.5.0" lz4 = "1.25.0" +bzip2 = "0.4.4" +xz2 = "0.1.7" +zstd = "0.13.2" flate2 = "1.0.30" bstr = "1.9.1" ggcat-logging = { version = "2.0.0", path = "../logging" } diff --git a/crates/io/src/concurrent/structured_sequences.rs b/crates/io/src/concurrent/structured_sequences.rs index c3b72e26..c6d0cced 100644 --- a/crates/io/src/concurrent/structured_sequences.rs +++ b/crates/io/src/concurrent/structured_sequences.rs @@ -17,6 +17,7 @@ use { }; pub mod binary; +pub mod color_records; pub mod concurrent; pub mod fasta; pub mod gfa; @@ -120,6 +121,10 @@ pub trait StructuredSequenceBackendInit: Sync + Send + Sized { unimplemented!() } + fn new_compressed_zstd(_path: impl AsRef, _level: u32) -> Self { + unimplemented!() + } + fn new_plain(_path: impl AsRef) -> Self { unimplemented!() } diff --git a/crates/io/src/concurrent/structured_sequences/color_records.rs b/crates/io/src/concurrent/structured_sequences/color_records.rs new file mode 100644 index 00000000..40d6be95 --- /dev/null +++ b/crates/io/src/concurrent/structured_sequences/color_records.rs @@ -0,0 +1,155 @@ +use crate::concurrent::structured_sequences::{ + IdentSequenceWriter, StructuredSequenceBackend, StructuredSequenceBackendInit, + StructuredSequenceBackendWrapper, +}; +use crate::concurrent::temp_reads::extra_data::{SequenceExtraData, SequenceExtraDataConsecutiveCompression}; +use config::{ColorIndexType, DEFAULT_OUTPUT_BUFFER_SIZE, DEFAULT_PER_CPU_BUFFER_SIZE}; +use dynamic_dispatch::dynamic_dispatch; +use std::fs::File; +use std::io::{BufWriter, Write}; +use std::marker::PhantomData; +use std::path::{Path, PathBuf}; + +pub struct ColorRecordsWriterWrapper; + +#[dynamic_dispatch] +impl StructuredSequenceBackendWrapper for ColorRecordsWriterWrapper { + type Backend< + ColorInfo: IdentSequenceWriter + SequenceExtraDataConsecutiveCompression, + LinksInfo: IdentSequenceWriter + SequenceExtraData, + > = ColorRecordsWriter; +} + +pub struct ColorRecordsWriter { + writer: BufWriter, + path: PathBuf, + _phantom: PhantomData<(ColorInfo, LinksInfo)>, +} + +pub struct ColorRecordsTempBuffer { + data: Vec, + ident: Vec, + subsets: Vec, +} + +fn parse_hex_u32(bytes: &[u8]) -> Option { + if bytes.is_empty() { + return None; + } + + let mut value = 0u32; + for b in bytes { + let nibble = match *b { + b'0'..=b'9' => b - b'0', + b'a'..=b'f' => 10 + (b - b'a'), + b'A'..=b'F' => 10 + (b - b'A'), + _ => return None, + }; + value = value.checked_mul(16)?.checked_add(nibble as u32)?; + } + Some(value) +} + +fn parse_color_subsets_from_ident(ident: &[u8], out: &mut Vec) { + out.clear(); + + for token in ident.split(|b| *b == b' ') { + if token.len() < 4 || token[0] != b'C' || token[1] != b':' { + continue; + } + + let Some(color_sep) = token[2..].iter().position(|b| *b == b':') else { + continue; + }; + let color_hex = &token[2..(2 + color_sep)]; + if let Some(color_subset) = parse_hex_u32(color_hex) { + out.push(color_subset as ColorIndexType); + } + } + + out.sort_unstable(); + out.dedup(); +} + +impl StructuredSequenceBackendInit + for ColorRecordsWriter +{ + fn new_compressed_gzip(path: impl AsRef, _level: u32) -> Self { + Self::new_plain(path) + } + + fn new_compressed_lz4(path: impl AsRef, _level: u32) -> Self { + Self::new_plain(path) + } + + fn new_compressed_zstd(path: impl AsRef, _level: u32) -> Self { + Self::new_plain(path) + } + + fn new_plain(path: impl AsRef) -> Self { + Self { + writer: BufWriter::with_capacity( + DEFAULT_OUTPUT_BUFFER_SIZE, + File::create(&path).unwrap(), + ), + path: path.as_ref().to_path_buf(), + _phantom: PhantomData, + } + } +} + +impl + StructuredSequenceBackend for ColorRecordsWriter +{ + type SequenceTempBuffer = ColorRecordsTempBuffer; + + fn alloc_temp_buffer(_: usize) -> Self::SequenceTempBuffer { + Self::SequenceTempBuffer { + data: Vec::with_capacity(DEFAULT_PER_CPU_BUFFER_SIZE.as_bytes()), + ident: Vec::with_capacity(256), + subsets: Vec::with_capacity(8), + } + } + + fn write_sequence( + _k: usize, + buffer: &mut Self::SequenceTempBuffer, + _sequence_index: u64, + sequence: &[u8], + + color_info: ColorInfo, + _links_info: LinksInfo, + extra_buffers: &(ColorInfo::TempBuffer, LinksInfo::TempBuffer), + + #[cfg(feature = "support_kmer_counters")] _abundance: super::SequenceAbundance, + ) { + buffer.ident.clear(); + color_info.write_as_ident(&mut buffer.ident, &extra_buffers.0); + parse_color_subsets_from_ident(&buffer.ident, &mut buffer.subsets); + + let subset_count = buffer.subsets.len() as u32; + let seq_len = sequence.len() as u32; + + buffer.data.extend_from_slice(&subset_count.to_le_bytes()); + buffer.data.extend_from_slice(&seq_len.to_le_bytes()); + for subset in buffer.subsets.iter().copied() { + buffer.data.extend_from_slice(&subset.to_le_bytes()); + } + buffer.data.extend_from_slice(sequence); + } + + fn get_path(&self) -> PathBuf { + self.path.clone() + } + + fn flush_temp_buffer(&mut self, buffer: &mut Self::SequenceTempBuffer) { + self.writer.write_all(&buffer.data).unwrap(); + buffer.data.clear(); + buffer.ident.clear(); + buffer.subsets.clear(); + } + + fn finalize(mut self) { + self.writer.flush().unwrap(); + } +} diff --git a/crates/io/src/concurrent/structured_sequences/fasta.rs b/crates/io/src/concurrent/structured_sequences/fasta.rs index b7c87750..fb7a2096 100644 --- a/crates/io/src/concurrent/structured_sequences/fasta.rs +++ b/crates/io/src/concurrent/structured_sequences/fasta.rs @@ -85,6 +85,23 @@ impl StructuredS } } + fn new_compressed_zstd(path: impl AsRef, level: u32) -> Self { + let compress_stream = zstd::stream::write::Encoder::new( + BufWriter::with_capacity(DEFAULT_OUTPUT_BUFFER_SIZE, File::create(&path).unwrap()), + level as i32, + ) + .unwrap(); + + FastaWriter { + writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( + DEFAULT_OUTPUT_BUFFER_SIZE, + compress_stream, + ))), + path: path.as_ref().to_path_buf(), + _phantom: PhantomData, + } + } + fn new_plain(path: impl AsRef) -> Self { FastaWriter { writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( diff --git a/crates/io/src/concurrent/structured_sequences/gfa.rs b/crates/io/src/concurrent/structured_sequences/gfa.rs index e32b32f3..87a59d48 100644 --- a/crates/io/src/concurrent/structured_sequences/gfa.rs +++ b/crates/io/src/concurrent/structured_sequences/gfa.rs @@ -98,6 +98,23 @@ impl, level: u32) -> Self { + let compress_stream = zstd::stream::write::Encoder::new( + BufWriter::with_capacity(DEFAULT_OUTPUT_BUFFER_SIZE, File::create(&path).unwrap()), + level as i32, + ) + .unwrap(); + + GFAWriter { + writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( + DEFAULT_OUTPUT_BUFFER_SIZE, + compress_stream, + ))), + path: path.as_ref().to_path_buf(), + _phantom: PhantomData, + } + } + fn new_plain(path: impl AsRef) -> Self { GFAWriter { writer: Box::new(SequencesWriterWrapper::new(BufWriter::with_capacity( diff --git a/crates/io/src/concurrent/structured_sequences/stream_finish.rs b/crates/io/src/concurrent/structured_sequences/stream_finish.rs index 219f5f10..a7babdba 100644 --- a/crates/io/src/concurrent/structured_sequences/stream_finish.rs +++ b/crates/io/src/concurrent/structured_sequences/stream_finish.rs @@ -1,16 +1,18 @@ use flate2::write::GzEncoder; use std::{ - fmt::Debug, fs::File, io::{BufWriter, Write}, }; -pub(crate) trait SequencesFileFinish: Write + Debug { +pub(crate) trait SequencesFileFinish: Write { fn finalize(self); } impl SequencesFileFinish for BufWriter { fn finalize(self) { - self.into_inner().unwrap().finalize(); + match self.into_inner() { + Ok(writer) => writer.finalize(), + Err(_) => panic!("Cannot finalize buffered writer"), + } } } impl SequencesFileFinish for File { @@ -32,6 +34,13 @@ impl SequencesFileFinish for GzEncoder { } } +impl<'a, W: SequencesFileFinish> SequencesFileFinish for zstd::stream::write::Encoder<'a, W> { + fn finalize(self) { + let w = self.finish().unwrap(); + w.finalize(); + } +} + pub(crate) struct SequencesWriterWrapper { writer: Option, } diff --git a/crates/io/src/lines_reader.rs b/crates/io/src/lines_reader.rs index 86a34cf7..8e53aa11 100644 --- a/crates/io/src/lines_reader.rs +++ b/crates/io/src/lines_reader.rs @@ -88,6 +88,45 @@ impl LinesReader { path.as_ref().display() ); }); + } else if path.as_ref().extension().filter(|x| *x == "bz2").is_some() { + let file = bzip2::read::BzDecoder::new( + File::open(&path).expect(&format!("Cannot open file {}", path.as_ref().display())), + ); + self.read_stream_buffered(file, callback) + .unwrap_or_else(|_| { + ggcat_logging::error!( + "WARNING: Error while reading file {}", + path.as_ref().display() + ); + }); + } else if path.as_ref().extension().filter(|x| *x == "xz").is_some() { + let file = xz2::read::XzDecoder::new( + File::open(&path).expect(&format!("Cannot open file {}", path.as_ref().display())), + ); + self.read_stream_buffered(file, callback) + .unwrap_or_else(|_| { + ggcat_logging::error!( + "WARNING: Error while reading file {}", + path.as_ref().display() + ); + }); + } else if path + .as_ref() + .extension() + .filter(|x| *x == "zst" || *x == "zstd") + .is_some() + { + let file = zstd::stream::read::Decoder::new( + File::open(&path).expect(&format!("Cannot open file {}", path.as_ref().display())), + ) + .unwrap(); + self.read_stream_buffered(file, callback) + .unwrap_or_else(|_| { + ggcat_logging::error!( + "WARNING: Error while reading file {}", + path.as_ref().display() + ); + }); } else { let file = File::open(&path).expect(&format!("Cannot open file {}", path.as_ref().display())); diff --git a/crates/io/src/sequences_stream/fasta.rs b/crates/io/src/sequences_stream/fasta.rs index 20856c31..0cf5ff06 100644 --- a/crates/io/src/sequences_stream/fasta.rs +++ b/crates/io/src/sequences_stream/fasta.rs @@ -19,7 +19,7 @@ impl FastaFileSequencesStream { let file_bases_count = if file .extension() - .map(|x| x == "gz" || x == "lz4") + .map(|x| x == "gz" || x == "lz4" || x == "bz2" || x == "xz" || x == "zst" || x == "zstd") .unwrap_or(false) { (length as f64 * COMPRESSED_READS_RATIO) as u64 diff --git a/crates/querier/Cargo.toml b/crates/querier/Cargo.toml index bdecc071..3a7ff12d 100644 --- a/crates/querier/Cargo.toml +++ b/crates/querier/Cargo.toml @@ -33,6 +33,7 @@ csv = "1.3.0" parking_lot = "0.12.3" lz4 = "1.25.0" flate2 = "1.0.30" +zstd = "0.13.2" ggcat-logging = { version = "2.0.0", path = "../logging" } anyhow = "1.0.89" typenum = "1.18.0" diff --git a/crates/querier/src/pipeline/colored_query_output.rs b/crates/querier/src/pipeline/colored_query_output.rs index 150ee439..8c4f5d55 100644 --- a/crates/querier/src/pipeline/colored_query_output.rs +++ b/crates/querier/src/pipeline/colored_query_output.rs @@ -3,8 +3,8 @@ use crate::structs::query_colored_counters::{ColorsRange, QueryColoredCountersSe use colors::colors_manager::ColorMapReader; use colors::colors_manager::{ColorsManager, ColorsMergeManager}; use config::{ - ColorIndexType, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES, QUERIES_COUNT_MIN_BATCH, SwapPriority, - get_compression_level_info, get_memory_mode, + ColorIndexType, DEFAULT_PREFETCH_AMOUNT, KEEP_FILES, OUTPUT_COMPRESSION_LEVEL, + QUERIES_COUNT_MIN_BATCH, SwapPriority, get_compression_level_info, get_memory_mode, }; use flate2::Compression; use ggcat_logging::UnrecoverableErrorLogging; @@ -32,6 +32,7 @@ enum QueryOutputFileWriter { Plain(File), LZ4Compressed(lz4::Encoder), GzipCompressed(flate2::write::GzEncoder), + ZstdCompressed(zstd::stream::write::Encoder<'static, File>), } impl Write for QueryOutputFileWriter { @@ -40,6 +41,7 @@ impl Write for QueryOutputFileWriter { QueryOutputFileWriter::Plain(w) => w.write(buf), QueryOutputFileWriter::LZ4Compressed(w) => w.write(buf), QueryOutputFileWriter::GzipCompressed(w) => w.write(buf), + QueryOutputFileWriter::ZstdCompressed(w) => w.write(buf), } } @@ -48,6 +50,7 @@ impl Write for QueryOutputFileWriter { QueryOutputFileWriter::Plain(w) => w.flush(), QueryOutputFileWriter::LZ4Compressed(w) => w.flush(), QueryOutputFileWriter::GzipCompressed(w) => w.flush(), + QueryOutputFileWriter::ZstdCompressed(w) => w.flush(), } } } @@ -85,17 +88,29 @@ pub fn colored_query_output( let query_output_file = File::create(&output_file) .log_unrecoverable_error_with_data("Cannot create output file", output_file.display())?; + let output_compression_level = OUTPUT_COMPRESSION_LEVEL.load(Ordering::Relaxed); + let query_output = Mutex::new(( BufWriter::new( match output_file.extension().map(|e| e.to_str()).flatten() { Some("lz4") => QueryOutputFileWriter::LZ4Compressed( lz4::EncoderBuilder::new() - .level(4) + .level(output_compression_level.min(16)) .build(query_output_file) .unwrap(), ), Some("gz") => QueryOutputFileWriter::GzipCompressed( - flate2::GzBuilder::new().write(query_output_file, Compression::default()), + flate2::GzBuilder::new().write( + query_output_file, + Compression::new(output_compression_level.min(9)), + ), + ), + Some("zst") | Some("zstd") => QueryOutputFileWriter::ZstdCompressed( + zstd::stream::write::Encoder::new( + query_output_file, + output_compression_level.min(22) as i32, + ) + .unwrap(), ), _ => QueryOutputFileWriter::Plain(query_output_file), },