Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add StreamExt::partition() method #7065

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

joshka
Copy link
Contributor

@joshka joshka commented Jan 3, 2025

This allows filtering items that match or don't match into separate
stream. It is analogous to std::iter::Iterator::partition()

let stream = stream::iter(0..4);
let (mut even, mut odd) = stream.partition(|v| v % 2 == 0);
assert_eq!(Some(0), even.next().await);
assert_eq!(Some(1), odd.next().await);

Motivation

I was working with some ideas around multiplexing using streams and wondered how feasible this would be using combinators rather than imperative logic.

Solution

Adds a new StreamExt method partition, which accepts a predicate and returns two streams, one for items that match the predicate, and one for items which do not. These are named explicitly (TruePartition, FalsePartition) to make it easy to understand which stream is which.

@joshka joshka force-pushed the jm/partition-stream branch from d0f1609 to 8961a53 Compare January 3, 2025 10:59
@github-actions github-actions bot added the R-loom-sync Run loom sync tests on this PR label Jan 3, 2025
This allows filtering items that match or don't match into separate
stream. It is analogous to std::iter::Iterator::partition()

```rust
let stream = stream::iter(0..4);
let (mut even, mut odd) = stream.partition(|v| v % 2 == 0);
assert_eq!(Some(0), even.next().await);
assert_eq!(Some(1), odd.next().await);
```
@joshka joshka force-pushed the jm/partition-stream branch from 8961a53 to 006b3d7 Compare January 3, 2025 11:03
@github-actions github-actions bot removed the R-loom-sync Run loom sync tests on this PR label Jan 3, 2025
@Darksonn Darksonn added M-stream Module: tokio/stream A-tokio-stream Area: The tokio-stream crate labels Jan 3, 2025
@Darksonn
Copy link
Contributor

Darksonn commented Jan 3, 2025

Creating two different streams opens a lot of questions regarding buffering. Should we really allow you to consume unlimited amounts of memory by only reading from one half? The standard library partition consumes the iterator and creates two collections, which sidesteps this question.

I'm not a super big fan of this.

@joshka
Copy link
Contributor Author

joshka commented Jan 4, 2025

The standard library partition consumes the iterator and creates two collections, which sidesteps this question.

Isn't that also unlimited memory though? E.g. let (odd, even): (Vec<usize>, Vec<usize>) = (0..).partition(|&x| x % 2 == 0); has this same problem. In this approach, we only use unlimited memory when one partition is not read. Fun fact, while writing this comment I left that code running in the background and got a surprise popup from macOS about how I'd managed to use the entire 96GB of RAM in my laptop. It took me a few seconds to realized what I'd done.

Given this seems like a caller footgun which is shared with any unbounded approach to this problem, do you think there are situations where it would exhibit problematic behavior when used correctly (i.e. balanced reads). Is there some guidance on Tokio's approach to avoiding footguns like this written anywhere, or is it mostly one of those generally accepted convention things?

What about naming this unbounded_partition to make it clearer that this can potentially buffer forever if unbalanced reads occur. This could be in keeping with the naming within tokio and make adding a partition that accepts limit(s) an approach. Part of the difference from iter::partition here is that we're not providing some Extend impl to the call, so we can't artificially handle bounds that would block. Perhaps there's a design that would make that possible here?

Do you have any feedback on the code approach used here, particularly around where the Unpin constraints are added (I'm not sure if this should be added on all generic bounds), and testing each stream for readiness? Should the names of the streams be {Matching,NonMatching}Partition? Or perhaps just Partition but make the logic for true / false based on a field.

Additionally, should the predicate code be async too? tokio::StreamExt::Filter takes a sync predicate, but futures::StreamExt::filter is async.

Related to this I read a couple of users.rust-online.org / discord thread responses from you / @djc about using a task and a channels to represent this idea. This seemed to me like an incorrect layering to put into the tokio-stream lib. I wasn't sure though if perhaps this method should return a Future that resolves to a tuple of streams.

Edit:
An alternative approach to this would be to make this strictly ordered (i.e. an item from the matching partition, cannot be read if the next item in the stream is a non-matching one). This could still allow concurrent access to streams by implementing buffering on each stream. E.g.

let (even, odd = stream.iter(0..10).partition(|v| v %2 == 0);
let even = even.buffered(32);
let odd = odd.buffered(32);

A larger buffer of values for each partition can be added on top of a
single buffered value by wrapping each stream with code that prefetches
the values. Thus this approach has a better composability than using an
internal buffer. The tradeoff is that this may deadlock and calling
code that needs to concurrently process items in both streams must be
programmed in a way that ensures that waiting to consume from one stream
never blocks the other stream.
@joshka
Copy link
Contributor Author

joshka commented Jan 4, 2025

I updated this in ffcf729 to instead just buffer a single item.

A larger buffer of values for each partition can be added on top of a
single buffered value by wrapping each stream with code that prefetches
the values. Thus this approach has a better composability than using an
internal buffer. The tradeoff is that this may deadlock and calling
code that needs to concurrently process items in both streams must be
programmed in a way that ensures that waiting to consume from one stream
never blocks the other stream.

E.g. two naive tasks might do something like

async fn process_matches(matches: impl Stream<T>) {
    while let Some(item) = matches.next() {
        ...
    }
}

async fn process_non_matches(non_matches: impl Stream<T>) {
    while let Some(item) = non_matches.next() {
        ...
    }
}

But this would block the stream from producing the next item for the duration of each while block whenever there are multiple matching / non-matching items in a row.

}
}

fn poll_next(&mut self, cx: &mut Context<'_>, matches: bool) -> Poll<Option<St::Item>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually I dislike boolean params, but I think this one makes sense as an exception. Not sure there's a good concise replacement idea for this.

stream: St,
f: F,
buffered_value: Option<BufferedValue<St::Item>>,
waker: Option<Waker>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be nicer as just a Waker perhaps, using Waker::noop to make the default. But that's a rust 1.85 feature, which is not yet released (and way past the current MSRV). I considered AtomicWaker here too, but I didn't want to add futures-util as a dependency of tokio-stream.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you use it? I see only an assignment self.waker=Some(...)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using Option is perfectly fine.

waker: Option<Waker>,
}

enum BufferedValue<T> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally modeled this as two opts (match_value, non_match_value), but it leads to simpler code using an enum here I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make both checks into a boolean, then you can do buffered_type == this_stream to check if the buffered value should be used or not.

f.debug_struct("Inner")
.field("stream", &self.stream)
.field("waker", &self.waker)
.finish_non_exhaustive()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure about what to do about the buffered value. I don't think I can write two implementations of Debug, one where the Item doesn't implement Debug, and one where it does. I'm not sure what scenarios you'd implement Debug on a stream impl of items that aren't Debug, but I'm not sure there's a way to express that in the type system neatly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just require St::Item: Debug.

/// # }
fn partition<F>(self, f: F) -> (Partition<Self, F>, Partition<Self, F>)
where
F: FnMut(&Self::Item) -> bool,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should return a future instead of a bool. The futures StreamExt has methods which tend to do so, but the tokio StreamExt does not. I kept this consistent with tokio here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine. People can embed async computation in the stream itself if they really need it.


#[tokio::test]
async fn partition() {
let stream = stream::iter(0..6);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only tests a situation where the items are ready. A test for when the values are not ready should be added.

@Darksonn
Copy link
Contributor

Darksonn commented Jan 8, 2025

The standard library partition consumes the iterator and creates two collections, which sidesteps this question.

Isn't that also unlimited memory though?

The difference is that the standard library method is like .collect(), and nobody is going to be surprised if .collect() allocates memory to hold all of the elements. But methods that create an iterator does not come with that expectation, so it does lead to surprises.

What about naming this unbounded_partition to make it clearer that this can potentially buffer forever if unbalanced reads occur. This could be in keeping with the naming within tokio and make adding a partition that accepts limit(s) an approach.

I think it could make sense to have partition accept an integer parameter called limit that specifies how "far behind" a receiver is allowed to get. And having another method where the limit is unbounded could also make sense. This makes the buffering more clear.

Comment on lines +17 to +20
/// A stream that yields items for which the predicate returns `true`.
Matches(Arc<Mutex<Inner<St, F>>>),
/// A stream that yields items for which the predicate returns `false`.
NonMatches(Arc<Mutex<Inner<St, F>>>),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a const generic boolean.

{
stream: St,
f: F,
buffered_value: Option<BufferedValue<St::Item>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you go for allowing a user-specified limit, then you can let this be an Vec<St::Item> and have a boolean next to it that specifies which of the two streams the buffered values are for.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio-stream Area: The tokio-stream crate M-stream Module: tokio/stream
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants