From df2c92517c4da4495e9d8138d9a49cba5f8406c7 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Thu, 22 Aug 2024 16:36:34 +1000 Subject: [PATCH 1/8] util/alignment/async: add async alignment writer --- noodles-util/src/alignment/async/io.rs | 2 + noodles-util/src/alignment/async/io/writer.rs | 83 ++++++++ .../src/alignment/async/io/writer/builder.rs | 183 ++++++++++++++++++ .../src/alignment/io/writer/builder.rs | 4 +- 4 files changed, 270 insertions(+), 2 deletions(-) create mode 100644 noodles-util/src/alignment/async/io/writer.rs create mode 100644 noodles-util/src/alignment/async/io/writer/builder.rs diff --git a/noodles-util/src/alignment/async/io.rs b/noodles-util/src/alignment/async/io.rs index 17b3f262f..311b39c1e 100644 --- a/noodles-util/src/alignment/async/io.rs +++ b/noodles-util/src/alignment/async/io.rs @@ -1,5 +1,7 @@ //! Async alignment format I/O. pub mod reader; +pub mod writer; pub use self::reader::Reader; +pub use self::writer::Writer; diff --git a/noodles-util/src/alignment/async/io/writer.rs b/noodles-util/src/alignment/async/io/writer.rs new file mode 100644 index 000000000..84dfeed5f --- /dev/null +++ b/noodles-util/src/alignment/async/io/writer.rs @@ -0,0 +1,83 @@ +//! Async alignment writer. + +mod builder; + +use noodles_bam as bam; +use noodles_cram as cram; +use noodles_sam as sam; +use tokio::io::{self, AsyncWrite}; + +pub use self::builder::Builder; + +/// An async alignment writer. +pub enum Writer { + /// SAM. + Sam(sam::r#async::io::Writer), + /// BAM. + Bam(bam::r#async::io::Writer), + /// CRAM. + Cram(cram::r#async::io::Writer), +} + +impl Writer +where + W: AsyncWrite + Unpin, +{ + /// Writes a SAM header. 
+ /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_sam as sam; + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// use tokio::io; + /// use tokio::io::AsyncWriteExt; + /// + /// let mut writer = Builder::default().build_from_writer(io::sink()); + /// + /// let header = sam::Header::default(); + /// writer.write_header(&header).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn write_header(&mut self, header: &sam::Header) -> io::Result<()> { + match self { + Self::Sam(writer) => writer.write_header(header).await, + Self::Bam(writer) => writer.write_header(header).await, + Self::Cram(writer) => writer.write_file_header(header).await, + } + } + + /// Writes an alignment record. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_sam as sam; + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// use tokio::io; + /// use tokio::io::AsyncWriteExt; + /// + /// let mut writer = Builder::default().build_from_writer(io::sink()); + /// + /// let record = sam::Record::default(); + /// writer.write_record(&record).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn write_record( + &mut self, + header: &sam::Header, + record: &dyn sam::alignment::Record, + ) -> io::Result<()> { + match self { + Self::Sam(writer) => writer.write_alignment_record(header, record).await, + Self::Bam(writer) => writer.write_alignment_record(header, record).await, + Self::Cram(writer) => writer.write_record(header, record).await, + } + } +} diff --git a/noodles-util/src/alignment/async/io/writer/builder.rs b/noodles-util/src/alignment/async/io/writer/builder.rs new file mode 100644 index 000000000..ff615c17a --- /dev/null +++ b/noodles-util/src/alignment/async/io/writer/builder.rs @@ -0,0 +1,183 @@ +use std::path::Path; + +use noodles_bam as bam; +use noodles_bgzf as bgzf; +use noodles_cram as 
cram; +use noodles_fasta as fasta; +use noodles_sam as sam; +use tokio::{ + fs::File, + io::{self, AsyncWrite}, +}; + +use super::Writer; +use crate::alignment::io::{CompressionMethod, Format}; + +/// An async alignment writer builder. +#[derive(Default)] +pub struct Builder { + compression_method: Option>, + format: Option, + reference_sequence_repository: fasta::Repository, +} + +impl Builder { + /// Sets the compression method. + /// + /// By default, the compression method is autodetected on build. This can be used to override + /// it. + /// + /// # Examples + /// + /// ``` + /// use noodles_util::alignment::{r#async::io::writer::Builder, io::CompressionMethod}; + /// let builder = Builder::default().set_compression_method(Some(CompressionMethod::Bgzf)); + /// ``` + pub fn set_compression_method(mut self, compression_method: Option) -> Self { + self.compression_method = Some(compression_method); + self + } + + /// Sets the format of the output. + /// + /// By default, the format is autodetected on build. This can be used to override it. + /// + /// # Examples + /// + /// ``` + /// use noodles_util::alignment::{r#async::io::writer::Builder, io::Format}; + /// let builder = Builder::default().set_format(Format::Sam); + /// ``` + pub fn set_format(mut self, format: Format) -> Self { + self.format = Some(format); + self + } + + /// Sets the reference sequence repository. + /// + /// # Examples + /// + /// ``` + /// use noodles_fasta as fasta; + /// use noodles_util::alignment::{r#async::io::writer::Builder, io::Format}; + /// let repository = fasta::Repository::default(); + /// let builder = Builder::default().set_reference_sequence_repository(repository); + /// ``` + pub fn set_reference_sequence_repository( + mut self, + reference_sequence_repository: fasta::Repository, + ) -> Self { + self.reference_sequence_repository = reference_sequence_repository; + self + } + + /// Builds an async alignment writer from a path. 
+ /// + /// By default, the format and compression method will be detected from the path extension. + /// This can be overridden by using [`Self::set_format`] and [`Self::set_compression_method`]. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// let writer = Builder::default().build_from_path("sample.bam").await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn build_from_path

( + mut self, + src: P, + ) -> io::Result>> + where + P: AsRef, + { + use crate::alignment::io::writer::builder::{ + detect_compression_method_from_path_extension, detect_format_from_path_extension, + }; + + let src = src.as_ref(); + + if self.compression_method.is_none() { + self.compression_method = Some(detect_compression_method_from_path_extension(src)); + } + + if self.format.is_none() { + self.format = detect_format_from_path_extension(src); + } + + File::create(src) + .await + .map(|file| self.build_from_writer(file))? + } + + /// Builds an async alignment writer from a writer. + /// + /// If the format is not set, a default format is used. If the compression method is not set, a + /// default one is determined by the format. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// use tokio::io; + /// let reader = Builder::default().build_from_writer(io::sink()).await?; + /// # Ok(()) + /// # } + /// ``` + pub fn build_from_writer(self, writer: W) -> io::Result>> + where + W: AsyncWrite + Unpin + 'static, + { + use bam::r#async::io::Writer as AsyncBamWriter; + use sam::r#async::io::Writer as AsyncSamWriter; + + let format = self.format.unwrap_or(Format::Sam); + + let compression_method = match self.compression_method { + Some(compression_method) => compression_method, + None => match format { + Format::Sam | Format::Cram => None, + Format::Bam => Some(CompressionMethod::Bgzf), + }, + }; + + let writer = match (format, compression_method) { + (Format::Sam, None) => { + let inner: Box = Box::new(writer); + Writer::Sam(AsyncSamWriter::new(inner)) + } + (Format::Sam, Some(CompressionMethod::Bgzf)) => { + let encoder: Box = Box::new(bgzf::AsyncWriter::new(writer)); + Writer::Sam(AsyncSamWriter::new(encoder)) + } + (Format::Bam, None) => { + let inner: Box = Box::new(writer); + Writer::Bam(AsyncBamWriter::from(inner)) + } + (Format::Bam, 
Some(CompressionMethod::Bgzf)) => { + let encoder: Box = Box::new(bgzf::AsyncWriter::new(writer)); + Writer::Bam(AsyncBamWriter::from(encoder)) + } + (Format::Cram, None) => { + let inner: Box = Box::new(writer); + let inner = cram::r#async::io::writer::Builder::default() + .set_reference_sequence_repository(self.reference_sequence_repository) + .build_with_writer(inner); + Writer::Cram(inner) + } + (Format::Cram, Some(CompressionMethod::Bgzf)) => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "CRAM cannot be compressed with BGZF", + )); + } + + }; + + Ok(writer) + } +} diff --git a/noodles-util/src/alignment/io/writer/builder.rs b/noodles-util/src/alignment/io/writer/builder.rs index 0a3063eee..314f1aba0 100644 --- a/noodles-util/src/alignment/io/writer/builder.rs +++ b/noodles-util/src/alignment/io/writer/builder.rs @@ -186,7 +186,7 @@ impl Builder { } } -fn detect_compression_method_from_path_extension

(path: P) -> Option +pub(crate) fn detect_compression_method_from_path_extension

(path: P) -> Option where P: AsRef, { @@ -196,7 +196,7 @@ where } } -fn detect_format_from_path_extension

(path: P) -> Option +pub(crate) fn detect_format_from_path_extension

(path: P) -> Option where P: AsRef, { From 89368c639398f750925f6be89fa9b0376052f4ae Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Thu, 22 Aug 2024 16:38:08 +1000 Subject: [PATCH 2/8] util/alignment/async: fmt --- noodles-util/src/alignment/async/io/writer/builder.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/noodles-util/src/alignment/async/io/writer/builder.rs b/noodles-util/src/alignment/async/io/writer/builder.rs index ff615c17a..87243d946 100644 --- a/noodles-util/src/alignment/async/io/writer/builder.rs +++ b/noodles-util/src/alignment/async/io/writer/builder.rs @@ -175,7 +175,6 @@ impl Builder { "CRAM cannot be compressed with BGZF", )); } - }; Ok(writer) From d43a9c3834eee99d0d85b9943947cd70e5535505 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Fri, 23 Aug 2024 10:12:35 +1000 Subject: [PATCH 3/8] util/alignment/async: explicitly convert to cram::Record --- noodles-util/src/alignment/async/io/writer.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/noodles-util/src/alignment/async/io/writer.rs b/noodles-util/src/alignment/async/io/writer.rs index 84dfeed5f..bf2e128c8 100644 --- a/noodles-util/src/alignment/async/io/writer.rs +++ b/noodles-util/src/alignment/async/io/writer.rs @@ -46,7 +46,10 @@ where match self { Self::Sam(writer) => writer.write_header(header).await, Self::Bam(writer) => writer.write_header(header).await, - Self::Cram(writer) => writer.write_file_header(header).await, + Self::Cram(writer) => { + writer.write_file_definition().await?; + writer.write_file_header(header).await + } } } @@ -77,7 +80,10 @@ where match self { Self::Sam(writer) => writer.write_alignment_record(header, record).await, Self::Bam(writer) => writer.write_alignment_record(header, record).await, - Self::Cram(writer) => writer.write_record(header, record).await, + Self::Cram(writer) => { + let record = cram::Record::try_from_alignment_record(header, record)?; + writer.write_record(header, record).await + } } } } From 
f0aa17eeeca72e588f3f6ae348676786de00496b Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Fri, 23 Aug 2024 10:38:13 +1000 Subject: [PATCH 4/8] util/alignment/async: add Writer::finish --- noodles-util/src/alignment/async/io/writer.rs | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/noodles-util/src/alignment/async/io/writer.rs b/noodles-util/src/alignment/async/io/writer.rs index bf2e128c8..8c5c70c3a 100644 --- a/noodles-util/src/alignment/async/io/writer.rs +++ b/noodles-util/src/alignment/async/io/writer.rs @@ -10,7 +10,7 @@ use tokio::io::{self, AsyncWrite}; pub use self::builder::Builder; /// An async alignment writer. -pub enum Writer { +pub enum Writer { /// SAM. Sam(sam::r#async::io::Writer), /// BAM. @@ -32,8 +32,7 @@ where /// # async fn main() -> tokio::io::Result<()> { /// use noodles_sam as sam; /// use noodles_util::alignment::r#async::io::writer::Builder; - /// use tokio::io; - /// use tokio::io::AsyncWriteExt; + /// use tokio::io::{self, AsyncWriteExt}; /// /// let mut writer = Builder::default().build_from_writer(io::sink()); /// @@ -62,8 +61,7 @@ where /// # async fn main() -> tokio::io::Result<()> { /// use noodles_sam as sam; /// use noodles_util::alignment::r#async::io::writer::Builder; - /// use tokio::io; - /// use tokio::io::AsyncWriteExt; + /// use tokio::io::{self, AsyncWriteExt}; /// /// let mut writer = Builder::default().build_from_writer(io::sink()); /// @@ -86,4 +84,32 @@ where } } } + + /// Shuts down the alignment format writer. 
+ /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_sam as sam; + /// use noodles_util::alignment::{self, io::Format}; + /// use tokio::io; + /// + /// let mut writer = alignment::r#async::io::writer::Builder::default() + /// .set_format(Format::Sam) + /// .build_from_writer(io::sink()).await?; + /// + /// let header = sam::Header::default(); + /// writer.finish(&header).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn finish(&mut self, header: &sam::Header) -> io::Result<()> { + match self { + Self::Sam(_) => Ok(()), + Self::Bam(writer) => writer.shutdown().await, + Self::Cram(writer) => writer.shutdown(header).await, + } + } } From ee330e4e5a6c6a2f500b2eb6f11a1e9f13210ddf Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Fri, 23 Aug 2024 10:38:39 +1000 Subject: [PATCH 5/8] util/alignment/async: add Writer example --- .../examples/util_alignment_rewrite_async.rs | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 noodles-util/examples/util_alignment_rewrite_async.rs diff --git a/noodles-util/examples/util_alignment_rewrite_async.rs b/noodles-util/examples/util_alignment_rewrite_async.rs new file mode 100644 index 000000000..4edb15eac --- /dev/null +++ b/noodles-util/examples/util_alignment_rewrite_async.rs @@ -0,0 +1,48 @@ +//! Rewrites an alignment format to another alignment format asynchronously. +//! +//! The output format is determined from the extension of the destination. 
+ +use std::env; + +use futures::TryStreamExt; +use noodles_fasta::{self as fasta, repository::adapters::IndexedReader}; +use noodles_util::alignment; +use tokio::io; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut args = env::args().skip(1); + + let src = args.next().expect("missing src"); + let dst = args.next().expect("missing dst"); + let fasta_src = args.next(); + + let repository = fasta_src + .map(|src| fasta::io::indexed_reader::Builder::default().build_from_path(src)) + .transpose()? + .map(IndexedReader::new) + .map(fasta::Repository::new) + .unwrap_or_default(); + + let mut reader = alignment::r#async::io::reader::Builder::default() + .set_reference_sequence_repository(repository.clone()) + .build_from_path(src) + .await?; + + let header = reader.read_header().await?; + + let mut writer = alignment::r#async::io::writer::Builder::default() + .set_reference_sequence_repository(repository) + .build_from_path(dst) + .await?; + + writer.write_header(&header).await?; + + while let Some(record) = reader.records(&header).try_next().await? 
{ + writer.write_record(&header, &record).await?; + } + + writer.finish(&header).await?; + + Ok(()) +} From b99bbfc4a93ef0c2c95460ffa9fae7a8b4e20f8f Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Fri, 23 Aug 2024 11:24:42 +1000 Subject: [PATCH 6/8] util/alignment/async: add missing async declaration --- noodles-util/src/alignment/async/io/writer.rs | 7 ++++--- noodles-util/src/alignment/async/io/writer/builder.rs | 7 ++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/noodles-util/src/alignment/async/io/writer.rs b/noodles-util/src/alignment/async/io/writer.rs index 8c5c70c3a..2928d3ea2 100644 --- a/noodles-util/src/alignment/async/io/writer.rs +++ b/noodles-util/src/alignment/async/io/writer.rs @@ -34,7 +34,7 @@ where /// use noodles_util::alignment::r#async::io::writer::Builder; /// use tokio::io::{self, AsyncWriteExt}; /// - /// let mut writer = Builder::default().build_from_writer(io::sink()); + /// let mut writer = Builder::default().build_from_writer(io::sink()).await?; /// /// let header = sam::Header::default(); /// writer.write_header(&header).await?; @@ -63,10 +63,11 @@ where /// use noodles_util::alignment::r#async::io::writer::Builder; /// use tokio::io::{self, AsyncWriteExt}; /// - /// let mut writer = Builder::default().build_from_writer(io::sink()); + /// let mut writer = Builder::default().build_from_writer(io::sink()).await?; /// + /// let header = sam::Header::default(); /// let record = sam::Record::default(); - /// writer.write_record(&record).await?; + /// writer.write_record(&header, &record).await?; /// # Ok(()) /// # } /// ``` diff --git a/noodles-util/src/alignment/async/io/writer/builder.rs b/noodles-util/src/alignment/async/io/writer/builder.rs index 87243d946..ef0f2aeb9 100644 --- a/noodles-util/src/alignment/async/io/writer/builder.rs +++ b/noodles-util/src/alignment/async/io/writer/builder.rs @@ -110,6 +110,7 @@ impl Builder { File::create(src) .await .map(|file| self.build_from_writer(file))? 
+ .await } /// Builds an async alignment writer from a writer. @@ -124,11 +125,15 @@ impl Builder { /// # async fn main() -> tokio::io::Result<()> { /// use noodles_util::alignment::r#async::io::writer::Builder; /// use tokio::io; + /// /// let reader = Builder::default().build_from_writer(io::sink()).await?; /// # Ok(()) /// # } /// ``` - pub fn build_from_writer(self, writer: W) -> io::Result>> + pub async fn build_from_writer( + self, + writer: W, + ) -> io::Result>> where W: AsyncWrite + Unpin + 'static, { From b558481320f3b73957db9dfa37afbe79e39de124 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Mon, 26 Aug 2024 14:18:18 +1000 Subject: [PATCH 7/8] util/alignment/async: change finish to shutdown --- noodles-util/examples/util_alignment_rewrite_async.rs | 2 +- noodles-util/src/alignment/async/io/writer.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/noodles-util/examples/util_alignment_rewrite_async.rs b/noodles-util/examples/util_alignment_rewrite_async.rs index 4edb15eac..f86e04884 100644 --- a/noodles-util/examples/util_alignment_rewrite_async.rs +++ b/noodles-util/examples/util_alignment_rewrite_async.rs @@ -42,7 +42,7 @@ async fn main() -> io::Result<()> { writer.write_record(&header, &record).await?; } - writer.finish(&header).await?; + writer.shutdown(&header).await?; Ok(()) } diff --git a/noodles-util/src/alignment/async/io/writer.rs b/noodles-util/src/alignment/async/io/writer.rs index 2928d3ea2..c0c156ab0 100644 --- a/noodles-util/src/alignment/async/io/writer.rs +++ b/noodles-util/src/alignment/async/io/writer.rs @@ -102,11 +102,11 @@ where /// .build_from_writer(io::sink()).await?; /// /// let header = sam::Header::default(); - /// writer.finish(&header).await?; + /// writer.shutdown(&header).await?; /// # Ok(()) /// # } /// ``` - pub async fn finish(&mut self, header: &sam::Header) -> io::Result<()> { + pub async fn shutdown(&mut self, header: &sam::Header) -> io::Result<()> { match self { Self::Sam(_) => Ok(()), 
Self::Bam(writer) => writer.shutdown().await, From 2797182787499ea51d300f64e74a3fb1fff8a221 Mon Sep 17 00:00:00 2001 From: Michael Hall Date: Mon, 26 Aug 2024 14:28:57 +1000 Subject: [PATCH 8/8] util/alignment/async: tidy up imports --- noodles-util/src/alignment/async/io.rs | 3 +-- noodles-util/src/alignment/async/io/writer/builder.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/noodles-util/src/alignment/async/io.rs b/noodles-util/src/alignment/async/io.rs index 311b39c1e..641d42c3e 100644 --- a/noodles-util/src/alignment/async/io.rs +++ b/noodles-util/src/alignment/async/io.rs @@ -3,5 +3,4 @@ pub mod reader; pub mod writer; -pub use self::reader::Reader; -pub use self::writer::Writer; +pub use self::{reader::Reader, writer::Writer}; diff --git a/noodles-util/src/alignment/async/io/writer/builder.rs b/noodles-util/src/alignment/async/io/writer/builder.rs index ef0f2aeb9..278a75ca4 100644 --- a/noodles-util/src/alignment/async/io/writer/builder.rs +++ b/noodles-util/src/alignment/async/io/writer/builder.rs @@ -59,7 +59,7 @@ impl Builder { /// /// ``` /// use noodles_fasta as fasta; - /// use noodles_util::alignment::{r#async::io::writer::Builder, io::Format}; + /// use noodles_util::alignment::r#async::io::writer::Builder; /// let repository = fasta::Repository::default(); /// let builder = Builder::default().set_reference_sequence_repository(repository); /// ```