From 07a7ba2b7b766c6a4eb5341330c99c898fa53d6b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 2 Sep 2024 20:17:11 +0300 Subject: [PATCH 1/3] substream: Save waker for the substream-set Signed-off-by: Alexandru Vasile --- src/substream/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/substream/mod.rs b/src/substream/mod.rs index 37ba98c0..878e7433 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -47,7 +47,7 @@ use std::{ hash::Hash, io::ErrorKind, pin::Pin, - task::{Context, Poll}, + task::{Context, Poll, Waker}, }; /// Logging target for the file. @@ -766,6 +766,7 @@ where S: Stream> + Unpin, { substreams: HashMap, + waker: Option, } impl SubstreamSet @@ -777,6 +778,7 @@ where pub fn new() -> Self { Self { substreams: HashMap::new(), + waker: None, } } @@ -791,10 +793,13 @@ where debug_assert!(false); } } + + self.waker.take().map(|waker| waker.wake()); } /// Remove substream from the set. pub fn remove(&mut self, key: &K) -> Option { + self.waker.take().map(|waker| waker.wake()); self.substreams.remove(key) } @@ -838,6 +843,7 @@ where } } + inner.waker = Some(cx.waker().clone()); Poll::Pending } } From 7e4f9f5acd58b8a769ead6c05316fd45ad8868ca Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Sep 2024 10:07:39 +0300 Subject: [PATCH 2/3] substream-set: Poll substreams fairly with poll indexes Signed-off-by: Alexandru Vasile --- src/substream/mod.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/substream/mod.rs b/src/substream/mod.rs index 878e7433..22709f19 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -766,6 +766,8 @@ where S: Stream> + Unpin, { substreams: HashMap, + keys: Vec, + poll_index: usize, waker: Option, } @@ -778,6 +780,8 @@ where pub fn new() -> Self { Self { substreams: HashMap::new(), + keys: Vec::new(), + poll_index: 0, waker: None, } } @@ -787,20 +791,25 @@ where match self.substreams.entry(key) { Entry::Vacant(entry) => { entry.insert(substream); + self.keys.push(key); + self.waker.take().map(|waker| waker.wake()); } Entry::Occupied(_) => { tracing::error!(?key, "substream already exists"); debug_assert!(false); } } - - self.waker.take().map(|waker| waker.wake()); } /// Remove substream from the set. pub fn remove(&mut self, key: &K) -> Option { + let Some(substream) = self.substreams.remove(key) else { + return None; + }; + self.waker.take().map(|waker| waker.wake()); - self.substreams.remove(key) + self.keys.retain(|k| k != key); + Some(substream) } /// Get mutable reference to stored substream. @@ -830,8 +839,15 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let inner = Pin::into_inner(self); - // TODO: poll the streams more randomly - for (key, mut substream) in inner.substreams.iter_mut() { + let len = inner.keys.len(); + for _ in 0..len { + let key = &inner.keys[inner.poll_index]; + inner.poll_index = (inner.poll_index + 1) % len; + + let Some(mut substream) = inner.substreams.get_mut(key) else { + continue; + }; + match Pin::new(&mut substream).poll_next(cx) { Poll::Pending => continue, Poll::Ready(Some(data)) => return Poll::Ready(Some((*key, data))), From 450e135089c50b0ac71b81a2f99bb610fb90aa79 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 3 Sep 2024 13:01:14 +0300 Subject: [PATCH 3/3] substream-set: Change poll index via modulo len Signed-off-by: Alexandru Vasile --- src/substream/mod.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/substream/mod.rs b/src/substream/mod.rs index 22709f19..f314cec4 100644 --- a/src/substream/mod.rs +++ b/src/substream/mod.rs @@ -841,21 +841,26 @@ where let len = inner.keys.len(); for _ in 0..len { - let key = &inner.keys[inner.poll_index]; inner.poll_index = (inner.poll_index + 1) % len; + let key = &inner.keys[inner.poll_index]; let Some(mut substream) = inner.substreams.get_mut(key) else { continue; }; match Pin::new(&mut substream).poll_next(cx) { - Poll::Pending => continue, - Poll::Ready(Some(data)) => return Poll::Ready(Some((*key, data))), - Poll::Ready(None) => + Poll::Pending => { + continue; + } + Poll::Ready(Some(data)) => { + return Poll::Ready(Some((*key, data))); + } + Poll::Ready(None) => { return Poll::Ready(Some(( *key, Err(Error::SubstreamError(SubstreamError::ConnectionClosed)), - ))), + ))); + } } }