Skip to content

Commit

Permalink
bgzf/multithreaded_writer: Move constructor to builder
Browse files Browse the repository at this point in the history
  • Loading branch information
zaeleus committed Apr 29, 2024
1 parent 44e17d7 commit 426714f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 37 deletions.
37 changes: 5 additions & 32 deletions noodles-bgzf/src/multithreaded_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use crossbeam_channel::{Receiver, Sender};

pub use self::builder::Builder;
use super::{
gz,
writer::{CompressionLevel, CompressionLevelImpl},
};
use super::{gz, writer::CompressionLevelImpl};

type BufferedTx = Sender<io::Result<Vec<u8>>>;
type BufferedRx = Receiver<io::Result<Vec<u8>>>;
Expand Down Expand Up @@ -48,28 +45,6 @@ impl<W> MultithreadedWriter<W>
where
W: Write + Send + 'static,
{
fn with_compression_level_and_worker_count(
compression_level: CompressionLevel,
worker_count: NonZeroUsize,
inner: W,
) -> Self {
let (write_tx, write_rx) = crossbeam_channel::bounded(worker_count.get());
let (deflate_tx, deflate_rx) = crossbeam_channel::bounded(worker_count.get());

let writer_handle = spawn_writer(inner, write_rx);
let deflater_handles = spawn_deflaters(compression_level, worker_count, deflate_rx);

Self {
state: Some(State::Running {
writer_handle,
deflater_handles,
write_tx,
deflate_tx,
}),
buf: BytesMut::new(),
}
}

/// Creates a multithreaded BGZF writer with a default worker count.
///
/// # Examples
Expand All @@ -80,7 +55,7 @@ where
/// let writer = bgzf::MultithreadedWriter::new(io::sink());
/// ```
pub fn new(inner: W) -> Self {
Self::with_worker_count(NonZeroUsize::MIN, inner)
Builder::default().build_from_writer(inner)
}

/// Creates a multithreaded BGZF writer with a worker count.
Expand All @@ -94,11 +69,9 @@ where
/// let writer = bgzf::MultithreadedWriter::with_worker_count(NonZeroUsize::MIN, io::sink());
/// ```
pub fn with_worker_count(worker_count: NonZeroUsize, inner: W) -> Self {
Self::with_compression_level_and_worker_count(
CompressionLevel::default(),
worker_count,
inner,
)
Builder::default()
.set_worker_count(worker_count)
.build_from_writer(inner)
}

/// Finishes the output stream by flushing any remaining buffers.
Expand Down
27 changes: 22 additions & 5 deletions noodles-bgzf/src/multithreaded_writer/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{io::Write, num::NonZeroUsize};

use bytes::BytesMut;

use super::MultithreadedWriter;
use crate::writer::CompressionLevel;

Expand Down Expand Up @@ -50,11 +52,26 @@ impl Builder {
where
W: Write + Send + 'static,
{
MultithreadedWriter::with_compression_level_and_worker_count(
self.compression_level,
self.worker_count,
writer,
)
use super::{spawn_deflaters, spawn_writer, State};

let worker_count = self.worker_count.get();

let (write_tx, write_rx) = crossbeam_channel::bounded(worker_count);
let (deflate_tx, deflate_rx) = crossbeam_channel::bounded(worker_count);

let writer_handle = spawn_writer(writer, write_rx);
let deflater_handles =
spawn_deflaters(self.compression_level, self.worker_count, deflate_rx);

MultithreadedWriter {
state: Some(State::Running {
writer_handle,
deflater_handles,
write_tx,
deflate_tx,
}),
buf: BytesMut::new(),
}
}
}

Expand Down

0 comments on commit 426714f

Please sign in to comment.