feat: Implement stream iterator#18
Open
klaatu01 wants to merge 5 commits intowillothy:mainfrom
Open
Conversation
klaatu01
commented
Oct 27, 2025
Comment on lines
+509
to
+558
| #[cfg(feature = "stream")] | ||
| /// Flattened stream of **owned** `(K, V)` items. | ||
| /// | ||
| /// Locks one shard at a time, snapshots (clones) its entries into a `Vec`, drops the lock, | ||
| /// then yields items. This allows concurrent writes to other shards. | ||
| /// | ||
| /// # Example | ||
| /// ``` | ||
| /// use tokio::runtime::Runtime; | ||
| /// use futures::{pin_mut, StreamExt}; | ||
| /// use std::sync::Arc; | ||
| /// use whirlwind::ShardMap; | ||
| /// | ||
| /// let rt = Runtime::new().unwrap(); | ||
| /// let map = Arc::new(ShardMap::new()); | ||
| /// rt.block_on(async { | ||
| /// map.insert(1, "a".to_string()).await; | ||
| /// map.insert(2, "b".to_string()).await; | ||
| /// | ||
| /// let s = map.stream_owned(); | ||
| /// pin_mut!(s); | ||
| /// | ||
| /// let mut items = Vec::new(); | ||
| /// while let Some((k, v)) = s.next().await { | ||
| /// items.push((k, v)); | ||
| /// } | ||
| /// items.sort_by_key(|(k, _)| *k); | ||
| /// assert_eq!(items, vec![(1, "a".into()), (2, "b".into())]); | ||
| /// }); | ||
| /// ``` | ||
| pub fn stream_owned(&self) -> impl Stream<Item = (K, V)> + '_ | ||
| where | ||
| K: Clone, | ||
| V: Clone, | ||
| { | ||
| let shard_stream = self.stream_shards(); | ||
|
|
||
| stream! { | ||
| pin_mut!(shard_stream); | ||
|
|
||
| while let Some(shard) = shard_stream.next().await { | ||
| let items: Vec<(K, V)> = | ||
| shard.iter().map(|(k, v)| (k.clone(), v.clone())).collect(); | ||
| drop(shard); | ||
| for item in items { | ||
| yield item; | ||
| } | ||
| } | ||
| } | ||
| } |
Author
There was a problem hiding this comment.
I might actually remove stream_owned. It’s not particularly ergonomic or idiomatic in practice, mainly because it requires pin_mut! just to iterate. I think it would take some rejigging of the internals to get this working without pin_mut!, and even then would probably need to return a BoxedStream.
Now that we have entries (and keys / values) implemented without locking all shards, stream_owned feels a bit redundant and outside the core responsibility of Whirlwind.
If someone really needs to iterate efficiently, they can always use stream_shards.
In that case I think we could also remove the stream feature.
@willothy what do you think?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
As mentioned in #17, your preferred strategy is to use a Stream-based approach so that we don’t need to hold read locks on all shards simultaneously. This PR is a best-effort implementation of that approach.
stream_ownedstreams cloned(K, V)items by acquiring a read lock on one shard at a time, cloning its contents, and then releasing the lock before yielding. This allows concurrent writes to other shards to continue while iteration proceeds.stream_shardsis a best-effort attempt to provide a borrowed stream of entries(&K, &V)without cloning. Achieving fully borrowed iteration across asynchronous boundaries would require deeper changes to whirlwind internals (for example, storing shard data in Arc or using a custom pinned stream type), which could impact performance and complexity. For now, this implementation yieldsShardReadguards one shard at a time, so by-reference iteration is possible per-shard, but not across shards concurrently.Implements
keys,valuesandentriesforKandVthat implementClone.In summary:
I have put all of this behind a
streamfeature as it doesn't seem to be an extremely in-demand feature.