Skip to content

Commit b464cb5

Browse files
committed
stream: add ChunksTimeout::into_remainder
Summary: When the underlying stream is an exclusive reference (&mut stream), and we need to drop the `ChunksTimout` stream without losing the buffered items.
1 parent 5a709e3 commit b464cb5

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

tokio-stream/src/stream_ext/chunks_timeout.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ impl<S: Stream> ChunksTimeout<S> {
3333
cap: max_size,
3434
}
3535
}
36+
37+
/// Consumes the [`ChunksTimeout`] and then returns all buffered items.
38+
pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
39+
let me = self.as_mut().project();
40+
std::mem::take(me.items)
41+
}
3642
}
3743

3844
impl<S: Stream> Stream for ChunksTimeout<S> {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#![warn(rust_2018_idioms)]
2+
3+
use futures::FutureExt;
4+
use std::error::Error;
5+
use tokio::time;
6+
use tokio::time::Duration;
7+
use tokio_stream::{self as stream, StreamExt};
8+
use tokio_test::assert_pending;
9+
use tokio_test::task;
10+
11+
#[tokio::test(start_paused = true)]
12+
async fn stream_chunks_remainder() -> Result<(), Box<dyn Error>> {
13+
let stream1 =
14+
stream::iter([5]).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
15+
16+
let inner = stream::iter([1, 2, 3, 4]).chain(stream1);
17+
tokio::pin!(inner);
18+
19+
let chunked = (&mut inner).chunks_timeout(10, Duration::from_millis(20));
20+
21+
let mut chunked = task::spawn(chunked);
22+
assert_pending!(chunked.poll_next());
23+
24+
let remainder = chunked.enter(|_, stream| stream.into_remainder());
25+
26+
assert_eq!(remainder, vec![1, 2, 3, 4]);
27+
time::advance(Duration::from_secs(2)).await;
28+
assert_eq!(inner.next().now_or_never(), Some(Some(5)));
29+
Ok(())
30+
}

0 commit comments

Comments
 (0)