diff --git a/noodles-util/examples/util_alignment_rewrite_async.rs b/noodles-util/examples/util_alignment_rewrite_async.rs new file mode 100644 index 000000000..f86e04884 --- /dev/null +++ b/noodles-util/examples/util_alignment_rewrite_async.rs @@ -0,0 +1,54 @@ +//! Rewrites an alignment format to another alignment format asynchronously. +//! +//! The output format is determined from the extension of the destination. +//! +//! Arguments: the source path, the destination path, and, optionally, a FASTA path. When given, +//! the FASTA file is opened with an indexed reader and used as the reference sequence repository. + +use std::env; + +use futures::TryStreamExt; +use noodles_fasta::{self as fasta, repository::adapters::IndexedReader}; +use noodles_util::alignment; +use tokio::io; + +#[tokio::main] +async fn main() -> io::Result<()> { + let mut args = env::args().skip(1); + + let src = args.next().expect("missing src"); + let dst = args.next().expect("missing dst"); + let fasta_src = args.next(); + + let repository = fasta_src + .map(|src| fasta::io::indexed_reader::Builder::default().build_from_path(src)) + .transpose()? + .map(IndexedReader::new) + .map(fasta::Repository::new) + .unwrap_or_default(); + + let mut reader = alignment::r#async::io::reader::Builder::default() + .set_reference_sequence_repository(repository.clone()) + .build_from_path(src) + .await?; + + let header = reader.read_header().await?; + + let mut writer = alignment::r#async::io::writer::Builder::default() + .set_reference_sequence_repository(repository) + .build_from_path(dst) + .await?; + + writer.write_header(&header).await?; + + // Build the record stream once; a stream recreated per record would discard any state it holds. + let mut records = reader.records(&header); + + while let Some(record) = records.try_next().await? { + writer.write_record(&header, &record).await?; + } + + writer.shutdown(&header).await?; + + Ok(()) +} diff --git a/noodles-util/src/alignment/async/io.rs b/noodles-util/src/alignment/async/io.rs index 17b3f262f..641d42c3e 100644 --- a/noodles-util/src/alignment/async/io.rs +++ b/noodles-util/src/alignment/async/io.rs @@ -1,5 +1,6 @@ //! Async alignment format I/O. 
pub mod reader; +pub mod writer; -pub use self::reader::Reader; +pub use self::{reader::Reader, writer::Writer}; diff --git a/noodles-util/src/alignment/async/io/writer.rs b/noodles-util/src/alignment/async/io/writer.rs new file mode 100644 index 000000000..c0c156ab0 --- /dev/null +++ b/noodles-util/src/alignment/async/io/writer.rs @@ -0,0 +1,116 @@ +//! Async alignment writer. + +mod builder; + +use noodles_bam as bam; +use noodles_cram as cram; +use noodles_sam as sam; +use tokio::io::{self, AsyncWrite}; + +pub use self::builder::Builder; + +/// An async alignment writer. +pub enum Writer { + /// SAM. + Sam(sam::r#async::io::Writer), + /// BAM. + Bam(bam::r#async::io::Writer), + /// CRAM. + Cram(cram::r#async::io::Writer), +} + +impl Writer +where + W: AsyncWrite + Unpin, +{ + /// Writes a SAM header. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_sam as sam; + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// use tokio::io; + /// + /// let mut writer = Builder::default().build_from_writer(io::sink()).await?; + /// + /// let header = sam::Header::default(); + /// writer.write_header(&header).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn write_header(&mut self, header: &sam::Header) -> io::Result<()> { + match self { + Self::Sam(writer) => writer.write_header(header).await, + Self::Bam(writer) => writer.write_header(header).await, + Self::Cram(writer) => { + writer.write_file_definition().await?; + writer.write_file_header(header).await + } + } + } + + /// Writes an alignment record. 
+ /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_sam as sam; + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// use tokio::io; + /// + /// let mut writer = Builder::default().build_from_writer(io::sink()).await?; + /// + /// let header = sam::Header::default(); + /// let record = sam::Record::default(); + /// writer.write_record(&header, &record).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn write_record( + &mut self, + header: &sam::Header, + record: &dyn sam::alignment::Record, + ) -> io::Result<()> { + match self { + Self::Sam(writer) => writer.write_alignment_record(header, record).await, + Self::Bam(writer) => writer.write_alignment_record(header, record).await, + Self::Cram(writer) => { + let record = cram::Record::try_from_alignment_record(header, record)?; + writer.write_record(header, record).await + } + } + } + + /// Shuts down the alignment format writer. For SAM, this returns `Ok(())` without touching the inner writer. 
+ /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_sam as sam; + /// use noodles_util::alignment::{self, io::Format}; + /// use tokio::io; + /// + /// let mut writer = alignment::r#async::io::writer::Builder::default() + /// .set_format(Format::Sam) + /// .build_from_writer(io::sink()).await?; + /// + /// let header = sam::Header::default(); + /// writer.shutdown(&header).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn shutdown(&mut self, header: &sam::Header) -> io::Result<()> { + match self { + Self::Sam(_) => Ok(()), + Self::Bam(writer) => writer.shutdown().await, + Self::Cram(writer) => writer.shutdown(header).await, + } + } +} diff --git a/noodles-util/src/alignment/async/io/writer/builder.rs b/noodles-util/src/alignment/async/io/writer/builder.rs new file mode 100644 index 000000000..278a75ca4 --- /dev/null +++ b/noodles-util/src/alignment/async/io/writer/builder.rs @@ -0,0 +1,187 @@ +use std::path::Path; + +use noodles_bam as bam; +use noodles_bgzf as bgzf; +use noodles_cram as cram; +use noodles_fasta as fasta; +use noodles_sam as sam; +use tokio::{ + fs::File, + io::{self, AsyncWrite}, +}; + +use super::Writer; +use crate::alignment::io::{CompressionMethod, Format}; + +/// An async alignment writer builder. +#[derive(Default)] +pub struct Builder { + compression_method: Option>, + format: Option, + reference_sequence_repository: fasta::Repository, +} + +impl Builder { + /// Sets the compression method. + /// + /// By default, the compression method is autodetected on build. This can be used to override + /// it. 
+ /// + /// # Examples + /// + /// ``` + /// use noodles_util::alignment::{r#async::io::writer::Builder, io::CompressionMethod}; + /// let builder = Builder::default().set_compression_method(Some(CompressionMethod::Bgzf)); + /// ``` + pub fn set_compression_method(mut self, compression_method: Option) -> Self { + self.compression_method = Some(compression_method); + self + } + + /// Sets the format of the output. + /// + /// By default, the format is autodetected on build. This can be used to override it. + /// + /// # Examples + /// + /// ``` + /// use noodles_util::alignment::{r#async::io::writer::Builder, io::Format}; + /// let builder = Builder::default().set_format(Format::Sam); + /// ``` + pub fn set_format(mut self, format: Format) -> Self { + self.format = Some(format); + self + } + + /// Sets the reference sequence repository. + /// + /// # Examples + /// + /// ``` + /// use noodles_fasta as fasta; + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// let repository = fasta::Repository::default(); + /// let builder = Builder::default().set_reference_sequence_repository(repository); + /// ``` + pub fn set_reference_sequence_repository( + mut self, + reference_sequence_repository: fasta::Repository, + ) -> Self { + self.reference_sequence_repository = reference_sequence_repository; + self + } + + /// Builds an async alignment writer from a path. + /// + /// By default, the format and compression method will be detected from the path extension. + /// This can be overridden by using [`Self::set_format`] and [`Self::set_compression_method`]. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// let writer = Builder::default().build_from_path("sample.bam").await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn build_from_path

( + mut self, + src: P, + ) -> io::Result>> + where + P: AsRef, + { + use crate::alignment::io::writer::builder::{ + detect_compression_method_from_path_extension, detect_format_from_path_extension, + }; + + let src = src.as_ref(); + + if self.compression_method.is_none() { + self.compression_method = Some(detect_compression_method_from_path_extension(src)); + } + + if self.format.is_none() { + self.format = detect_format_from_path_extension(src); + } + + File::create(src) + .await + .map(|file| self.build_from_writer(file))? + .await + } + + /// Builds an async alignment writer from a writer. + /// + /// If the format is not set, SAM is used. If the compression method is not set, a + /// default one is determined by the format (BGZF for BAM; none for SAM and CRAM). + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> tokio::io::Result<()> { + /// use noodles_util::alignment::r#async::io::writer::Builder; + /// use tokio::io; + /// + /// let writer = Builder::default().build_from_writer(io::sink()).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn build_from_writer( + self, + writer: W, + ) -> io::Result>> + where + W: AsyncWrite + Unpin + 'static, + { + use bam::r#async::io::Writer as AsyncBamWriter; + use sam::r#async::io::Writer as AsyncSamWriter; + + let format = self.format.unwrap_or(Format::Sam); + + let compression_method = match self.compression_method { + Some(compression_method) => compression_method, + None => match format { + Format::Sam | Format::Cram => None, + Format::Bam => Some(CompressionMethod::Bgzf), + }, + }; + + let writer = match (format, compression_method) { + (Format::Sam, None) => { + let inner: Box = Box::new(writer); + Writer::Sam(AsyncSamWriter::new(inner)) + } + (Format::Sam, Some(CompressionMethod::Bgzf)) => { + let encoder: Box = Box::new(bgzf::AsyncWriter::new(writer)); + Writer::Sam(AsyncSamWriter::new(encoder)) + } + (Format::Bam, None) => { + let inner: Box = Box::new(writer); + 
Writer::Bam(AsyncBamWriter::from(inner)) + } + (Format::Bam, Some(CompressionMethod::Bgzf)) => { + let encoder: Box = Box::new(bgzf::AsyncWriter::new(writer)); + Writer::Bam(AsyncBamWriter::from(encoder)) + } + (Format::Cram, None) => { + let inner: Box = Box::new(writer); + let inner = cram::r#async::io::writer::Builder::default() + .set_reference_sequence_repository(self.reference_sequence_repository) + .build_with_writer(inner); + Writer::Cram(inner) + } + (Format::Cram, Some(CompressionMethod::Bgzf)) => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "CRAM cannot be compressed with BGZF", + )); + } + }; + + Ok(writer) + } +} diff --git a/noodles-util/src/alignment/io/writer/builder.rs b/noodles-util/src/alignment/io/writer/builder.rs index 0a3063eee..314f1aba0 100644 --- a/noodles-util/src/alignment/io/writer/builder.rs +++ b/noodles-util/src/alignment/io/writer/builder.rs @@ -186,7 +186,7 @@ impl Builder { } } -fn detect_compression_method_from_path_extension

(path: P) -> Option +pub(crate) fn detect_compression_method_from_path_extension

(path: P) -> Option where P: AsRef, { @@ -196,7 +196,7 @@ where } } -fn detect_format_from_path_extension

(path: P) -> Option +pub(crate) fn detect_format_from_path_extension

(path: P) -> Option where P: AsRef, {