-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add StreamExt::partition() method #7065
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
#![allow(dead_code)] | ||
|
||
use crate::Stream; | ||
|
||
use std::{ | ||
fmt, | ||
pin::Pin, | ||
sync::{Arc, Mutex}, | ||
task::{Context, Poll, Waker}, | ||
}; | ||
|
||
/// A stream returned by the [`partition`](super::StreamExt::partition) method. | ||
pub enum Partition<St, F> | ||
where | ||
St: Stream, | ||
{ | ||
/// A stream that yields items for which the predicate returns `true`. | ||
Matches(Arc<Mutex<Inner<St, F>>>), | ||
/// A stream that yields items for which the predicate returns `false`. | ||
NonMatches(Arc<Mutex<Inner<St, F>>>), | ||
Comment on lines
+17
to
+20
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be a const generic boolean. |
||
} | ||
|
||
impl<St, F> fmt::Debug for Partition<St, F> | ||
where | ||
St: fmt::Debug + Stream, | ||
{ | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
match self { | ||
Partition::Matches(inner) => f.debug_tuple("Partition::Matches").field(inner).finish(), | ||
Partition::NonMatches(inner) => { | ||
f.debug_tuple("Partition::NonMatches").field(inner).finish() | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl<St, F> Partition<St, F> | ||
where | ||
St: Stream + Unpin, | ||
F: FnMut(&St::Item) -> bool, | ||
{ | ||
pub(super) fn new(stream: St, f: F) -> (Self, Self) { | ||
let inner = Arc::new(Mutex::new(Inner::new(stream, f))); | ||
( | ||
Partition::Matches(inner.clone()), | ||
Partition::NonMatches(inner), | ||
) | ||
} | ||
} | ||
|
||
impl<St, F> Stream for Partition<St, F> | ||
where | ||
St: Stream + Unpin, | ||
F: FnMut(&St::Item) -> bool, | ||
{ | ||
type Item = St::Item; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
match self.get_mut() { | ||
Partition::Matches(inner) => inner.lock().unwrap().poll_next(cx, true), | ||
Partition::NonMatches(inner) => inner.lock().unwrap().poll_next(cx, false), | ||
} | ||
} | ||
} | ||
|
||
pub struct Inner<St, F> | ||
where | ||
St: Stream, | ||
{ | ||
stream: St, | ||
f: F, | ||
buffered_value: Option<BufferedValue<St::Item>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you go for allowing a user-specified limit, then you can let this be an |
||
waker: Option<Waker>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be nicer as just a Waker perhaps, using Waker::noop to make the default. But that's a rust 1.85 feature, which is not yet released (and way past the current MSRV). I considered AtomicWaker here too, but I didn't want to add futures-util as a dependency of tokio-stream. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do you use it? I see only an assignment self.waker=Some(...) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think using |
||
} | ||
|
||
enum BufferedValue<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I originally modeled this as two opts ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you make both checks into a boolean, then you can do |
||
Match(T), | ||
NonMatch(T), | ||
} | ||
|
||
impl<St, F> fmt::Debug for Inner<St, F> | ||
where | ||
St: fmt::Debug + Stream, | ||
{ | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.debug_struct("Inner") | ||
.field("stream", &self.stream) | ||
.field("waker", &self.waker) | ||
.finish_non_exhaustive() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wasn't sure about what to do about the buffered value. I don't think I can write two implementations of Debug, one where the Item doesn't implement Debug, and one where it does. I'm not sure what scenarios you'd implement Debug on a stream impl of items that aren't Debug, but I'm not sure there's a way to express that in the type system neatly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can just require |
||
} | ||
} | ||
|
||
impl<St, F> Inner<St, F> | ||
where | ||
St: Stream + Unpin, | ||
F: FnMut(&St::Item) -> bool, | ||
{ | ||
pub(super) fn new(stream: St, f: F) -> Self { | ||
Self { | ||
stream, | ||
f, | ||
buffered_value: None, | ||
waker: None, | ||
} | ||
} | ||
|
||
fn poll_next(&mut self, cx: &mut Context<'_>, matches: bool) -> Poll<Option<St::Item>> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Usually I dislike boolean params, but I think this one makes sense as an exception. Not sure there's a good concise replacement idea for this. |
||
// Check if there is a buffered value | ||
match self.buffered_value.take() { | ||
Some(BufferedValue::Match(value)) if matches => return Poll::Ready(Some(value)), | ||
Some(BufferedValue::NonMatch(value)) if !matches => return Poll::Ready(Some(value)), | ||
Some(value) => { | ||
self.buffered_value = Some(value); | ||
self.waker = Some(cx.waker().clone()); | ||
return Poll::Pending; | ||
} | ||
None => {} | ||
} | ||
|
||
// Poll the underlying stream | ||
match Pin::new(&mut self.stream).poll_next(cx) { | ||
Poll::Ready(Some(value)) => match (self.f)(&value) { | ||
result if result == matches => Poll::Ready(Some(value)), | ||
true => { | ||
self.buffered_value = Some(BufferedValue::Match(value)); | ||
self.waker = Some(cx.waker().clone()); | ||
Poll::Pending | ||
} | ||
false => { | ||
self.buffered_value = Some(BufferedValue::NonMatch(value)); | ||
self.waker = Some(cx.waker().clone()); | ||
Poll::Pending | ||
} | ||
}, | ||
Poll::Ready(None) => Poll::Ready(None), // Stream is exhausted | ||
Poll::Pending => { | ||
self.waker = Some(cx.waker().clone()); | ||
cx.waker().wake_by_ref(); | ||
Poll::Pending | ||
} | ||
} | ||
} | ||
} |
joshka marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
use tokio_stream::{self as stream, StreamExt}; | ||
use tokio_test::{assert_pending, assert_ready_eq, task}; | ||
|
||
mod support { | ||
pub(crate) mod mpsc; | ||
} | ||
|
||
#[tokio::test] | ||
async fn partition() { | ||
let stream = stream::iter(0..6); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test only tests a situation where the items are ready. A test for when the values are not ready should be added. |
||
let (matches, non_matches) = stream.partition(|v| v % 2 == 0); | ||
let mut matches = task::spawn(matches); | ||
let mut non_matches = task::spawn(non_matches); | ||
|
||
// polling matches when the next item matches returns the item from the stream. | ||
assert_ready_eq!(matches.poll_next(), Some(0)); | ||
|
||
// polling non_matches when the next item doesn't match returns the item from the stream. | ||
assert_ready_eq!(non_matches.poll_next(), Some(1)); | ||
|
||
// polling non_matches when the next item matches buffers the item. | ||
assert_pending!(non_matches.poll_next()); | ||
|
||
// polling matches when there is a bufferred match returns the buffered item. | ||
assert_ready_eq!(matches.poll_next(), Some(2)); | ||
|
||
// polling matches when the next item doesn't match buffers the item. | ||
assert_pending!(matches.poll_next()); | ||
|
||
// polling non_matches when there is a bufferred non-match returns the buffered item. | ||
assert_ready_eq!(non_matches.poll_next(), Some(3)); | ||
|
||
// polling non_matches twice when the next item matches buffers the item only once. | ||
assert_pending!(non_matches.poll_next()); | ||
assert_pending!(non_matches.poll_next()); | ||
assert_ready_eq!(matches.poll_next(), Some(4)); | ||
|
||
// polling matches twice when the next item doesn't match buffers the item only once. | ||
assert_pending!(matches.poll_next()); | ||
assert_pending!(matches.poll_next()); | ||
assert_ready_eq!(non_matches.poll_next(), Some(5)); | ||
|
||
// polling matches and non_matches when the stream is exhausted returns None. | ||
assert_ready_eq!(matches.poll_next(), None); | ||
assert_ready_eq!(non_matches.poll_next(), None); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this should return a future instead of a bool. The futures StreamExt has methods which tend to do so, but the tokio StreamExt does not. I kept this consistent with tokio here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine. People can embed async computation in the stream itself if they really need it.