Skip to content

Commit

Permalink
allow streaming decoder to also be used with a &mut FrameDecoder for …
Browse files Browse the repository at this point in the history
…easier reusing of the decoder
  • Loading branch information
KillingSpark committed Mar 31, 2023
1 parent 3b6403b commit fa7bd9c
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions src/streaming_decoder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use core::borrow::BorrowMut;

use crate::frame_decoder::{BlockDecodingStrategy, FrameDecoder, FrameDecoderError};
use crate::io::{Error, ErrorKind, Read};

Expand All @@ -6,22 +8,26 @@ use crate::io::{Error, ErrorKind, Read};
///
/// The lower level FrameDecoder by comparison allows for finer grained control but need sto have it's decode_blocks method called continously
/// to decode the zstd-frame.
pub struct StreamingDecoder<READ: Read> {
pub decoder: FrameDecoder,
pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
pub decoder: DEC,
source: READ,
}

impl<READ: Read> StreamingDecoder<READ> {
pub fn new(mut source: READ) -> Result<StreamingDecoder<READ>, FrameDecoderError> {
let mut decoder = FrameDecoder::new();
decoder.init(&mut source)?;
impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
pub fn new_with_decoder(
mut source: READ,
mut decoder: DEC,
) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
decoder.borrow_mut().init(&mut source)?;
Ok(StreamingDecoder { decoder, source })
}
}

pub fn new_with_decoder(
impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
pub fn new(
mut source: READ,
mut decoder: FrameDecoder,
) -> Result<StreamingDecoder<READ>, FrameDecoderError> {
) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
let mut decoder = FrameDecoder::new();
decoder.init(&mut source)?;
Ok(StreamingDecoder { decoder, source })
}
Expand All @@ -31,9 +37,10 @@ impl<READ: Read> StreamingDecoder<READ> {
}
}

impl<READ: Read> Read for StreamingDecoder<READ> {
impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
if self.decoder.is_finished() && self.decoder.can_collect() == 0 {
let decoder = self.decoder.borrow_mut();
if decoder.is_finished() && decoder.can_collect() == 0 {
//No more bytes can ever be decoded
return Ok(0);
}
Expand All @@ -43,10 +50,10 @@ impl<READ: Read> Read for StreamingDecoder<READ> {
// So we need to call this until we can actually collect enough bytes

// TODO add BlockDecodingStrategy::UntilCollectable(usize) that pushes this logic into the decode_blocks function
while self.decoder.can_collect() < buf.len() && !self.decoder.is_finished() {
while decoder.can_collect() < buf.len() && !decoder.is_finished() {
//More bytes can be decoded
let additional_bytes_needed = buf.len() - self.decoder.can_collect();
match self.decoder.decode_blocks(
let additional_bytes_needed = buf.len() - decoder.can_collect();
match decoder.decode_blocks(
&mut self.source,
BlockDecodingStrategy::UptoBytes(additional_bytes_needed),
) {
Expand All @@ -61,6 +68,6 @@ impl<READ: Read> Read for StreamingDecoder<READ> {
}
}

self.decoder.read(buf)
decoder.read(buf)
}
}

0 comments on commit fa7bd9c

Please sign in to comment.