Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Jun 17, 2024
1 parent 2f06f2f commit 6b73d56
Showing 1 changed file with 12 additions and 21 deletions.
33 changes: 12 additions & 21 deletions async-nats/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,30 +411,27 @@ impl<const CLIENT_NODE: bool> Handler<CLIENT_NODE> {
#[cfg(feature = "zstd")]
if m4ss_zstd {
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::task::{Context, Poll};

use tokio::io::{AsyncRead, AsyncWrite, BufReader, ReadBuf};

#[derive(Debug)]
struct Maybe<T> {
item: Option<T>,
waker: Option<Waker>,
}

impl<T> Maybe<T> {
fn new(item: Option<T>) -> Self {
Self { item, waker: None }
Self { item }
}

fn take_item(&mut self) -> Option<T> {
self.item.take()
}

fn set_item(&mut self, item: T) {
debug_assert!(self.item.is_none());
self.item = Some(item);
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}

Expand All @@ -446,10 +443,7 @@ impl<const CLIENT_NODE: bool> Handler<CLIENT_NODE> {
) -> Poll<io::Result<()>> {
match &mut self.item {
Some(item) => Pin::new(item).poll_read(cx, buf),
None => {
self.waker = Some(cx.waker().clone());
Poll::Pending
}
None => unreachable!(),
}
}
}
Expand All @@ -462,10 +456,7 @@ impl<const CLIENT_NODE: bool> Handler<CLIENT_NODE> {
) -> Poll<io::Result<usize>> {
match &mut self.item {
Some(item) => Pin::new(item).poll_write(cx, buf),
None => {
self.waker = Some(cx.waker().clone());
Poll::Pending
}
None => unreachable!(),
}
}

Expand All @@ -475,18 +466,18 @@ impl<const CLIENT_NODE: bool> Handler<CLIENT_NODE> {
) -> Poll<io::Result<()>> {
match &mut self.item {
Some(item) => Pin::new(item).poll_flush(cx),
None => {
self.waker = Some(cx.waker().clone());
Poll::Pending
}
None => unreachable!(),
}
}

fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
match &mut self.item {
Some(item) => Pin::new(item).poll_shutdown(cx),
None => unreachable!(),
}
}
}

Expand Down

0 comments on commit 6b73d56

Please sign in to comment.