Skip to content

Commit

Permalink
bgzf/multithreaded_writer: Add builder
Browse files Browse the repository at this point in the history
This also adds the ability to set the compression level.

Closes #238.
  • Loading branch information
zaeleus committed Mar 1, 2024
1 parent 43da5c5 commit f80ac45
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 10 deletions.
10 changes: 10 additions & 0 deletions noodles-bgzf/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Changelog

## Unreleased

### Added

* bgzf/multithreaded_writer: Add builder ([#238]).

This also adds the ability to set the compression level.

[#238]: https://github.com/zaeleus/noodles/issues/238

## 0.26.0 - 2023-12-14

### Changed
Expand Down
2 changes: 1 addition & 1 deletion noodles-bgzf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ mod gz;
pub mod gzi;
pub mod indexed_reader;
mod multithreaded_reader;
mod multithreaded_writer;
pub mod multithreaded_writer;
pub mod reader;
pub mod virtual_position;
pub mod writer;
Expand Down
40 changes: 32 additions & 8 deletions noodles-bgzf/src/multithreaded_writer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//! Multithreaded BGZF writer.

mod builder;

use std::{
io::{self, Write},
num::NonZeroUsize,
Expand All @@ -7,7 +11,11 @@ use std::{
use bytes::{BufMut, Bytes, BytesMut};
use crossbeam_channel::{Receiver, Sender};

use super::gz;
pub use self::builder::Builder;
use super::{
gz,
writer::{CompressionLevel, CompressionLevelImpl},
};

type BufferedTx = Sender<io::Result<Vec<u8>>>;
type BufferedRx = Receiver<io::Result<Vec<u8>>>;
Expand All @@ -28,16 +36,15 @@ pub struct MultithreadedWriter {
}

impl MultithreadedWriter {
/// Creates a multithreaded BGZF writer.
pub fn with_worker_count<W>(worker_count: NonZeroUsize, inner: W) -> Self
fn new<W>(compression_level: CompressionLevel, worker_count: NonZeroUsize, inner: W) -> Self
where
W: Write + Send + 'static,
{
let (write_tx, write_rx) = crossbeam_channel::bounded(worker_count.get());
let (deflate_tx, deflate_rx) = crossbeam_channel::bounded(worker_count.get());

let writer_handle = spawn_writer(inner, write_rx);
let deflater_handles = spawn_deflaters(worker_count, deflate_rx);
let deflater_handles = spawn_deflaters(compression_level, worker_count, deflate_rx);

Self {
writer_handle: Some(writer_handle),
Expand All @@ -48,6 +55,14 @@ impl MultithreadedWriter {
}
}

/// Creates a multithreaded BGZF writer.
pub fn with_worker_count<W>(worker_count: NonZeroUsize, inner: W) -> Self
where
W: Write + Send + 'static,
{
Self::new(CompressionLevel::default(), worker_count, inner)
}

/// Finishes the output stream by flushing any remaining buffers.
///
/// This shuts down the writer and deflater workers and appends the final BGZF EOF block.
Expand Down Expand Up @@ -135,27 +150,36 @@ where
})
}

fn spawn_deflaters(worker_count: NonZeroUsize, deflate_rx: DeflateRx) -> Vec<JoinHandle<()>> {
fn spawn_deflaters<L>(
compression_level: L,
worker_count: NonZeroUsize,
deflate_rx: DeflateRx,
) -> Vec<JoinHandle<()>>
where
L: Into<CompressionLevelImpl>,
{
let compression_level = compression_level.into();

(0..worker_count.get())
.map(|_| {
let deflate_rx = deflate_rx.clone();

thread::spawn(move || {
while let Ok((src, buffered_tx)) = deflate_rx.recv() {
let result = compress(&src);
let result = compress(&src, compression_level);
buffered_tx.send(result).ok();
}
})
})
.collect()
}

fn compress(src: &[u8]) -> io::Result<Vec<u8>> {
fn compress(src: &[u8], compression_level: CompressionLevelImpl) -> io::Result<Vec<u8>> {
use super::{writer::deflate_data, BGZF_HEADER_SIZE};

let mut dst = Vec::new();

let (cdata, crc32, _) = deflate_data(src, Default::default())?;
let (cdata, crc32, _) = deflate_data(src, compression_level)?;

let block_size = BGZF_HEADER_SIZE + cdata.len() + gz::TRAILER_SIZE;
put_header(&mut dst, block_size)?;
Expand Down
41 changes: 41 additions & 0 deletions noodles-bgzf/src/multithreaded_writer/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::{io::Write, num::NonZeroUsize};

use super::MultithreadedWriter;
use crate::writer::CompressionLevel;

/// A multithreaded BGZF writer builder.
pub struct Builder {
compression_level: CompressionLevel,
worker_count: NonZeroUsize,
}

impl Builder {
/// Sets the compression level.
pub fn set_compression_level(mut self, compression_level: CompressionLevel) -> Self {
self.compression_level = compression_level;
self
}

/// Sets the worker count.
pub fn set_worker_count(mut self, worker_count: NonZeroUsize) -> Self {
self.worker_count = worker_count;
self
}

/// Builds a multithreaded BGZF writer from a writer.
pub fn build_from_writer<W>(self, writer: W) -> MultithreadedWriter
where
W: Write + Send + 'static,
{
MultithreadedWriter::new(self.compression_level, self.worker_count, writer)
}
}

impl Default for Builder {
fn default() -> Self {
Self {
compression_level: CompressionLevel::default(),
worker_count: NonZeroUsize::MIN,
}
}
}
2 changes: 1 addition & 1 deletion noodles-bgzf/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) static BGZF_EOF: &[u8] = &[
];

#[cfg(feature = "libdeflate")]
type CompressionLevelImpl = libdeflater::CompressionLvl;
pub(crate) type CompressionLevelImpl = libdeflater::CompressionLvl;
#[cfg(not(feature = "libdeflate"))]
type CompressionLevelImpl = flate2::Compression;

Expand Down

0 comments on commit f80ac45

Please sign in to comment.