Skip to content

Commit 9800cd0

Browse files
author
Geoffroy Couprie
committed
support intermediate flushes when encoding
due to the use of ready!(), whenever the underlying reader returns Poll::Pending, it is transmitted directly to the caller, so there is no flush until the entire data has been read. This commit flushes compressed data when we get Poll::Pending, and stays in the flushing state in case there is more data to send.
1 parent 149c9f8 commit 9800cd0

File tree

2 files changed

+40
-21
lines changed

2 files changed

+40
-21
lines changed

src/codec/flate/encoder.rs

-12
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,6 @@ impl Encode for FlateEncoder {
6767
FlushCompress::Sync,
6868
)?;
6969

70-
loop {
71-
let old_len = output.written().len();
72-
self.encode(
73-
&mut PartialBuffer::new(&[][..]),
74-
output,
75-
FlushCompress::None,
76-
)?;
77-
if output.written().len() == old_len {
78-
break;
79-
}
80-
}
81-
8270
self.flushed = true;
8371
Ok(!output.unwritten().is_empty())
8472
}

src/tokio/bufread/generic/encoder.rs

+40-9
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
1313
enum State {
1414
Encoding,
1515
Flushing,
16+
Finishing,
1617
Done,
1718
}
1819

@@ -57,27 +58,56 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
5758
output: &mut PartialBuffer<&mut [u8]>,
5859
) -> Poll<Result<()>> {
5960
let mut this = self.project();
61+
let mut read = 0usize;
6062

6163
loop {
6264
*this.state = match this.state {
6365
State::Encoding => {
64-
let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
65-
if input.is_empty() {
66-
State::Flushing
67-
} else {
68-
let mut input = PartialBuffer::new(input);
66+
let res = this.reader.as_mut().poll_fill_buf(cx);
67+
68+
match res {
69+
Poll::Pending => {
70+
if read == 0 {
71+
return Poll::Pending;
72+
} else {
73+
State::Flushing
74+
}
75+
}
76+
Poll::Ready(res) => {
77+
let input = res?;
78+
79+
if input.is_empty() {
80+
State::Finishing
81+
} else {
82+
let mut input = PartialBuffer::new(input);
83+
this.encoder.encode(&mut input, output)?;
84+
let len = input.written().len();
85+
this.reader.as_mut().consume(len);
86+
read += len;
87+
State::Encoding
88+
}
89+
}
90+
}
91+
}
92+
93+
State::Flushing => {
94+
if read == 0 {
95+
let mut input = PartialBuffer::new(&[][..]);
6996
this.encoder.encode(&mut input, output)?;
70-
let len = input.written().len();
71-
this.reader.as_mut().consume(len);
97+
}
98+
99+
if this.encoder.flush(output)? {
72100
State::Encoding
101+
} else {
102+
State::Flushing
73103
}
74104
}
75105

76-
State::Flushing => {
106+
State::Finishing => {
77107
if this.encoder.finish(output)? {
78108
State::Done
79109
} else {
80-
State::Flushing
110+
State::Finishing
81111
}
82112
}
83113

@@ -87,6 +117,7 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
87117
if let State::Done = *this.state {
88118
return Poll::Ready(Ok(()));
89119
}
120+
90121
if output.unwritten().is_empty() {
91122
return Poll::Ready(Ok(()));
92123
}

0 commit comments

Comments
 (0)