diff --git a/Cargo.lock b/Cargo.lock index a113867..b96781b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,6 +80,71 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "gimli" version = "0.31.1" @@ -184,6 +249,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro2" version = "1.0.89" @@ -232,6 +303,15 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.13.2" @@ -305,6 +385,7 @@ name = "whirlwind" version = "0.1.1" dependencies = [ "crossbeam-utils", + "futures", "hashbrown", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 87092ea..410247c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ categories = ["asynchronous", "concurrency", "data-structures"] crossbeam-utils = "0.8.20" hashbrown = { version = "0.15.1" } tokio = { version = "1.41.0", features = ["sync"] } +futures = { version = "0.3.31", default-features = false, features = ["std"] } [dev-dependencies] tokio = { version = "1.41.0", features = ["full"] } diff --git a/src/shard_map.rs b/src/shard_map.rs index c2858f4..ef73f2a 100644 --- a/src/shard_map.rs +++ b/src/shard_map.rs @@ -18,14 +18,15 @@ //! assert_eq!(map.remove(&"foo").await, Some("bar")); //! }); //! ``` +use crossbeam_utils::CachePadded; +use futures::future::join_all; +use hashbrown::hash_table::{Entry, Iter, IterMut}; use std::{ hash::{BuildHasher, RandomState}, sync::{Arc, OnceLock}, }; -use crossbeam_utils::CachePadded; -use hashbrown::hash_table::Entry; - +use crate::shard::{ShardReader, ShardWriter}; use crate::{ mapref::{MapRef, MapRefMut}, shard::Shard, @@ -448,4 +449,275 @@ where shard.write().await.clear(); } } + + /// Returns an iterator over the key-value pairs in the map. + /// + /// **Warning**: This method acquires read locks on *all* shards of the map, which may block other operations + /// (such as `insert`, `remove`, or `get_mut`) until the iterator is dropped. Use with caution in + /// concurrent environments to avoid performance bottlenecks. + /// + /// The iterator yields references to the key-value pairs in the map, allowing read-only access to the + /// map's contents. The order of iteration is not guaranteed, as it depends on the internal sharding + /// structure. + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// + /// rt.block_on(async { + /// map.insert("foo", "bar").await; + /// map.insert("baz", "qux").await; + /// + /// let mut pairs = Vec::new(); + /// for (key, value) in map.iter().await { + /// pairs.push((key.clone(), value.clone())); + /// } + /// + /// assert_eq!(pairs.len(), 2); + /// assert!(pairs.contains(&(&"foo", &"bar"))); + /// assert!(pairs.contains(&(&"baz", &"qux"))); + /// }); + /// ``` + pub async fn iter(&self) -> ShardIter { + let guard_futures = self.inner.iter().map(|shard| shard.read()); + let guards = join_all(guard_futures).await; + ShardIter::new(guards) + } + + /// Returns an iterator over the keys in the map. + /// + /// **Warning**: This method acquires read locks on *all* shards of the map, which may block other operations + /// (such as `insert`, `remove`, or `get_mut`) until the iterator is dropped. Use with caution in + /// concurrent environments to avoid performance bottlenecks. + /// + /// The iterator yields references to the keys in the map, allowing read-only access. The order of + /// iteration is not guaranteed, as it depends on the internal sharding structure. + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// + /// rt.block_on(async { + /// map.insert("foo", "bar").await; + /// map.insert("baz", "qux").await; + /// + /// let mut keys = Vec::new(); + /// for key in map.keys().await { + /// keys.push(key.clone()); + /// } + /// + /// assert_eq!(keys.len(), 2); + /// assert!(keys.contains(&"foo")); + /// assert!(keys.contains(&"baz")); + /// }); + /// ``` + pub async fn keys<'a>(&'a self) -> impl Iterator { + self.iter().await.map(|(k, _)| k) + } + + /// Returns an iterator over the values in the map. + /// + /// **Warning**: This method acquires read locks on *all* shards of the map, which may block other operations + /// (such as `insert`, `remove`, or `get_mut`) until the iterator is dropped. Use with caution in + /// concurrent environments to avoid performance bottlenecks. + /// + /// The iterator yields references to the values in the map, allowing read-only access. The order of + /// iteration is not guaranteed, as it depends on the internal sharding structure. + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// + /// rt.block_on(async { + /// map.insert("foo", "bar").await; + /// map.insert("baz", "qux").await; + /// + /// let mut values = Vec::new(); + /// for value in map.values().await { + /// values.push(value.clone()); + /// } + /// + /// assert_eq!(values.len(), 2); + /// assert!(values.contains(&"bar")); + /// assert!(values.contains(&"qux")); + /// }); + /// ``` + pub async fn values<'a>(&'a self) -> impl Iterator { + self.iter().await.map(|(_, v)| v) + } + + /// Returns a mutable iterator over the key-value pairs in the map. + /// + /// **Warning**: This method acquires write locks on *all* shards of the map, which will block *all* + /// other operations (including `get`, `insert`, `remove`, etc.) until the iterator is dropped. Use with + /// extreme caution in concurrent environments, as this can significantly impact performance. + /// + /// The iterator yields mutable references to the key-value pairs in the map, allowing modification of + /// the values (but not the keys, as they are immutable). The order of iteration is not guaranteed, as + /// it depends on the internal sharding structure. + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// + /// rt.block_on(async { + /// map.insert("foo", "bar").await; + /// map.insert("baz", "qux").await; + /// + /// for (key, value) in map.iter_mut().await { + /// if *key == "foo" { + /// *value = "updated"; + /// } + /// } + /// + /// assert_eq!(map.get(&"foo").await.unwrap().value(), &"updated"); + /// assert_eq!(map.get(&"baz").await.unwrap().value(), &"qux"); + /// }); + /// ``` + pub async fn iter_mut(&self) -> ShardIterMut { + let guard_futures = self.inner.iter().map(|shard| shard.write()); + let guards = join_all(guard_futures).await; + ShardIterMut::new(guards) + } + + /// Returns a mutable iterator over the values in the map. + /// + /// **Warning**: This method acquires write locks on *all* shards of the map, which will block *all* + /// other operations (including `get`, `insert`, `remove`, etc.) until the iterator is dropped. Use with + /// extreme caution in concurrent environments, as this can significantly impact performance. + /// + /// The iterator yields mutable references to the values in the map, allowing modification of the values. + /// The order of iteration is not guaranteed, as it depends on the internal sharding structure. + /// + /// # Example + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use std::sync::Arc; + /// use whirlwind::ShardMap; + /// + /// let rt = Runtime::new().unwrap(); + /// let map = Arc::new(ShardMap::new()); + /// + /// rt.block_on(async { + /// map.insert("foo", "bar").await; + /// map.insert("baz", "qux").await; + /// + /// for value in map.values_mut().await { + /// if *value == "bar" { + /// *value = "updated"; + /// } + /// } + /// + /// assert_eq!(map.get(&"foo").await.unwrap().value(), &"updated"); + /// assert_eq!(map.get(&"baz").await.unwrap().value(), &"qux"); + /// }); + /// ``` + pub async fn values_mut<'a>(&'a self) -> impl Iterator { + self.iter_mut().await.map(|(_, v)| v) + } +} + +pub struct ShardIter<'a, K, V> { + _guards: Vec>, + iters: Vec>, + current_shard: usize, +} + +impl<'a, K, V> ShardIter<'a, K, V> { + fn new(guards: Vec>) -> Self { + // SAFETY: We're extending the lifetime of the HashMap references + // The guards ensure the HashMaps remain valid for the lifetime of the iterator + let iters: Vec<_> = guards + .iter() + .map(|guard| unsafe { + std::mem::transmute::, Iter<'_, (K, V)>>(guard.iter()) + }) + .collect(); + + Self { + _guards: guards, + iters, + current_shard: 0, + } + } +} + +impl<'a, K, V> Iterator for ShardIter<'a, K, V> { + type Item = (&'a K, &'a V); + + fn next(&mut self) -> Option { + while self.current_shard < self.iters.len() { + if let Some(item) = self.iters[self.current_shard].next() { + let (key, value) = item; + return Some((key, value)); + } + self.current_shard += 1; + } + None + } +} + +pub struct ShardIterMut<'a, K, V> { + _guards: Vec>, + iters: Vec>, + current_shard: usize, +} + +impl<'a, K, V> ShardIterMut<'a, K, V> { + fn new(mut guards: Vec>) -> Self { + // SAFETY: We're extending the lifetime of the HashMap references + // The guards ensure the HashMaps remain valid for the lifetime of the iterator + let iters: Vec<_> = guards + .iter_mut() + .map(|guard| unsafe { + std::mem::transmute::, IterMut<'_, (K, V)>>(guard.iter_mut()) + }) + .collect(); + + Self { + _guards: guards, + iters, + current_shard: 0, + } + } +} + +impl<'a, K, V> Iterator for ShardIterMut<'a, K, V> { + type Item = (&'a K, &'a mut V); + + fn next(&mut self) -> Option { + while self.current_shard < self.iters.len() { + if let Some(item) = self.iters[self.current_shard].next() { + let (key, value) = item; + return Some((key, value)); + } + self.current_shard += 1; + } + None + } } diff --git a/src/shard_set.rs b/src/shard_set.rs index f719d89..3d3b2e1 100644 --- a/src/shard_set.rs +++ b/src/shard_set.rs @@ -120,4 +120,9 @@ where pub async fn clear(&self) { self.inner.clear().await; } + + /// Returns an iterator over the values in the set. + pub async fn iter<'a>(&'a self) -> impl Iterator { + self.inner.keys().await + } } diff --git a/tests/basic.rs b/tests/basic.rs index 7fd45e1..80be182 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -65,3 +65,133 @@ async fn test_shardmap_is_empty() { map.remove(&"foo").await; assert_eq!(map.is_empty().await, true); } + +#[tokio::test] +async fn test_shardmap_iter() { + let map = ShardMap::new(); + map.insert("foo", "bar").await; + map.insert("baz", "qux").await; + + let mut pairs = Vec::new(); + for (key, value) in map.iter().await { + pairs.push((*key, *value)); + } + + assert_eq!(pairs.len(), 2); + assert!(pairs.contains(&(&"foo", &"bar"))); + assert!(pairs.contains(&(&"baz", &"qux"))); + // Verify map state is unchanged + assert_eq!(map.len().await, 2); + assert_eq!(map.get(&"foo").await.unwrap().value(), &"bar"); + assert_eq!(map.get(&"baz").await.unwrap().value(), &"qux"); +} + +#[tokio::test] +async fn test_shardmap_iter_empty() { + let map = ShardMap::new(); + let pairs: Vec<(&&str, &&str)> = map.iter().await.collect(); + assert_eq!(pairs.len(), 0); + assert_eq!(map.len().await, 0); +} + +#[tokio::test] +async fn test_shardmap_keys() { + let map = ShardMap::new(); + map.insert("foo", "bar").await; + map.insert("baz", "qux").await; + + let mut keys = Vec::new(); + for key in map.keys().await { + keys.push(*key); + } + + assert_eq!(keys.len(), 2); + assert!(keys.contains(&"foo")); + assert!(keys.contains(&"baz")); + // Verify map state is unchanged + assert_eq!(map.len().await, 2); + assert_eq!(map.get(&"foo").await.unwrap().value(), &"bar"); + assert_eq!(map.get(&"baz").await.unwrap().value(), &"qux"); +} + +#[tokio::test] +async fn test_shardmap_values() { + let map = ShardMap::new(); + map.insert("foo", "bar").await; + map.insert("baz", "qux").await; + + let mut values = Vec::new(); + for value in map.values().await { + values.push(*value); + } + + assert_eq!(values.len(), 2); + assert!(values.contains(&"bar")); + assert!(values.contains(&"qux")); + // Verify map state is unchanged + assert_eq!(map.len().await, 2); + assert_eq!(map.get(&"foo").await.unwrap().value(), &"bar"); + assert_eq!(map.get(&"baz").await.unwrap().value(), &"qux"); +} + +#[tokio::test] +async fn test_shardmap_iter_mut() { + let map = ShardMap::new(); + map.insert("foo", "bar").await; + map.insert("baz", "qux").await; + + for (key, value) in map.iter_mut().await { + if *key == "foo" { + *value = "updated"; + } + } + + assert_eq!(map.len().await, 2); + assert_eq!(map.get(&"foo").await.unwrap().value(), &"updated"); + assert_eq!(map.get(&"baz").await.unwrap().value(), &"qux"); +} + +#[tokio::test] +async fn test_shardmap_values_mut() { + let map = ShardMap::new(); + map.insert("foo", "bar").await; + map.insert("baz", "qux").await; + + for value in map.values_mut().await { + if *value == "bar" { + *value = "updated"; + } + } + + assert_eq!(map.len().await, 2); + assert_eq!(map.get(&"foo").await.unwrap().value(), &"updated"); + assert_eq!(map.get(&"baz").await.unwrap().value(), &"qux"); +} + +#[tokio::test] +async fn test_shardmap_iter_mut_empty() { + let map = ShardMap::new(); + let pairs: Vec<(&&str, &mut &str)> = map.iter_mut().await.collect(); + assert_eq!(pairs.len(), 0); + assert_eq!(map.len().await, 0); +} + +#[tokio::test] +async fn test_shardset_iter() { + let set = ShardSet::new(); + set.insert("foo").await; + set.insert("bar").await; + + let mut items = Vec::new(); + for item in set.iter().await { + items.push(*item); + } + + assert_eq!(items.len(), 2); + assert!(items.contains(&"foo")); + assert!(items.contains(&"bar")); + // Verify set state is unchanged + assert_eq!(set.len().await, 2); + assert!(set.contains(&"foo").await); + assert!(set.contains(&"bar").await); +} \ No newline at end of file