Skip to content

Commit

Permalink
Lift work limiter start/end out of receive logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nemethf authored and Ralith committed Apr 20, 2024
1 parent 720383a commit 27d8b7d
Showing 1 changed file with 13 additions and 11 deletions.
24 changes: 13 additions & 11 deletions quinn/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,18 +312,14 @@ impl Future for EndpointDriver {

#[allow(unused_mut)] // MSRV
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut endpoint_guard = self.0.state.lock().unwrap();
let endpoint = &mut *endpoint_guard;
let mut endpoint = self.0.state.lock().unwrap();
if endpoint.driver.is_none() {
endpoint.driver = Some(cx.waker().clone());
}

let now = Instant::now();
let mut keep_going = false;
keep_going |=
endpoint
.recv_state
.drive_recv(cx, &mut endpoint.inner, &*endpoint.socket, now)?;
keep_going |= endpoint.drive_recv(cx, now)?;
keep_going |= endpoint.handle_events(cx, &self.0.shared);
keep_going |= endpoint.drive_send(cx)?;

Expand All @@ -334,7 +330,7 @@ impl Future for EndpointDriver {
if endpoint.ref_count == 0 && endpoint.recv_state.connections.is_empty() {
Poll::Ready(Ok(()))
} else {
drop(endpoint_guard);
drop(endpoint);
// If there is more work to do schedule the endpoint task again.
// `wake_by_ref()` is called outside the lock to minimize
// lock contention on a multithreaded runtime.
Expand Down Expand Up @@ -441,6 +437,15 @@ pub(crate) struct Shared {
}

impl State {
fn drive_recv(&mut self, cx: &mut Context, now: Instant) -> Result<bool, io::Error> {
self.recv_state.recv_limiter.start_cycle();
let poll_res = self
.recv_state
.poll_socket(cx, &mut self.inner, &*self.socket, now)?;
self.recv_state.recv_limiter.finish_cycle();
Ok(poll_res)
}

fn drive_send(&mut self, cx: &mut Context) -> Result<bool, io::Error> {
self.send_limiter.start_cycle();

Expand Down Expand Up @@ -761,14 +766,13 @@ impl RecvState {
}
}

fn drive_recv(
fn poll_socket(
&mut self,
cx: &mut Context,
endpoint: &mut proto::Endpoint,
socket: &dyn AsyncUdpSocket,
now: Instant,
) -> Result<bool, io::Error> {
self.recv_limiter.start_cycle();
let mut metas = [RecvMeta::default(); BATCH_SIZE];
let mut iovs: [IoSliceMut; BATCH_SIZE] = {
let mut bufs = self
Expand Down Expand Up @@ -837,12 +841,10 @@ impl RecvState {
}
}
if !self.recv_limiter.allow_work() {
self.recv_limiter.finish_cycle();
return Ok(true);
}
}

self.recv_limiter.finish_cycle();
Ok(false)
}
}

0 comments on commit 27d8b7d

Please sign in to comment.