Skip to content

Commit 19d513b

Browse files
committed
[ChunksTimeout] Consumes the stream reminder and return the items immediately
Summary: When the underlying stream is an exclusive reference (&mut stream), and we need to drop the `ChunksTimout` stream without losing the buffered items.
1 parent 5a709e3 commit 19d513b

File tree

2 files changed

+53
-0
lines changed

2 files changed

+53
-0
lines changed

tokio-stream/src/stream_ext/chunks_timeout.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,34 @@ impl<S: Stream> ChunksTimeout<S> {
3333
cap: max_size,
3434
}
3535
}
36+
37+
/// Consumes the [`ChunksTimeout`] and then returns all buffered items.
38+
///
39+
/// ```
40+
/// use tokio::time::Duration;
41+
/// use tokio_stream::{self as stream, StreamExt};
42+
///
43+
/// #[tokio::main]
44+
/// async fn main() {
45+
/// let chunked = stream::iter([1, 2, 3, 4])
46+
/// .throttle(Duration::from_millis(2))
47+
/// .chunks_timeout(4, Duration::from_millis(8));
48+
///
49+
/// tokio::pin!(chunked);
50+
///
51+
/// // race with another future
52+
/// tokio::select! {
53+
/// Some(_chunk) = chunked.next() => {}
54+
/// _ = tokio::time::sleep(Duration::from_millis(3)) => {}
55+
/// }
56+
///
57+
/// assert_eq!(chunked.into_remainder(), vec![1, 2]);
58+
/// }
59+
/// ```
60+
pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
61+
let me = self.as_mut().project();
62+
std::mem::take(me.items)
63+
}
3664
}
3765

3866
impl<S: Stream> Stream for ChunksTimeout<S> {
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#![warn(rust_2018_idioms)]
2+
#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery
3+
#![cfg(panic = "unwind")]
4+
5+
use std::error::Error;
6+
use tokio::time::Duration;
7+
use tokio_stream::{self as stream, StreamExt};
8+
9+
#[tokio::test]
10+
async fn stream_chunks_remainder() -> Result<(), Box<dyn Error>> {
11+
let inner = stream::iter([1, 2, 3, 4]).throttle(Duration::from_millis(5));
12+
tokio::pin!(inner);
13+
14+
let chunked = (&mut inner).chunks_timeout(4, Duration::from_millis(20));
15+
tokio::pin!(chunked);
16+
17+
tokio::select! {
18+
Some(_chunk) = chunked.next() => {}
19+
_ = tokio::time::sleep(Duration::from_millis(9)) => {}
20+
}
21+
22+
assert_eq!(chunked.into_remainder(), vec![1, 2]);
23+
assert_eq!(inner.next().await, Some(3));
24+
Ok(())
25+
}

0 commit comments

Comments
 (0)