Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async alignment writer #292

Merged
merged 8 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions noodles-util/examples/util_alignment_rewrite_async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! Rewrites an alignment format to another alignment format asynchronously.
//!
//! The output format is determined from the extension of the destination.

use std::env;

use futures::TryStreamExt;
use noodles_fasta::{self as fasta, repository::adapters::IndexedReader};
use noodles_util::alignment;
use tokio::io;

#[tokio::main]
async fn main() -> io::Result<()> {
let mut args = env::args().skip(1);

let src = args.next().expect("missing src");
let dst = args.next().expect("missing dst");
let fasta_src = args.next();

let repository = fasta_src
.map(|src| fasta::io::indexed_reader::Builder::default().build_from_path(src))
.transpose()?
.map(IndexedReader::new)
.map(fasta::Repository::new)
.unwrap_or_default();

let mut reader = alignment::r#async::io::reader::Builder::default()
.set_reference_sequence_repository(repository.clone())
.build_from_path(src)
.await?;

let header = reader.read_header().await?;

let mut writer = alignment::r#async::io::writer::Builder::default()
.set_reference_sequence_repository(repository)
.build_from_path(dst)
.await?;

writer.write_header(&header).await?;

while let Some(record) = reader.records(&header).try_next().await? {
writer.write_record(&header, &record).await?;
}

writer.shutdown(&header).await?;

Ok(())
}
3 changes: 2 additions & 1 deletion noodles-util/src/alignment/async/io.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Async alignment format I/O.

pub mod reader;
pub mod writer;

pub use self::reader::Reader;
pub use self::{reader::Reader, writer::Writer};
116 changes: 116 additions & 0 deletions noodles-util/src/alignment/async/io/writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//! Async alignment writer.

mod builder;

use noodles_bam as bam;
use noodles_cram as cram;
use noodles_sam as sam;
use tokio::io::{self, AsyncWrite};

pub use self::builder::Builder;

/// An async alignment writer.
pub enum Writer<W: AsyncWrite> {
/// SAM.
Sam(sam::r#async::io::Writer<W>),
/// BAM.
Bam(bam::r#async::io::Writer<W>),
/// CRAM.
Cram(cram::r#async::io::Writer<W>),
}

impl<W> Writer<W>
where
W: AsyncWrite + Unpin,
{
/// Writes a SAM header.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_sam as sam;
/// use noodles_util::alignment::r#async::io::writer::Builder;
/// use tokio::io::{self, AsyncWriteExt};
///
/// let mut writer = Builder::default().build_from_writer(io::sink()).await?;
///
/// let header = sam::Header::default();
/// writer.write_header(&header).await?;
/// # Ok(())
/// # }
/// ```
pub async fn write_header(&mut self, header: &sam::Header) -> io::Result<()> {
match self {
Self::Sam(writer) => writer.write_header(header).await,
Self::Bam(writer) => writer.write_header(header).await,
Self::Cram(writer) => {
writer.write_file_definition().await?;
writer.write_file_header(header).await
}
}
}

/// Writes an alignment record.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_sam as sam;
/// use noodles_util::alignment::r#async::io::writer::Builder;
/// use tokio::io::{self, AsyncWriteExt};
///
/// let mut writer = Builder::default().build_from_writer(io::sink()).await?;
///
/// let header = sam::Header::default();
/// let record = sam::Record::default();
/// writer.write_record(&header, &record).await?;
/// # Ok(())
/// # }
/// ```
pub async fn write_record(
&mut self,
header: &sam::Header,
record: &dyn sam::alignment::Record,
) -> io::Result<()> {
match self {
Self::Sam(writer) => writer.write_alignment_record(header, record).await,
Self::Bam(writer) => writer.write_alignment_record(header, record).await,
Self::Cram(writer) => {
let record = cram::Record::try_from_alignment_record(header, record)?;
writer.write_record(header, record).await
}
}
}

/// Shuts down the alignment format writer.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_sam as sam;
/// use noodles_util::alignment::{self, io::Format};
/// use tokio::io;
///
/// let mut writer = alignment::r#async::io::writer::Builder::default()
/// .set_format(Format::Sam)
/// .build_from_writer(io::sink()).await?;
///
/// let header = sam::Header::default();
/// writer.shutdown(&header).await?;
/// # Ok(())
/// # }
/// ```
pub async fn shutdown(&mut self, header: &sam::Header) -> io::Result<()> {
match self {
Self::Sam(_) => Ok(()),
Self::Bam(writer) => writer.shutdown().await,
Self::Cram(writer) => writer.shutdown(header).await,
}
}
}
187 changes: 187 additions & 0 deletions noodles-util/src/alignment/async/io/writer/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use std::path::Path;

use noodles_bam as bam;
use noodles_bgzf as bgzf;
use noodles_cram as cram;
use noodles_fasta as fasta;
use noodles_sam as sam;
use tokio::{
fs::File,
io::{self, AsyncWrite},
};

use super::Writer;
use crate::alignment::io::{CompressionMethod, Format};

/// An async alignment writer builder.
#[derive(Default)]
pub struct Builder {
compression_method: Option<Option<CompressionMethod>>,
format: Option<Format>,
reference_sequence_repository: fasta::Repository,
}

impl Builder {
/// Sets the compression method.
///
/// By default, the compression method is autodetected on build. This can be used to override
/// it.
///
/// # Examples
///
/// ```
/// use noodles_util::alignment::{r#async::io::writer::Builder, io::CompressionMethod};
/// let builder = Builder::default().set_compression_method(Some(CompressionMethod::Bgzf));
/// ```
pub fn set_compression_method(mut self, compression_method: Option<CompressionMethod>) -> Self {
self.compression_method = Some(compression_method);
self
}

/// Sets the format of the output.
///
/// By default, the format is autodetected on build. This can be used to override it.
///
/// # Examples
///
/// ```
/// use noodles_util::alignment::{r#async::io::writer::Builder, io::Format};
/// let builder = Builder::default().set_format(Format::Sam);
/// ```
pub fn set_format(mut self, format: Format) -> Self {
self.format = Some(format);
self
}

/// Sets the reference sequence repository.
///
/// # Examples
///
/// ```
/// use noodles_fasta as fasta;
/// use noodles_util::alignment::r#async::io::writer::Builder;
/// let repository = fasta::Repository::default();
/// let builder = Builder::default().set_reference_sequence_repository(repository);
/// ```
pub fn set_reference_sequence_repository(
mut self,
reference_sequence_repository: fasta::Repository,
) -> Self {
self.reference_sequence_repository = reference_sequence_repository;
self
}

/// Builds an async alignment writer from a path.
///
/// By default, the format and compression method will be detected from the path extension.
/// This can be overridden by using [`Self::set_format`] and [`Self::set_compression_method`].
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_util::alignment::r#async::io::reader::Builder;
/// let reader = Builder::default().build_from_path("sample.bam").await?;
/// # Ok(())
/// # }
/// ```
pub async fn build_from_path<P>(
mut self,
src: P,
) -> io::Result<Writer<Box<dyn AsyncWrite + Unpin>>>
where
P: AsRef<Path>,
{
use crate::alignment::io::writer::builder::{
detect_compression_method_from_path_extension, detect_format_from_path_extension,
};

let src = src.as_ref();

if self.compression_method.is_none() {
self.compression_method = Some(detect_compression_method_from_path_extension(src));
}

if self.format.is_none() {
self.format = detect_format_from_path_extension(src);
}

File::create(src)
.await
.map(|file| self.build_from_writer(file))?
.await
}

/// Builds an async alignment writer from a writer.
///
/// If the format is not set, a default format is used. If the compression method is not set, a
/// default one is determined by the format.
///
/// # Examples
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> tokio::io::Result<()> {
/// use noodles_util::alignment::r#async::io::writer::Builder;
/// use tokio::io;
///
/// let reader = Builder::default().build_from_writer(io::sink()).await?;
/// # Ok(())
/// # }
/// ```
pub async fn build_from_writer<W>(
self,
writer: W,
) -> io::Result<Writer<Box<dyn AsyncWrite + Unpin>>>
where
W: AsyncWrite + Unpin + 'static,
{
use bam::r#async::io::Writer as AsyncBamWriter;
use sam::r#async::io::Writer as AsyncSamWriter;

let format = self.format.unwrap_or(Format::Sam);

let compression_method = match self.compression_method {
Some(compression_method) => compression_method,
None => match format {
Format::Sam | Format::Cram => None,
Format::Bam => Some(CompressionMethod::Bgzf),
},
};

let writer = match (format, compression_method) {
(Format::Sam, None) => {
let inner: Box<dyn AsyncWrite + Unpin> = Box::new(writer);
Writer::Sam(AsyncSamWriter::new(inner))
}
(Format::Sam, Some(CompressionMethod::Bgzf)) => {
let encoder: Box<dyn AsyncWrite + Unpin> = Box::new(bgzf::AsyncWriter::new(writer));
Writer::Sam(AsyncSamWriter::new(encoder))
}
(Format::Bam, None) => {
let inner: Box<dyn AsyncWrite + Unpin> = Box::new(writer);
Writer::Bam(AsyncBamWriter::from(inner))
}
(Format::Bam, Some(CompressionMethod::Bgzf)) => {
let encoder: Box<dyn AsyncWrite + Unpin> = Box::new(bgzf::AsyncWriter::new(writer));
Writer::Bam(AsyncBamWriter::from(encoder))
}
(Format::Cram, None) => {
let inner: Box<dyn AsyncWrite + Unpin> = Box::new(writer);
let inner = cram::r#async::io::writer::Builder::default()
.set_reference_sequence_repository(self.reference_sequence_repository)
.build_with_writer(inner);
Writer::Cram(inner)
}
(Format::Cram, Some(CompressionMethod::Bgzf)) => {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"CRAM cannot be compressed with BGZF",
));
}
};

Ok(writer)
}
}
4 changes: 2 additions & 2 deletions noodles-util/src/alignment/io/writer/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl Builder {
}
}

fn detect_compression_method_from_path_extension<P>(path: P) -> Option<CompressionMethod>
pub(crate) fn detect_compression_method_from_path_extension<P>(path: P) -> Option<CompressionMethod>
where
P: AsRef<Path>,
{
Expand All @@ -196,7 +196,7 @@ where
}
}

fn detect_format_from_path_extension<P>(path: P) -> Option<Format>
pub(crate) fn detect_format_from_path_extension<P>(path: P) -> Option<Format>
where
P: AsRef<Path>,
{
Expand Down