diff --git a/Cargo.lock b/Cargo.lock index a113867..fa4f6c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,6 +23,28 @@ version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.4.0" @@ -80,6 +102,95 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "gimli" version = "0.31.1" @@ -184,6 +295,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro2" version = "1.0.89" @@ -232,6 +349,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + [[package]] name = "smallvec" version = "1.13.2" @@ -304,7 +427,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" name = "whirlwind" version = "0.1.1" dependencies = [ + "async-stream", "crossbeam-utils", + "futures", "hashbrown", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 87092ea..e92f192 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,15 @@ categories = ["asynchronous", "concurrency", "data-structures"] [dependencies] +async-stream = { version = "0.3.6", optional = true } crossbeam-utils = "0.8.20" +futures = { version = "0.3.31", optional = true } hashbrown = { version = "0.15.1" } tokio = { version = "1.41.0", features = ["sync"] } [dev-dependencies] tokio = { version = "1.41.0", features = ["full"] } + +[features] +default = [] +stream = ["async-stream", "futures"] diff --git a/src/shard_map.rs b/src/shard_map.rs index c2858f4..22865e7 100644 --- a/src/shard_map.rs +++ b/src/shard_map.rs @@ -26,6 +26,15 @@ use std::{ use crossbeam_utils::CachePadded; use hashbrown::hash_table::Entry; +#[cfg(feature = "stream")] +use async_stream::stream; +#[cfg(feature = "stream")] +use futures::{ + pin_mut, + stream::{self, Stream}, + StreamExt, +}; + use crate::{ mapref::{MapRef, MapRefMut}, shard::Shard, @@ -167,8 +176,7 @@ where } let shard_capacity = cap / shards; - let shards = std::iter::repeat(()) - .take(shards) + let shards = std::iter::repeat_n((), shards) .map(|_| CachePadded::new(Shard::with_capacity(shard_capacity))) .collect(); @@ -448,4 +456,237 @@ where shard.write().await.clear(); } } + + #[cfg(feature = "stream")] + /// Stream over all shards in the map. + /// + /// Each item is a `ShardRead` that *holds a read-lock* on that shard while you iterate it + /// synchronously. Writes to **other shards** continue to proceed concurrently. + /// + /// # Example + /// ``` + /// use tokio::runtime::Runtime; + /// use futures::{pin_mut, StreamExt}; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// rt.block_on(async { + /// map.insert(1, "a").await; + /// map.insert(2, "b").await; + /// + /// let shards = map.stream_shards(); + /// pin_mut!(shards); // Stream is not Unpin + /// + /// let mut seen = 0; + /// while let Some(sh) = shards.next().await { + /// for (_k, _v) in sh.iter() { + /// seen += 1; + /// } + /// } + /// assert_eq!(seen, 2); + /// }); + /// ``` + pub fn stream_shards(&self) -> impl Stream> + '_ { + let total = self.inner.len(); + + stream::unfold(0usize, move |mut idx| async move { + if idx >= total { + return None; + } + + // SAFETY: idx is checked against total above. + let shard = unsafe { self.inner.get_unchecked(idx) }; + + let guard = shard.read().await; + + idx += 1; + Some((ShardRead { guard }, idx)) + }) + } + + #[cfg(feature = "stream")] + /// Flattened stream of **owned** `(K, V)` items. + /// + /// Locks one shard at a time, snapshots (clones) its entries into a `Vec`, drops the lock, + /// then yields items. This allows concurrent writes to other shards. + /// + /// # Example + /// ``` + /// use tokio::runtime::Runtime; + /// use futures::{pin_mut, StreamExt}; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// rt.block_on(async { + /// map.insert(1, "a".to_string()).await; + /// map.insert(2, "b".to_string()).await; + /// + /// let s = map.stream_owned(); + /// pin_mut!(s); + /// + /// let mut items = Vec::new(); + /// while let Some((k, v)) = s.next().await { + /// items.push((k, v)); + /// } + /// items.sort_by_key(|(k, _)| *k); + /// assert_eq!(items, vec![(1, "a".into()), (2, "b".into())]); + /// }); + /// ``` + pub fn stream_owned(&self) -> impl Stream + '_ + where + K: Clone, + V: Clone, + { + let shard_stream = self.stream_shards(); + + stream! { + pin_mut!(shard_stream); + + while let Some(shard) = shard_stream.next().await { + let items: Vec<(K, V)> = + shard.iter().map(|(k, v)| (k.clone(), v.clone())).collect(); + drop(shard); + for item in items { + yield item; + } + } + } + } + + #[cfg(feature = "stream")] + /// Collect all entries into a `Vec<(K, V)>` by cloning. + /// + /// Iterates shard-by-shard, cloning items under a read lock, then releasing the lock + /// before pushing into the result. + /// + /// # Example + /// ``` + /// use tokio::runtime::Runtime; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// rt.block_on(async { + /// map.insert(1, "a".to_string()).await; + /// map.insert(2, "b".to_string()).await; + /// + /// let mut items = map.entries().await; + /// items.sort_by_key(|(k, _)| *k); + /// assert_eq!(items, vec![(1, "a".into()), (2, "b".into())]); + /// }); + /// ``` + pub async fn entries(&self) -> Vec<(K, V)> + where + K: Clone, + V: Clone, + { + self.stream_owned().collect::>().await + } + + #[cfg(feature = "stream")] + /// Collect all keys into a `Vec` by cloning. + /// + /// # Example + /// ``` + /// use tokio::runtime::Runtime; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// rt.block_on(async { + /// map.insert(10, "x").await; + /// map.insert(20, "y").await; + /// + /// let mut ks = map.keys().await; + /// ks.sort(); + /// assert_eq!(ks, vec![10, 20]); + /// }); + /// ``` + pub async fn keys(&self) -> Vec + where + K: Clone, + { + let shard_stream = self.stream_shards(); + pin_mut!(shard_stream); + let mut keys = Vec::new(); + while let Some(shard) = shard_stream.next().await { + let mut shard_keys: Vec = shard.keys().cloned().collect(); + drop(shard); + keys.append(&mut shard_keys); + } + keys + } + + #[cfg(feature = "stream")] + /// Collect all values into a `Vec` by cloning. + /// + /// # Example + /// ``` + /// use tokio::runtime::Runtime; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// rt.block_on(async { + /// map.insert(1, "a".to_string()).await; + /// map.insert(2, "b".to_string()).await; + /// + /// let mut vs = map.values().await; + /// vs.sort(); + /// assert_eq!(vs, vec!["a".to_string(), "b".to_string()]); + /// }); + /// ``` + pub async fn values(&self) -> Vec + where + V: Clone, + { + let shard_stream = self.stream_shards(); + pin_mut!(shard_stream); + let mut values = Vec::new(); + while let Some(shard) = shard_stream.next().await { + let mut shard_values: Vec = shard.values().cloned().collect(); + drop(shard); + values.append(&mut shard_values); + } + values + } +} + +#[cfg(feature = "stream")] +pub struct ShardRead<'a, K, V> { + guard: crate::shard::ShardReader<'a, K, V>, +} + +#[cfg(feature = "stream")] +impl<'a, K, V> ShardRead<'a, K, V> +where + K: Eq + std::hash::Hash + 'static, + V: 'static, +{ + pub fn iter(&self) -> impl Iterator { + self.guard.iter().map(|(k, v)| (k, v)) + } + + pub fn keys(&self) -> impl Iterator { + self.guard.iter().map(|(k, _)| k) + } + + pub fn values(&self) -> impl Iterator { + self.guard.iter().map(|(_, v)| v) + } + + pub fn len(&self) -> usize { + self.guard.len() + } + + pub fn is_empty(&self) -> bool { + self.guard.len() == 0 + } } diff --git a/src/shard_set.rs b/src/shard_set.rs index f719d89..0a50223 100644 --- a/src/shard_set.rs +++ b/src/shard_set.rs @@ -120,4 +120,13 @@ where pub async fn clear(&self) { self.inner.clear().await; } + + /// Returns a vector of all values in the set. + #[cfg(feature = "stream")] + pub async fn values(&self) -> Vec + where + T: Clone, + { + self.inner.keys().await + } }