Multi-threading API clarification #115

Open
d-cameron opened this issue Oct 6, 2022 · 4 comments

@d-cameron

I'm attempting to write a small utility that processes BAM records, and I can't figure out the async API. I'm trying to adapt the noodles-bam/examples/bam_reheader_async.rs example and have been running into problems isolating the async code.

A common design pattern for bioinformatics tools is to iterate over one or more files in genomic coordinate order, process the records, then write (typically a subset of) the records to new files. For many of these programs the cost of the processing is small and the bottleneck is I/O and record parsing. For these sorts of programs, HTSJDK/htslib expose an API that allows offloading the I/O and record parsing to background threads.

Does noodles support a synchronous *AM read/write API in which the compression/decompression and serialisation/parsing are off-loaded to background threads? Something along the lines of builder().set_worker_threads(8).set_buffer_size(8 * 65536).build(); ?

zaeleus added the bam label Oct 10, 2022
@zaeleus
Owner

zaeleus commented Oct 10, 2022

The short answer is no; this is currently not supported.

Sorry for the delayed response. I originally thought this was doable with the current API and wanted to give an example. Unfortunately, while you can do this for reading and parsing now, there is no way to preserialize a record for writing.

Thank you for bringing up this use case. I'll investigate it further.

@d-cameron
Author

What's the current state of this? Does the API now support multi-threading? Even just multithreading support for reading would be immensely useful.

Thanks

@zaeleus
Owner

zaeleus commented Feb 9, 2024

With respect to bgzip-compressed formats, you can compose a multithreaded decoder with a format reader. E.g.,

let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);
let file = File::open(src)?;
let decoder = bgzf::MultithreadedReader::with_worker_count(worker_count, file);
let mut reader = bam::io::Reader::from(decoder);

This can also be done with a BGZF encoder and format writer. I think this is half of what you originally requested, and it does greatly help with compression/decompression performance. The reader, in particular, has the limitation of not supporting random access, though.
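
For the writer side, the equivalent composition (also used in the full example further down) is:

let file = File::create(dst)?;
let encoder = bgzf::MultithreadedWriter::with_worker_count(worker_count, file);
let mut writer = bam::io::Writer::from(encoder);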

But there are still no multithreaded format readers/writers. In a passthrough context, the SAM/BAM readers no longer eagerly decode record fields, so I tried to write an example of parallel serialization, which is feasible with the current API. It's a POC and nontrivial, but it does show what's possible.

main.rs
// cargo add crossbeam-channel
// cargo add noodles --features bam,bgzf,sam
// cargo add noodles-bgzf --features libdeflate

use std::{
    env,
    fs::File,
    io::{self, Write},
    mem,
    num::NonZeroUsize,
    sync::Arc,
    thread::{self, JoinHandle},
};

use crossbeam_channel::{Receiver, Sender};
use noodles::{bam, bgzf, sam};

// Number of records buffered per serialization chunk.
const CHUNK_SIZE: usize = 1 << 12;

fn main() -> io::Result<()> {
    let mut args = env::args().skip(1);
    let src = args.next().expect("missing src");
    let dst = args.next().expect("missing dst");

    let worker_count = thread::available_parallelism().unwrap_or(NonZeroUsize::MIN);

    let src_file = File::open(src)?;
    let decoder = bgzf::MultithreadedReader::with_worker_count(worker_count, src_file);
    let mut reader = bam::io::Reader::from(decoder);

    let header = reader.read_header().map(Arc::new)?;

    let dst_file = File::create(dst)?;
    let encoder = bgzf::MultithreadedWriter::with_worker_count(worker_count, dst_file);
    let mut writer = bam::io::Writer::from(encoder);

    writer.write_header(&header)?;

    let mut writer = MultithreadedBamWriter::new(worker_count, header, writer);

    for result in reader.records() {
        let record = result?;
        writer.write_record(record)?;
    }

    writer.finish()?;

    Ok(())
}

// A chunk's serialized buffer (or an error) travels over its own channel.
type BufferedTx = Sender<io::Result<Vec<u8>>>;
type BufferedRx = Receiver<io::Result<Vec<u8>>>;
// Chunks of records are paired with the channel their buffer will be sent on.
type SerializeTx = Sender<(Vec<bam::Record>, BufferedTx)>;
type SerializeRx = Receiver<(Vec<bam::Record>, BufferedTx)>;
// The writer thread receives per-chunk result channels in submission order.
type WriteTx = Sender<BufferedRx>;
type WriteRx = Receiver<BufferedRx>;
// Used byte buffers are returned for reuse.
type RecycleTx = Sender<Vec<u8>>;
type RecycleRx = Receiver<Vec<u8>>;

/// Dispatches chunks of records to serializer threads and hands the resulting
/// buffers to a single writer thread in submission order.
struct MultithreadedBamWriter {
    writer_handle: Option<JoinHandle<io::Result<()>>>,
    serializer_handles: Vec<JoinHandle<()>>,
    chunk: Vec<bam::Record>,
    write_tx: Option<WriteTx>,
    serialize_tx: Option<SerializeTx>,
    recycle_tx: Option<RecycleTx>,
}

impl MultithreadedBamWriter {
    fn new(
        worker_count: NonZeroUsize,
        header: Arc<sam::Header>,
        inner: bam::io::Writer<bgzf::MultithreadedWriter>,
    ) -> Self {
        let (write_tx, write_rx) = crossbeam_channel::bounded(worker_count.get());
        let (serialize_tx, serialize_rx) = crossbeam_channel::bounded(worker_count.get());
        let (recycle_tx, recycle_rx) = crossbeam_channel::bounded(worker_count.get());

        for _ in 0..worker_count.get() {
            recycle_tx.send(Vec::new()).unwrap();
        }

        let writer_handle = spawn_writer(inner, write_rx, recycle_tx.clone());
        let serializer_handles = spawn_serializers(worker_count, serialize_rx, recycle_rx, header);

        Self {
            writer_handle: Some(writer_handle),
            serializer_handles,
            chunk: Vec::with_capacity(CHUNK_SIZE),
            write_tx: Some(write_tx),
            serialize_tx: Some(serialize_tx),
            recycle_tx: Some(recycle_tx),
        }
    }

    fn write_record(&mut self, record: bam::Record) -> io::Result<()> {
        self.chunk.push(record);

        if self.chunk.len() >= self.chunk.capacity() {
            self.flush();
        }

        Ok(())
    }

    fn flush(&mut self) {
        let records = mem::replace(&mut self.chunk, Vec::with_capacity(CHUNK_SIZE));

        let (buffered_tx, buffered_rx) = crossbeam_channel::bounded(1);

        // Enqueue the result channel before dispatching the chunk so the
        // writer thread receives buffers in record order.
        self.write_tx.as_ref().unwrap().send(buffered_rx).unwrap();

        self.serialize_tx
            .as_ref()
            .unwrap()
            .send((records, buffered_tx))
            .unwrap();
    }

    fn finish(&mut self) -> io::Result<()> {
        self.flush();

        self.recycle_tx.take();
        self.serialize_tx.take();

        for handle in self.serializer_handles.drain(..) {
            handle.join().unwrap();
        }

        self.write_tx.take();

        if let Some(handle) = self.writer_handle.take() {
            handle.join().unwrap()?;
        }

        Ok(())
    }
}

fn spawn_writer(
    mut writer: bam::io::Writer<bgzf::MultithreadedWriter>,
    write_rx: WriteRx,
    recycle_tx: RecycleTx,
) -> JoinHandle<io::Result<()>> {
    thread::spawn(move || {
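        // Receive each chunk's result channel in submission order; the buffer
        // it yields is already BAM-serialized, so it is written directly to
        // the BGZF stream and then recycled.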
        while let Ok(buffered_rx) = write_rx.recv() {
            if let Ok(result) = buffered_rx.recv() {
                let buf = result?;
                writer.get_mut().write_all(&buf[..])?;
                recycle_tx.send(buf).ok();
            }
        }

        Ok(())
    })
}

fn spawn_serializers(
    worker_count: NonZeroUsize,
    serialize_rx: SerializeRx,
    recycle_rx: RecycleRx,
    header: Arc<sam::Header>,
) -> Vec<JoinHandle<()>> {
    (0..worker_count.get())
        .map(|_| {
            let serialize_rx = serialize_rx.clone();
            let recycle_rx = recycle_rx.clone();
            let header = header.clone();

            thread::spawn(move || {
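                // Take a recycled buffer, serialize the chunk of records into
                // it, and send the result back over the chunk's dedicated
                // channel.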
                while let Ok((records, buffered_tx)) = serialize_rx.recv() {
                    if let Ok(mut dst) = recycle_rx.recv() {
                        let result = serialize(&header, &records, &mut dst).map(|_| dst);
                        buffered_tx.send(result).ok();
                    }
                }
            })
        })
        .collect()
}

fn serialize(header: &sam::Header, records: &[bam::Record], dst: &mut Vec<u8>) -> io::Result<()> {
    dst.clear();

    let mut serializer = bam::io::Writer::from(dst);

    for record in records {
        serializer.write_record(header, record)?;
    }

    Ok(())
}
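
With the dependencies added as in the comments at the top, this runs as, e.g., cargo run --release -- in.bam out.bam (src and dst are positional arguments).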

@ghuls
Contributor

ghuls commented Feb 29, 2024

Is it possible to set the compression level when using bgzf::MultithreadedWriter?
