Skip to content

Commit

Permalink
bgzf/multithreaded_reader: Add done state
Browse files Browse the repository at this point in the history
`State::Done` replaces the usage of `None`, as the inner reader is no
longer owned by the reader.
  • Loading branch information
zaeleus committed Apr 29, 2024
1 parent 426714f commit 90a11b9
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions noodles-bgzf/src/multithreaded_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum State<R> {
read_rx: ReadRx,
recycle_tx: RecycleTx,
},
Done,
}

#[derive(Debug, Default)]
Expand All @@ -39,7 +40,7 @@ struct Buffer {
/// This is a multithreaded BGZF reader that uses a thread pool to decompress block data. It places
/// the inner reader on its own thread to read raw frames asynchronously.
pub struct MultithreadedReader<R> {
state: Option<State<R>>,
state: State<R>,
worker_count: NonZeroUsize,
position: u64,
buffer: Buffer,
Expand Down Expand Up @@ -86,7 +87,9 @@ impl<R> MultithreadedReader<R> {
/// # Ok::<_, io::Error>(())
/// ```
pub fn finish(&mut self) -> io::Result<R> {
match self.state.take().unwrap() {
let state = mem::replace(&mut self.state, State::Done);

match state {
State::Paused(inner) => Ok(inner),
State::Running {
reader_handle,
Expand All @@ -102,6 +105,7 @@ impl<R> MultithreadedReader<R> {

reader_handle.join().unwrap().map_err(|e| e.1)
}
State::Done => panic!("invalid state"),
}
}
}
Expand Down Expand Up @@ -135,7 +139,7 @@ where
/// ```
pub fn with_worker_count(worker_count: NonZeroUsize, inner: R) -> Self {
Self {
state: Some(State::Paused(inner)),
state: State::Paused(inner),
worker_count,
position: 0,
buffer: Buffer::default(),
Expand All @@ -155,18 +159,20 @@ where
pub fn get_mut(&mut self) -> &mut R {
self.pause();

match self.state.as_mut() {
Some(State::Paused(inner)) => inner,
match &mut self.state {
State::Paused(inner) => inner,
_ => panic!("invalid state"),
}
}

fn resume(&mut self) {
if matches!(self.state, Some(State::Running { .. })) {
if matches!(self.state, State::Running { .. }) {
return;
}

let Some(State::Paused(inner)) = self.state.take() else {
let state = mem::replace(&mut self.state, State::Done);

let State::Paused(inner) = state else {
panic!("invalid state");
};

Expand All @@ -183,25 +189,27 @@ where
let reader_handle = spawn_reader(inner, inflate_tx, read_tx, recycle_rx);
let inflater_handles = spawn_inflaters(self.worker_count, inflate_rx);

self.state = Some(State::Running {
self.state = State::Running {
reader_handle,
inflater_handles,
read_rx,
recycle_tx,
});
};
}

fn pause(&mut self) {
if matches!(self.state, Some(State::Paused(_))) {
if matches!(self.state, State::Paused(_)) {
return;
}

let state = mem::replace(&mut self.state, State::Done);

let State::Running {
reader_handle,
mut inflater_handles,
recycle_tx,
..
} = self.state.take().unwrap()
} = state
else {
panic!("invalid state");
};
Expand All @@ -218,7 +226,7 @@ where
Err(ReadError(inner, _)) => inner,
};

self.state = Some(State::Paused(inner));
self.state = State::Paused(inner);
}

fn read_block(&mut self) -> io::Result<()> {
Expand All @@ -228,7 +236,7 @@ where
read_rx,
recycle_tx,
..
} = self.state.as_ref().unwrap()
} = &self.state
else {
panic!("invalid state");
};
Expand Down Expand Up @@ -280,7 +288,7 @@ where

impl<R> Drop for MultithreadedReader<R> {
fn drop(&mut self) {
if self.state.is_some() {
if !matches!(self.state, State::Done) {
let _ = self.finish();
}
}
Expand Down

0 comments on commit 90a11b9

Please sign in to comment.