diff --git a/src/codec/flate/encoder.rs b/src/codec/flate/encoder.rs index 5eab6b77..aab20a57 100644 --- a/src/codec/flate/encoder.rs +++ b/src/codec/flate/encoder.rs @@ -67,18 +67,6 @@ impl Encode for FlateEncoder { FlushCompress::Sync, )?; - loop { - let old_len = output.written().len(); - self.encode( - &mut PartialBuffer::new(&[][..]), - output, - FlushCompress::None, - )?; - if output.written().len() == old_len { - break; - } - } - self.flushed = true; Ok(!output.unwritten().is_empty()) } diff --git a/src/tokio/bufread/generic/encoder.rs b/src/tokio/bufread/generic/encoder.rs index a80fa122..5d4525f9 100644 --- a/src/tokio/bufread/generic/encoder.rs +++ b/src/tokio/bufread/generic/encoder.rs @@ -13,6 +13,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; enum State { Encoding, Flushing, + Finishing, Done, } @@ -57,27 +58,56 @@ impl Encoder { output: &mut PartialBuffer<&mut [u8]>, ) -> Poll> { let mut this = self.project(); + let mut read = 0usize; loop { *this.state = match this.state { State::Encoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Flushing - } else { - let mut input = PartialBuffer::new(input); + let res = this.reader.as_mut().poll_fill_buf(cx); + + match res { + Poll::Pending => { + if read == 0 { + return Poll::Pending; + } else { + State::Flushing + } + } + Poll::Ready(res) => { + let input = res?; + + if input.is_empty() { + State::Finishing + } else { + let mut input = PartialBuffer::new(input); + this.encoder.encode(&mut input, output)?; + let len = input.written().len(); + this.reader.as_mut().consume(len); + read += len; + State::Encoding + } + } + } + } + + State::Flushing => { + if read == 0 { + let mut input = PartialBuffer::new(&[][..]); this.encoder.encode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); + } + + if this.encoder.flush(output)? { State::Encoding + } else { + State::Flushing } } - State::Flushing => { + State::Finishing => { if this.encoder.finish(output)? { State::Done } else { - State::Flushing + State::Finishing } } @@ -87,6 +117,7 @@ impl Encoder { if let State::Done = *this.state { return Poll::Ready(Ok(())); } + if output.unwritten().is_empty() { return Poll::Ready(Ok(())); }