-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
stream: add StreamExt::partition() method #7065
base: master
Are you sure you want to change the base?
Conversation
d0f1609
to
8961a53
Compare
This allows filtering items that match or don't match into separate stream. It is analogous to std::iter::Iterator::partition() ```rust let stream = stream::iter(0..4); let (mut even, mut odd) = stream.partition(|v| v % 2 == 0); assert_eq!(Some(0), even.next().await); assert_eq!(Some(1), odd.next().await); ```
8961a53
to
006b3d7
Compare
Creating two different streams opens a lot of questions regarding buffering. Should we really allow you to consume unlimited amounts of memory by only reading from one half? The standard library I'm not a super big fan of this. |
Isn't that also unlimited memory though? E.g. Given this seems like a caller footgun which is shared with any unbounded approach to this problem, do you think there are situations where it would exhibit problematic behavior when used correctly (i.e. balanced reads). Is there some guidance on Tokio's approach to avoiding footguns like this written anywhere, or is it mostly one of those generally accepted convention things? What about naming this Do you have any feedback on the code approach used here, particularly around where the Unpin constraints are added (I'm not sure if this should be added on all generic bounds), and testing each stream for readiness? Should the names of the streams be Additionally, should the predicate code be async too? tokio::StreamExt::Filter takes a sync predicate, but futures::StreamExt::filter is async. Related to this I read a couple of users.rust-online.org / discord thread responses from you / @djc about using a task and a channels to represent this idea. This seemed to me like an incorrect layering to put into the tokio-stream lib. I wasn't sure though if perhaps this method should return a Future that resolves to a tuple of streams. Edit: let (even, odd = stream.iter(0..10).partition(|v| v %2 == 0);
let even = even.buffered(32);
let odd = odd.buffered(32); |
A larger buffer of values for each partition can be added on top of a single buffered value by wrapping each stream with code that prefetches the values. Thus this approach has a better composability than using an internal buffer. The tradeoff is that this may deadlock and calling code that needs to concurrently process items in both streams must be programmed in a way that ensures that waiting to consume from one stream never blocks the other stream.
I updated this in ffcf729 to instead just buffer a single item.
E.g. two naive tasks might do something like async fn process_matches(matches: impl Stream<T>) {
while let Some(item) = matches.next() {
...
}
}
async fn process_non_matches(non_matches: impl Stream<T>) {
while let Some(item) = non_matches.next() {
...
}
} But this would block the stream from producing the next item for the duration of each while block whenever there are multiple matching / non-matching items in a row. |
} | ||
} | ||
|
||
fn poll_next(&mut self, cx: &mut Context<'_>, matches: bool) -> Poll<Option<St::Item>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually I dislike boolean params, but I think this one makes sense as an exception. Not sure there's a good concise replacement idea for this.
stream: St, | ||
f: F, | ||
buffered_value: Option<BufferedValue<St::Item>>, | ||
waker: Option<Waker>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would be nicer as just a Waker perhaps, using Waker::noop to make the default. But that's a rust 1.85 feature, which is not yet released (and way past the current MSRV). I considered AtomicWaker here too, but I didn't want to add futures-util as a dependency of tokio-stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you use it? I see only an assignment self.waker=Some(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think using Option
is perfectly fine.
waker: Option<Waker>, | ||
} | ||
|
||
enum BufferedValue<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally modeled this as two opts (match_value
, non_match_value
), but it leads to simpler code using an enum here I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you make both checks into a boolean, then you can do buffered_type == this_stream
to check if the buffered value should be used or not.
f.debug_struct("Inner") | ||
.field("stream", &self.stream) | ||
.field("waker", &self.waker) | ||
.finish_non_exhaustive() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure about what to do about the buffered value. I don't think I can write two implementations of Debug, one where the Item doesn't implement Debug, and one where it does. I'm not sure what scenarios you'd implement Debug on a stream impl of items that aren't Debug, but I'm not sure there's a way to express that in the type system neatly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just require St::Item: Debug
.
/// # } | ||
fn partition<F>(self, f: F) -> (Partition<Self, F>, Partition<Self, F>) | ||
where | ||
F: FnMut(&Self::Item) -> bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this should return a future instead of a bool. The futures StreamExt has methods which tend to do so, but the tokio StreamExt does not. I kept this consistent with tokio here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine. People can embed async computation in the stream itself if they really need it.
|
||
#[tokio::test] | ||
async fn partition() { | ||
let stream = stream::iter(0..6); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test only tests a situation where the items are ready. A test for when the values are not ready should be added.
The difference is that the standard library method is like
I think it could make sense to have |
/// A stream that yields items for which the predicate returns `true`. | ||
Matches(Arc<Mutex<Inner<St, F>>>), | ||
/// A stream that yields items for which the predicate returns `false`. | ||
NonMatches(Arc<Mutex<Inner<St, F>>>), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be a const generic boolean.
{ | ||
stream: St, | ||
f: F, | ||
buffered_value: Option<BufferedValue<St::Item>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you go for allowing a user-specified limit, then you can let this be an Vec<St::Item>
and have a boolean next to it that specifies which of the two streams the buffered values are for.
This allows filtering items that match or don't match into separate
stream. It is analogous to std::iter::Iterator::partition()
Motivation
I was working with some ideas around multiplexing using streams and wondered how feasible this would be using combinators rather than imperative logic.
Solution
Adds a new StreamExt method partition, which accepts a predicate and returns two streams, one for items that match the predicate, and one for items which do not. These are named explicitly (TruePartition, FalsePartition) to make it easy to understand which stream is which.