Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(listener): poll a port multiple times until either "max_poll" or returned "None" #78

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@

---

## unreleased

Unreleased

- Add `Port::set_max_poll` to set the amount a `Port` is polled in a single `Port::should_poll` (Default: `1` time)
- Add `EventListenerCfg::port_1` to add a manually constructed `Port`

## 1.9.2

Released on 04/03/2023
Expand Down
22 changes: 20 additions & 2 deletions src/listener/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,15 @@ where
}

/// Add a new Port (Poll, Interval) to the the event listener
pub fn port(mut self, poll: Box<dyn Poll<U>>, interval: Duration) -> Self {
self.ports.push(Port::new(poll, interval));
pub fn port(self, poll: Box<dyn Poll<U>>, interval: Duration) -> Self {
self.port_1(Port::new(poll, interval))
}

/// Add a new Port to the the event listener
///
/// The [`Port`] needs to be manually constructed, unlike [`Self::port`]
pub fn port_1(mut self, port: Port<U>) -> Self {
self.ports.push(port);
self
}

Expand Down Expand Up @@ -104,4 +111,15 @@ mod test {
.poll_timeout(Duration::from_secs(0))
.start();
}

#[test]
fn should_add_port_via_port_1() {
let builder = EventListenerCfg::<MockEvent>::default();
assert!(builder.ports.is_empty());
let builder = builder.port_1(Port::new(
Box::new(MockPoll::default()),
Duration::from_millis(1),
));
assert_eq!(builder.ports.len(), 1);
}
}
15 changes: 15 additions & 0 deletions src/listener/port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use std::time::{Duration, Instant};

use super::{Event, ListenerResult, Poll};

/// The Default number of how often a [`Port`] gets polled in a single [`Port::should_poll`]
pub const DEFAULT_MAX_POLL: usize = 1;

/// A port is a wrapper around the poll trait object, which also defines an interval, which defines
/// the amount of time between each poll() call.
/// Its purpose is to listen for incoming events of a user-defined type
Expand All @@ -17,6 +20,7 @@ where
poll: Box<dyn Poll<U>>,
interval: Duration,
next_poll: Instant,
max_poll: usize,
}

impl<U> Port<U>
Expand All @@ -29,9 +33,20 @@ where
poll,
interval,
next_poll: Instant::now(),
max_poll: DEFAULT_MAX_POLL,
}
}

/// Set how often a port should get polled in a single poll
pub fn set_max_poll(&mut self, max_poll: usize) {
self.max_poll = max_poll;
}

/// Get how often a port should get polled in a single poll
pub fn max_poll(&self) -> usize {
self.max_poll
}

/// Returns the interval for the current `Port`
pub fn interval(&self) -> &Duration {
&self.interval
Expand Down
75 changes: 48 additions & 27 deletions src/listener/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,35 +118,32 @@ where
/// Returns only the messages, while the None returned by poll are discarded
#[allow(clippy::needless_collect)]
fn poll(&mut self) -> Result<(), mpsc::SendError<ListenerMsg<U>>> {
let msg: Vec<ListenerMsg<U>> = self
.ports
.iter_mut()
.filter_map(|x| {
if x.should_poll() {
let msg = match x.poll() {
Ok(Some(ev)) => Some(ListenerMsg::User(ev)),
Ok(None) => None,
Err(err) => Some(ListenerMsg::Error(err)),
};
// Update next poll
x.calc_next_poll();
msg
} else {
None
let port_iter = self.ports.iter_mut().filter(|port| port.should_poll());

for port in port_iter {
let mut times_remaining = port.max_poll();
// poll a port until it has nothing anymore
loop {
let msg = match port.poll() {
Ok(Some(ev)) => ListenerMsg::User(ev),
Ok(None) => break,
Err(err) => ListenerMsg::Error(err),
};

self.sender.send(msg)?;

// do this at the end to at least call it once
times_remaining = times_remaining.saturating_sub(1);

if times_remaining == 0 {
break;
}
})
.collect();
// Send messages
match msg
.into_iter()
.map(|x| self.sender.send(x))
.filter(|x| x.is_err())
.map(|x| x.err().unwrap())
.next()
{
None => Ok(()),
Some(e) => Err(e),
}
// Update next poll
port.calc_next_poll();
}

Ok(())
}

/// thread run method
Expand Down Expand Up @@ -186,6 +183,30 @@ mod test {
use crate::mock::{MockEvent, MockPoll};
use crate::Event;

#[test]
fn worker_should_poll_multiple_times() {
let (tx, rx) = mpsc::channel();
let paused = Arc::new(RwLock::new(false));
let paused_t = Arc::clone(&paused);
let running = Arc::new(RwLock::new(true));
let running_t = Arc::clone(&running);

let mut mock_port = Port::new(Box::new(MockPoll::default()), Duration::from_secs(5));
mock_port.set_max_poll(10);

let mut worker =
EventListenerWorker::<MockEvent>::new(vec![mock_port], tx, paused_t, running_t, None);
assert!(worker.poll().is_ok());
assert!(worker.next_event() <= Duration::from_secs(5));
let mut recieved = Vec::new();

while let Ok(msg) = rx.try_recv() {
recieved.push(msg);
}

assert_eq!(recieved.len(), 10);
}

#[test]
fn worker_should_send_poll() {
let (tx, rx) = mpsc::channel();
Expand Down
Loading